Skip to main content
Open In ColabOpen on GitHub

如何从您的RAG应用程序中流式传输结果

本指南解释了如何从 RAG 应用程序中流式传输结果。它涵盖了从最终输出中流式传输标记,以及链的中间步骤(例如,从查询重写)。

我们将基于Lilian Weng在LLM驱动的自主代理博客文章中构建的带源引用的问答应用,以及RAG教程中的内容进行开发。

设置

依赖项

我们将使用以下软件包:

%pip install --upgrade --quiet  langchain langchain-community langchainhub beautifulsoup4

LangSmith

使用 LangChain 构建的许多应用程序都包含多个步骤,以及多次调用大型语言模型(LLM)。 随着这些应用程序变得越来越复杂,能够检查链或代理内部的具体情况变得至关重要。 实现这一点的最佳方式是使用 LangSmith

请注意,LangSmith 并非必需,但使用它会有所帮助。如果您确实希望使用 LangSmith,请在上方链接注册后,确保设置您的环境变量以开始记录追踪信息:

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

组件

我们需要从 LangChain 的集成套件中选择三个组件。

一个 聊天模型

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

llm = init_chat_model("gpt-4o-mini", model_provider="openai")

一个 嵌入模型

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")

并且一个 向量存储

pip install -qU langchain-core
from langchain_core.vectorstores import InMemoryVectorStore

vector_store = InMemoryVectorStore(embeddings)

RAG应用

让我们通过Lilian Weng在基于大型语言模型的自主代理博客文章中构建的源,重新构建问答应用,该内容出自RAG 教程

首先我们对文档进行索引:

import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from typing_extensions import List, TypedDict

# Load and chunk contents of the blog
loader = WebBaseLoader(
web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),
bs_kwargs=dict(
parse_only=bs4.SoupStrainer(
class_=("post-content", "post-title", "post-header")
)
),
)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
all_splits = text_splitter.split_documents(docs)
# Index chunks
_ = vector_store.add_documents(documents=all_splits)

接下来我们构建应用程序:

from langchain import hub
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph
from typing_extensions import List, TypedDict

# Define prompt for question-answering
prompt = hub.pull("rlm/rag-prompt")


# Define state for application
class State(TypedDict):
question: str
context: List[Document]
answer: str


# Define application steps
def retrieve(state: State):
retrieved_docs = vector_store.similarity_search(state["question"])
return {"context": retrieved_docs}


def generate(state: State):
docs_content = "\n\n".join(doc.page_content for doc in state["context"])
messages = prompt.invoke({"question": state["question"], "context": docs_content})
response = llm.invoke(messages)
return {"answer": response.content}


# Compile application and test
graph_builder = StateGraph(State).add_sequence([retrieve, generate])
graph_builder.add_edge(START, "retrieve")
graph = graph_builder.compile()
API 参考:hub |文档 |StateGraph
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

流式输出最终结果

LangGraph 支持多种 流式传输模式,可以通过指定 stream_mode 参数进行控制。将 stream_mode="messages" 设置为开启状态,即可从聊天模型调用中流式传输标记。

通常,应用程序中可能包含多个聊天模型调用(尽管此处仅有一个)。下面,我们通过对应节点的名称来筛选,仅保留最后一步:

input_message = "What is Task Decomposition?"

for message, metadata in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="messages",
):
if metadata["langgraph_node"] == "generate":
print(message.content, end="|")
|Task| De|composition| is| a| technique| used| to| break| down| complex| tasks| into| smaller|,| more| manageable| steps|.| It| often| involves| prompting| models| to| "|think| step| by| step|,"| allowing| for| clearer| reasoning| and| better| performance| on| intricate| problems|.| This| can| be| achieved| through| various| methods|,| including| simple| prompts|,| task|-specific| instructions|,| or| human| input|.||

流式传输中间步骤

其他流式模式通常会从调用中流式传输步骤——即各个节点的状态更新。在这种情况下,每个节点只是向状态中追加一个新键:

for step in graph.stream(
{"question": "What is Task Decomposition?"},
stream_mode="updates",
):
print(f"{step}\n\n----------------\n")
{'retrieve': {'context': [Document(id='5bf5e308-6ccb-4f09-94d2-d0c36b8c9980', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Fig. 1. Overview of a LLM-powered autonomous agent system.\nComponent One: Planning#\nA complicated task usually involves many steps. An agent needs to know what they are and plan ahead.\nTask Decomposition#\nChain of thought (CoT; Wei et al. 2022) has become a standard prompting technique for enhancing model performance on complex tasks. The model is instructed to “think step by step” to utilize more test-time computation to decompose hard tasks into smaller and simpler steps. CoT transforms big tasks into multiple manageable tasks and shed lights into an interpretation of the model’s thinking process.'), Document(id='d8aed221-7943-414d-8ed7-63c2b0e7523b', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Tree of Thoughts (Yao et al. 2023) extends CoT by exploring multiple reasoning possibilities at each step. It first decomposes the problem into multiple thought steps and generates multiple thoughts per step, creating a tree structure. The search process can be BFS (breadth-first search) or DFS (depth-first search) with each state evaluated by a classifier (via a prompt) or majority vote.\nTask decomposition can be done (1) by LLM with simple prompting like "Steps for XYZ.\\n1.", "What are the subgoals for achieving XYZ?", (2) by using task-specific instructions; e.g. "Write a story outline." for writing a novel, or (3) with human inputs.'), Document(id='bfa87007-02ef-4f81-a008-4522ecea1025', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content='Resources:\n1. Internet access for searches and information gathering.\n2. Long Term memory management.\n3. GPT-3.5 powered Agents for delegation of simple tasks.\n4. File output.\n\nPerformance Evaluation:\n1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.\n2. Constructively self-criticize your big-picture behavior constantly.\n3. Reflect on past decisions and strategies to refine your approach.\n4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.'), Document(id='6aff7fc0-5c21-4986-9f1e-91e89715d934', metadata={'source': 'https://lilianweng.github.io/posts/2023-06-23-agent/'}, page_content="(3) Task execution: Expert models execute on the specific tasks and log results.\nInstruction:\n\nWith the input and the inference results, the AI assistant needs to describe the process and results. The previous stages can be formed as - User Input: {{ User Input }}, Task Planning: {{ Tasks }}, Model Selection: {{ Model Assignment }}, Task Execution: {{ Predictions }}. You must first answer the user's request in a straightforward manner. Then describe the task process and show your analysis and model inference results to the user in the first person. If inference results contain a file path, must tell the user the complete file path.")]}}

----------------

{'generate': {'answer': 'Task Decomposition is the process of breaking down a complex task into smaller, manageable steps to enhance understanding and execution. Techniques like Chain of Thought (CoT) and Tree of Thoughts (ToT) guide models to think through steps systematically, allowing for better problem-solving. It can be achieved through simple prompting, task-specific instructions, or human input.'}}

----------------

有关使用 LangGraph 流式传输的更多信息,请参阅其 流式传输文档。有关 LangChain 可运行对象 流式传输的更多详情,请参考 此指南