Skip to main content
在 GitHub 上打开

流式处理对于增强基于 LLM 构建的应用程序的响应能力至关重要。通过逐步显示输出,甚至在完整响应准备就绪之前,流式处理可以显著改善用户体验 (UX),尤其是在处理 LLM 的延迟时。

概述

LLM 生成完整响应通常会导致几秒钟的延迟,这在具有多个模型调用的复杂应用程序中变得更加明显。幸运的是,LLM 以迭代方式生成响应,从而允许在生成中间结果时显示它们。通过流式传输这些中间输出,LangChain 可在 LLM 驱动的应用程序中实现更流畅的用户体验,并在其设计核心提供对流式传输的内置支持。

在本指南中,我们将讨论 LLM 应用程序中的流式处理,并探索 LangChain 的流式处理 API 如何促进应用程序中各种组件的实时输出。

在 LLM 应用程序中流式传输的内容

在涉及 LLM 的应用程序中,可以流式传输多种类型的数据,通过减少感知延迟和提高透明度来改善用户体验。这些包括:

1. 流式处理 LLM 输出

要流式传输的最常见和最关键的数据是 LLM 本身生成的输出。LLM 通常需要一些时间来生成完整的响应,通过实时流式传输输出,用户可以在生成部分结果时看到部分结果。这提供了即时反馈,并有助于减少用户的等待时间。

2. 流式处理管道或工作流进度

除了流式传输 LLM 输出之外,通过更复杂的工作流或管道流式传输进度也很有用,让用户了解应用程序的整体进展情况。这可能包括:

  • 在 LangGraph 工作流中:使用 LangGraph,工作流由代表各种步骤的节点和边缘组成。此处的流式处理涉及在单个节点请求更新时跟踪图形状态的更改。这样可以更精细地监控工作流中当前处于活动状态的节点,从而在工作流经历不同阶段时提供有关工作流状态的实时更新。

  • 在 LCEL 管道中:来自 LCEL 管道的流式更新涉及从单个子可运行项捕获进度。例如,当管道的不同步骤或组件执行时,您可以流式传输当前正在运行的子 runnable,从而实时了解整个管道的进度。

流式处理管道或工作流进度对于为用户提供应用程序在执行过程中所处位置的清晰图片至关重要。

3. 流式处理自定义数据

在某些情况下,您可能需要流式传输超出管道或工作流结构提供的信息的自定义数据。此自定义信息将注入工作流的特定步骤中,无论该步骤是工具还是 LangGraph 节点。例如,您可以通过 LangGraph 节点实时流式传输有关工具正在做什么或进度的更新。此精细数据直接从步骤中发出,可提供有关工作流执行情况的更详细见解,在需要更多可见性的复杂流程中特别有用。

流式处理 API

LangChain 有两个主要的 API 用于实时流式传输输出。实现 Runnable 接口的任何组件都支持这些 API,包括 LLM编译的 LangGraph 图形以及使用 LCEL 生成的任何 Runnable。

  1. sync stream 和 async astream:用于在生成单个 Runnables(例如聊天模型)时流式传输输出,或流式传输使用 LangGraph 创建的任何工作流。
  2. 仅异步astream_events:使用此 API 可访问完全使用 LCEL 构建的 LLM 应用程序的自定义事件和中间输出。请注意,此 API 可用,但在使用 LangGraph 时不需要。
注意

此外,还有一个旧版异步 astream_log API。不建议将此 API 用于新项目,它比其他流式处理 API 更复杂且功能更丰富。

stream()astream()

stream()method 返回一个迭代器,该迭代器在生成输出块时同步生成输出块。您可以使用for循环实时处理每个块。例如,在使用 LLM 时,这允许在生成输出时以增量方式流式传输输出,从而减少用户的等待时间。

stream()astream()方法依赖于正在流式传输的组件。例如,当从 LLM 流式传输时,每个组件都将是一个 AIMessageChunk;但是,对于其他组件,块可能有所不同。

stream()method 返回一个迭代器,该迭代器在生成这些块时生成这些块。例如

for chunk in component.stream(some_input):
# IMPORTANT: Keep the processing of each chunk as efficient as possible.
# While you're processing the current chunk, the upstream component is
# waiting to produce the next one. For example, if working with LangGraph,
# graph execution is paused while the current chunk is being processed.
# In extreme cases, this could even result in timeouts (e.g., when llm outputs are
# streamed from an API that has a timeout).
print(chunk)

异步版本astream()的工作方式类似,但专为非阻塞工作流而设计。您可以在异步代码中使用它来实现相同的实时流式处理行为。

与聊天模型一起使用

使用stream()astream()对于聊天模型,输出将作为 AIMessageChunks 流式传输,因为它是由 LLM 生成的。这允许您在生成 LLM 的输出时以增量方式呈现或处理 LLM 的输出,这在交互式应用程序或界面中特别有用。

