流式传输
流式传输对于提升基于 大语言模型 的应用程序的响应速度至关重要。通过在完整响应生成之前逐步显示输出,流式传输显著改善了用户体验(UX),尤其是在处理大语言模型延迟时。
概览
从大型语言模型(LLMs)生成完整响应通常会延迟几秒钟,这在需要多次调用模型的复杂应用中尤为明显。幸运的是,LLMs 会迭代生成响应,允许在生成过程中显示中间结果。通过流式传输这些中间输出,LangChain 能够在 LLM 驱动的应用中实现更流畅的用户体验,并在其设计核心内置了流式传输支持。
在本指南中,我们将讨论大型语言模型(LLM)应用程序中的流式处理,并探讨 LangChain 的流式 API 如何促进应用程序中各个组件的实时输出。
LLM应用程序中要流式传输的内容
在涉及大型语言模型(LLMs)的应用中,可以通过流式传输多种类型的数据来改善用户体验,减少感知延迟并提高透明度。这些数据包括:
1. 流式传输LLM输出
最常需要流式传输的数据是大型语言模型(LLM)自身生成的输出。LLM 通常需要一段时间才能生成完整的响应,而通过实时流式传输输出,用户可以在结果生成时看到部分结果。这能提供即时反馈,并帮助减少用户的等待时间。
2. 流式处理管道或工作流进度
除了流式传输大型语言模型(LLM)的输出之外,通过更复杂的流程或管道流式传输进度也很有用,这能让用户感受到应用程序整体的进展情况。这可能包括:
-
在 LangGraph 工作流中: 使用 LangGraph,工作流由表示各个步骤的节点和边组成。此处的流式处理涉及跟踪 图状态 的变化,当各个 节点 请求更新时进行追踪。这使得可以更细致地监控当前处于活动状态的工作流节点,并在工作流经过不同阶段时提供实时的状态更新。
-
在LCEL管道中: 从 LCEL 管道获取流式更新涉及捕获各个 子可运行对象 的进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子可运行对象,从而实时了解整个管道的进度。
流式处理管道或工作流程进度对于向用户提供应用程序执行过程中的清晰视图至关重要。
3. 流式传输自定义数据
在某些情况下,您可能需要流式传输超出管道或工作流结构所提供信息的自定义数据。这种自定义信息会在工作流中的特定步骤内注入,无论该步骤是工具还是LangGraph节点。例如,您可以实时流式传输工具正在执行的操作信息,或者LangGraph节点中的进度信息。这种直接从步骤内部发出的细粒度数据,为工作流的执行提供了更详细的洞察,尤其适用于需要更多可见性的复杂流程。
流式API
LangChain 有两种主要的 API,用于实时流式输出。这些 API 可由任何实现 Runnable 接口 的组件支持,包括 大型语言模型(LLMs)、编译后的 LangGraph 图,以及使用 LCEL 生成的任何 Runnable。
- 同步 流 和异步 异步流:用于在单个可运行对象(例如聊天模型)生成输出时进行流式传输,或对使用 LangGraph 创建的任何工作流进行流式传输。
- 仅异步的 astream_events:使用此 API 可以访问完全使用 LCEL 构建的 LLM 应用程序中的自定义事件和中间输出。请注意,此 API 虽然可用,但在使用 LangGraph 时并不需要。
此外,还有一个 遗留 的异步 astream_log API。此API不建议用于新项目,因为它比其他流式传输API更复杂且功能更少。
stream() 和 astream()
stream() 方法返回一个迭代器,该迭代器会同步生成输出块。您可以使用 for 循环实时处理每个块。例如,在使用 LLM 时,这允许输出在生成过程中逐步流式传输,从而减少用户的等待时间。
stream() 和 astream() 方法产生的块类型取决于正在流式传输的组件。例如,当从 LLM 流式传输时,每个组件都是 AIMessageChunk;但对于其他组件,块可能会有所不同。
stream() 方法返回一个迭代器,该迭代器在生成这些块时逐个产出它们。例如,
for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)
异步版本,astream(),工作方式类似,但专为非阻塞工作流设计。你可以在异步代码中使用它,以实现相同的实时流式传输行为。
与聊天模型一起使用
当使用 stream() 或 astream() 与聊天模型配合时,输出会作为 AIMessageChunks 流式传输,这是由 LLM 生成的。这使您能够在 LLM 输出生成过程中逐步呈现或处理它,这在交互式应用程序或界面中特别有用。
与 LangGraph 一起使用
LangGraph 编译后的图是 可运行的,并支持标准的流式 API。
当使用 LangGraph 的 stream 和 astream 方法时,您可以选择 一种或多种 流式传输模式,以控制流式输出的类型。可用的流式传输模式包括:
- "values": 为每一步发出 状态 的所有值。
- "更新": 仅在每一步之后发出节点名称和节点返回的更新内容。
- "调试": 为每个步骤发出调试事件。
- "messages": 发出LLM 消息 逐标记。
- "自定义": 使用 LangGraph 的 StreamWriter 编写的自定义输出。
更多信息,请参见:
- LangGraph 流式传输概念指南,了解在使用 LangGraph 时如何进行流式传输的更多信息。
- LangGraph 流式处理教程,其中包含 LangGraph 中流式处理的具体示例。
与 LCEL 一起使用
如果您使用 LangChain 的表达式语言 (LCEL) 组合多个 Runnables,按照约定,stream() 和 astream() 方法将流式传输链中最后一步的输出。这使得最终处理结果可以逐步流式传输。LCEL 会尝试优化流水线中的流式延迟,以便尽可能快地获得最后一步的流式结果。
astream_events
对于使用 LCEL 构建的链,.stream() 方法仅流式传输链中最后一步的输出。这可能对某些应用来说已经足够,但当你构建包含多个 LLM 调用的更复杂链时,你可能希望在最终输出之外使用链的中间值。例如,在构建文档聊天应用时,你可能希望在返回最终生成结果的同时也返回来源信息。
有多种方法可以实现这一点 使用回调函数,或者通过构建链式结构,使中间值以类似串联 .assign() 调用的方式传递到最终结果,但 LangChain 还包含一个
.astream_events() 方法,它结合了回调函数的灵活性与 .stream() 的易用性。调用时,该方法会返回一个迭代器,
该迭代器会生成 各种类型的事件,您可以根据项目的需要对其进行筛选和处理。
这里有一个小示例,仅打印包含流式聊天模型输出的事件:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic
model = ChatAnthropic(model="claude-3-sonnet-20240229")
prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser
async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)
你可以将其大致理解为回调事件的迭代器(尽管格式有所不同)——并且几乎可以在所有 LangChain 组件上使用!
请参阅 此指南 以获取有关如何使用 .astream_events() 的更详细信息,包括列出可用事件的表格。
向流中写入自定义数据
要向流中写入自定义数据,您需要根据正在使用的组件选择以下方法之一:
- LangGraph的 StreamWriter 可用于编写自定义数据,这些数据将在使用 LangGraph 时通过 stream 和 astream API 显示。 重要 这是 LangGraph 的功能,因此在使用纯 LCEL 时不可用。有关更多信息,请参阅 如何流式传输自定义数据。
- dispatch_events / adispatch_events 可用于编写自定义数据,这些数据将通过 astream_events API 显示。详细了解 如何分发自定义回调事件。
“自动流式”聊天模型
LangChain 通过在某些情况下自动启用流式传输模式,简化了从 聊天模型 获取流式数据的过程,即使你没有显式调用流式传输方法也是如此。当你使用非流式 invoke 方法但仍希望流式传输整个应用程序(包括来自聊天模型的中间结果)时,这种方法特别有用。
工作原理
当你在聊天模型上调用 invoke(或 ainvoke)方法时,如果 LangChain 检测到你正在尝试流式传输整个应用程序,它会自动切换到流式模式。
在内部,它将使用 invoke(或 ainvoke)来调用 stream(或 astream)方法以生成其输出。调用的结果对使用 invoke 的代码而言是相同的;然而,在聊天模型流式传输期间,LangChain 将负责在 LangChain 的 回调系统 中触发 on_llm_new_token 事件。这些回调事件
允许 LangGraph stream/astream 和 astream_events 实时显示聊天模型的输出。
Example:
def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...
for chunk in compiled_graph.stream(..., mode="messages"):
...
异步编程
LangChain 提供了许多方法的同步(sync)和异步(async)版本。异步方法通常以字母 "a" 作为前缀(例如,ainvoke,astream)。在编写异步代码时,必须始终使用这些异步方法,以确保非阻塞行为并获得最佳性能。
如果实时数据未能正常显示,请确保您正在使用适用于您工作流的正确异步方法。
请查阅 LangChain 异步编程指南,了解有关使用 LangChain 编写异步代码的更多信息。
相关资源
请参阅以下 LangChain 中流式传输的具体示例教程:
- LangGraph 流式传输概念指南
- LangGraph 流式处理入门指南
- 如何流式运行可运行对象: 本教程介绍了与LangChain组件(例如聊天模型)以及 LCEL 一起使用的常见流式传输模式。
- 如何流式传输聊天模型
- 如何流式传输工具调用
要向流中写入自定义数据,请参阅以下资源:
- 如果使用 LangGraph,请参阅 如何流式传输自定义数据。
- 如果使用 LCEL,请参阅 如何分派自定义回调事件。