与 LangGraph 一起使用

LangGraph 编译的图是 Runnables 并支持标准的流式 API。

在 LangGraph 中使用 streamastream 方法时,您可以选择一个或多个流模式,这些模式允许您控制流式输出的类型。可用的流媒体模式包括:

  • “values”:发出每个步骤的所有 state 值。
  • “updates”:仅发出节点名称以及每个步骤后节点返回的更新。
  • “debug”:为每个步骤发出调试事件。
  • “messages”逐个 Token 发出 LLM 消息
  • “custom”: 使用 LangGraph 的 StreamWriter 编写的自定义输出。

有关更多信息,请参阅:

与 LCEL 一起使用

如果您使用 LangChain 的表达式语言 (LCEL) 组合多个 Runnable,则stream()astream()按照惯例,方法将流式传输链中最后一步的输出。这允许以增量方式流式传输最终处理的结果。LCEL 会尝试优化管道中的流式处理延迟,以便尽快提供最后一步的流式处理结果。

astream_events

提示

使用astream_events用于访问完全使用 LCEL 构建的 LLM 应用程序的自定义数据和中间输出的 API。

虽然此 API 也可用于 LangGraph,但在使用 LangGraph 时通常不是必需的,因为streamastream方法为 LangGraph 图提供了全面的流式处理功能。

对于使用 LCEL 构建的链,.stream()method 仅从链中流式传输最后一步的输出。这对于某些应用程序来说可能就足够了,但是当您一起构建多个 LLM 调用的更复杂的链时,您可能希望将链的中间值与最终输出一起使用。例如,在构建 chat-over-documents 应用程序时,您可能希望将源与最后一代一起返回。

有一些方法可以使用回调来做到这一点,或者以通过中间的方式构建你的链 values 的末尾替换为 chained.assign()调用,但 LangChain 还包含一个.astream_events()方法,该方法将回调的灵活性与.stream().调用时,它返回一个迭代器 生成各种类型的事件,您可以根据这些事件进行筛选和处理 以满足您的项目需求。

下面是一个小示例,它仅打印包含流式聊天模型输出的事件:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-3-sonnet-20240229")

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for event in chain.astream_events({"topic": "parrot"}):
kind = event["event"]
if kind == "on_chat_model_stream":
print(event, end="|", flush=True)

你可以粗略地把它看作是回调事件的迭代器(尽管格式不同)——你几乎可以在所有 LangChain 组件上使用它!

有关如何使用的更多详细信息,请参阅本指南.astream_events(),包括列出可用事件的表。

将自定义数据写入流

要将自定义数据写入流,您需要根据正在使用的组件选择以下方法之一:

  1. LangGraph 的 StreamWriter 可用于编写自定义数据,在使用 LangGraph 时,这些数据将通过 streamastream API 显示。重要这是 LangGraph 功能,因此在使用纯 LCEL 时不可用。有关更多信息,请参阅如何流式传输自定义数据
  2. dispatch_events / adispatch_events 可用于编写将通过 astream_events API 显示的自定义数据。有关更多信息,请参阅 如何调度自定义回调事件。

“自动流式传输”聊天模型

LangChain 通过在某些情况下自动启用流式处理模式来简化聊天模型的流式处理,即使您没有显式调用流式处理方法也是如此。当您使用非流式处理invoke方法,但仍希望流式传输整个应用程序,包括来自 Chat 模型的中间结果。

如何运作

当您调用invoke(或ainvoke) 方法,如果 LangChain 检测到你正在尝试流式传输整个应用程序,它将自动切换到流式传输模式。

在引擎盖下,它将具有invoke(或ainvoke) 使用stream(或astream) 方法来生成其输出。调用的结果将与使用invoke令人担忧;但是,当聊天模型被流式传输时,LangChain 将负责调用on_llm_new_token事件这些回调事件 允许 LangGraphstream/astreamastream_events以实时显示聊天模型的输出。

例:

def node(state):
...
# The code below uses the invoke method, but LangChain will
# automatically switch to streaming mode
# when it detects that the overall
# application is being streamed.
ai_message = model.invoke(state["messages"])
...

for chunk in compiled_graph.stream(..., mode="messages"):
...

异步编程

LangChain 提供了许多方法的同步 (sync) 和异步 (async) 版本。异步方法通常以 “a” 为前缀(例如ainvoke,astream).在编写异步代码时,始终使用这些异步方法以确保非阻塞行为和最佳性能至关重要。

如果流数据无法实时显示,请确保您为工作流使用正确的异步方法。

有关使用 LangChain 编写异步代码的更多信息,请查看 LangChain 中的异步编程指南

请参阅以下操作指南,了解 LangChain 中流式传输的具体示例:

有关将自定义数据写入流的信息,请参阅以下资源: