
How to stream runnables

Prerequisites

This guide assumes familiarity with the following concepts: chat models, LangChain Expression Language, and output parsers.

Streaming is critical in making applications based on LLMs feel responsive to end users.

Important LangChain primitives like chat models, output parsers, prompts, retrievers, and agents implement the LangChain Runnable Interface.

This interface provides two general approaches to stream content:

  1. sync `stream` and async `astream`: a default implementation of streaming that streams the final output from the chain.
  2. async `astream_events` and async `astream_log`: these provide a way to stream both intermediate steps and the final output from the chain.

Let's take a look at both approaches and try to understand how to use them.

Info

For a higher-level overview of streaming techniques in LangChain, see this section of the conceptual guide.

Using Stream

All `Runnable` objects implement a sync method called `stream` and an async variant called `astream`.

These methods are designed to stream the final output in chunks, yielding each chunk as soon as it is available.

Streaming is only possible if all steps in the program know how to process an input stream; i.e., process an input chunk one at a time, and yield a corresponding output chunk.

The complexity of this processing can vary, from straightforward tasks like emitting tokens produced by an LLM, to more challenging ones like streaming parts of a JSON result before the entire JSON is complete.

The best place to start exploring streaming is with the single most important component in LLM apps - the LLMs themselves!

LLMs and Chat Models

Large language models and their chat variants are the primary bottleneck in LLM-based applications.

Large language models can take several seconds to generate a complete response to a query. This is far slower than the ~200-300 ms threshold at which an application feels responsive to an end user.

The key strategy to make the application feel more responsive is to show intermediate progress; viz., to stream the output from the model token by token.

We will show examples of streaming using a chat model. Choose one from the options below:

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

model = init_chat_model("gpt-4o-mini", model_provider="openai")

Let's start with the sync `stream` API:

chunks = []
for chunk in model.stream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

Alternatively, if you're working in an async environment, you may consider using the async `astream` API:

chunks = []
async for chunk in model.astream("what color is the sky?"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)
The| sky| appears| blue| during| the| day|.|

Let's inspect one of the chunks:

chunks[0]
AIMessageChunk(content='The', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')

We got back something called an `AIMessageChunk`. This chunk represents a part of an `AIMessage`.

Message chunks are additive by design - one can simply add them up to get the state of the response so far!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content='The sky appears blue during', id='run-b36bea64-5511-4d7a-b6a3-a07b3db0c8e7')
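
Equivalently, all of the chunks collected above can be folded into one complete message. A small sketch, reusing the `chunks` list gathered in the loop:

# Accumulate every chunk into a single message; AIMessageChunk
# supports `+`, so each addition extends the response so far.
full_message = chunks[0]
for chunk in chunks[1:]:
    full_message += chunk
print(full_message.content)  # the complete response text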

Chains

Virtually all LLM applications involve more steps than just a call to a language model.

Let's build a simple chain using LangChain Expression Language (LCEL) that combines a prompt, model and parser, and verify that streaming works.

We will use `StrOutputParser` to parse the output from the model. This is a simple parser that extracts the `content` field from an `AIMessageChunk`, giving us the token returned by the model.

Tip

LCEL is a declarative way to specify a "program" by chaining together different LangChain primitives. Chains created using LCEL benefit from an automatic implementation of `stream` and `astream` allowing streaming of the final output. In fact, chains created with LCEL implement the entire standard Runnable interface.

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
    print(chunk, end="|", flush=True)
Here|'s| a| joke| about| a| par|rot|:|

A man| goes| to| a| pet| shop| to| buy| a| par|rot|.| The| shop| owner| shows| him| two| stunning| pa|rr|ots| with| beautiful| pl|um|age|.|

"|There|'s| a| talking| par|rot| an|d a| non|-|talking| par|rot|,"| the| owner| says|.| "|The| talking| par|rot| costs| $|100|,| an|d the| non|-|talking| par|rot| is| $|20|."|

The| man| says|,| "|I|'ll| take| the| non|-|talking| par|rot| at| $|20|."|

He| pays| an|d leaves| with| the| par|rot|.| As| he|'s| walking| down| the| street|,| the| par|rot| looks| up| at| him| an|d says|,| "|You| know|,| you| really| are| a| stupi|d man|!"|

The| man| is| stun|ne|d an|d looks| at| the| par|rot| in| dis|bel|ief|.| The| par|rot| continues|,| "|Yes|,| you| got| r|ippe|d off| big| time|!| I| can| talk| just| as| well| as| that| other| par|rot|,| an|d you| only| pai|d $|20| |for| me|!"|

Note that we're getting streaming output even though we're using `parser` at the end of the chain above. The `parser` operates on each streaming chunk individually. Many of the LCEL primitives also support this kind of transform-style passthrough streaming, which can be quite convenient when constructing apps.

Custom functions can be designed to return generators, which are able to operate on streams.
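
For example, here is a minimal sketch of such a generator step (the upper-casing transform is illustrative, not part of the LangChain API), reusing the `prompt` and `model` defined above:

from langchain_core.output_parsers import StrOutputParser


# A generator step: consumes the stream chunk by chunk and yields
# transformed chunks immediately, so streaming is preserved.
def _to_upper(chunks):
    for chunk in chunks:
        yield chunk.upper()


streaming_chain = prompt | model | StrOutputParser() | _to_upper

# for chunk in streaming_chain.stream({"topic": "parrot"}):
#     print(chunk, end="|", flush=True)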

Certain runnables, like prompt templates and chat models, cannot process individual chunks and instead aggregate all the previous steps. Such runnables can interrupt the streaming process.

Note

The LangChain Expression Language allows you to separate the construction of a chain from the mode in which it is used (e.g., sync/async, batch/streaming etc.). If this is not relevant to what you're building, you can also rely on a standard imperative programming approach by calling `invoke`, `batch` or `stream` on each component individually, assigning the results to variables and then using them downstream as you see fit.
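
A minimal sketch of that imperative alternative, reusing the `prompt`, `model` and `parser` defined above:

# Call each component yourself instead of composing an LCEL chain.
prompt_value = prompt.invoke({"topic": "parrot"})
for chunk in model.stream(prompt_value):
    print(parser.invoke(chunk), end="|", flush=True)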

Working with Input Streams

What if you wanted to stream JSON from the output as it was being generated?

If you were to rely on `json.loads` to parse the partial JSON, the parsing would fail, as partial JSON isn't valid JSON.
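
A quick sketch demonstrating that failure mode:

import json

# Feeding a truncated JSON string to json.loads raises an error
# instead of returning a partial object.
try:
    json.loads('{"countries": [{"name": "Fra')
except json.JSONDecodeError as err:
    print(f"parse failed: {err}")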

You'd likely be at a loss for what to do and conclude that it isn't possible to stream JSON.

Well, turns out there is a way to do it - the parser needs to operate on the input stream, and attempt to "auto-complete" the partial JSON into a valid state.

Let's see such a parser in action to understand what this means.

from langchain_core.output_parsers import JsonOutputParser

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`"
):
    print(text, flush=True)
API Reference: JsonOutputParser
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 67413}]}
{'countries': [{'name': 'France', 'population': 67413000}]}
{'countries': [{'name': 'France', 'population': 67413000}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584}]}
{'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351567}, {'name': 'Japan', 'population': 125584000}]}

Now, let's break streaming. We'll use the previous example and append an extraction function at the end that extracts the country names from the finalized JSON.

Warning

Any steps in the chain that operate on finalized inputs rather than on input streams can break streaming functionality via `stream` and `astream`.

Tip

Later, we'll discuss the `astream_events` API, which streams results from intermediate steps. This API will stream results from intermediate steps even if the chain contains steps that only operate on finalized inputs.

from langchain_core.output_parsers import (
    JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)
API Reference: JsonOutputParser
['France', 'Spain', 'Japan']|

Generator Functions

Let's fix the streaming using a generator function that can operate on the input stream.

Tip

A generator function (a function that uses `yield`) allows writing code that operates on input streams.

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
    """A function that operates on input streams."""
    country_names_so_far = set()

    async for input in input_stream:
        if not isinstance(input, dict):
            continue

        if "countries" not in input:
            continue

        countries = input["countries"]

        if not isinstance(countries, list):
            continue

        for country in countries:
            name = country.get("name")
            if not name:
                continue
            if name not in country_names_so_far:
                yield name
                country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(text, end="|", flush=True)
API Reference: JsonOutputParser
France|Spain|Japan|
Note

Because the code above relies on JSON auto-completion, you may see partial names of countries (e.g., `Sp` and `Spain`), which is not what one would want for an extraction result!

We're focusing on streaming concepts, not necessarily the results of the chains.
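
If partial names would be a problem in practice, one possible workaround (a heuristic sketch, not part of the LangChain API) is to yield a name only after a later key has appeared, signaling that the `name` value is no longer mid-stream:

async def _extract_completed_country_names(input_stream):
    """Yield a country's name only once `population` has also appeared."""
    seen = set()
    async for chunk in input_stream:
        if not isinstance(chunk, dict):
            continue
        countries = chunk.get("countries")
        if not isinstance(countries, list):
            continue
        for country in countries:
            if not isinstance(country, dict):
                continue
            name = country.get("name")
            # `population` showing up after `name` implies `name` is complete.
            if name and "population" in country and name not in seen:
                seen.add(name)
                yield name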

Non-streaming components

Some built-in components, like Retrievers, do not offer any streaming. What happens if we try to `stream` them? 🤨

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
    ["harrison worked at kensho", "harrison likes spicy food"],
    embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
  Document(page_content='harrison likes spicy food')]]

Stream just yielded the final result from that component.

That's OK 🥹! Not all components have to implement streaming - in some cases streaming is either unnecessary, difficult, or just doesn't make sense.

Tip

An LCEL chain constructed using non-streaming components will still be able to stream in a lot of cases, with streaming of partial output starting after the last non-streaming step in the chain.

retrieval_chain = (
    {
        "context": retriever.with_config(run_name="Docs"),
        "question": RunnablePassthrough(),
    }
    | prompt
    | model
    | StrOutputParser()
)
for chunk in retrieval_chain.stream(
    "Where did harrison work? " "Write 3 made up sentences about this place."
):
    print(chunk, end="|", flush=True)
Base|d on| the| given| context|,| Harrison| worke|d at| K|ens|ho|.|

Here| are| |3| |made| up| sentences| about| this| place|:|

1|.| K|ens|ho| was| a| cutting|-|edge| technology| company| known| for| its| innovative| solutions| in| artificial| intelligence| an|d data| analytics|.|

2|.| The| modern| office| space| at| K|ens|ho| feature|d open| floor| plans|,| collaborative| work|sp|aces|,| an|d a| vib|rant| atmosphere| that| fos|tere|d creativity| an|d team|work|.|

3|.| With| its| prime| location| in| the| heart| of| the| city|,| K|ens|ho| attracte|d top| talent| from| aroun|d the| worl|d,| creating| a| diverse| an|d dynamic| work| environment|.|

Now that we've seen how `stream` and `astream` work, let's venture into the world of streaming events. 🏞️

Using Stream Events

Event Streaming is a beta API. This API may change a bit based on feedback.

Note

This guide demonstrates the V2 API and requires langchain-core >= 0.2. For the V1 API compatible with older versions of LangChain, see here.

import langchain_core

langchain_core.__version__

For the `astream_events` API to work properly:

  • Use `async` throughout the code to the extent possible (e.g., async tools etc.)
  • Propagate callbacks if defining custom functions / runnables
  • Whenever using runnables without LCEL, make sure to call `.astream()` on LLMs rather than `.ainvoke` to force the LLM to stream tokens (see the sketch after this list)
  • Let us know if anything doesn't work as expected! :)
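
A minimal sketch of the third point: inside a custom async step, call `.astream()` on the model (not `.ainvoke()`) and pass the config through so callbacks and events propagate (the function name here is illustrative):

async def stream_through_model(prompt_value, config):
    # .astream() surfaces tokens as on_chat_model_stream events;
    # .ainvoke() would only yield the final message.
    async for chunk in model.astream(prompt_value, config=config):
        yield chunk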

Event Reference

Below is a reference table that shows some events that might be emitted by the various Runnable objects.

Note

When streaming is implemented properly, the inputs to a runnable will not be known until after the input stream has been entirely consumed. This means that `inputs` will often be included only for `end` events rather than for `start` events.

| event | name | chunk | input | output |
| --- | --- | --- | --- | --- |
| on_chat_model_start | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | |
| on_chat_model_stream | [model name] | AIMessageChunk(content="hello") | | |
| on_chat_model_end | [model name] | | {"messages": [[SystemMessage, HumanMessage]]} | AIMessageChunk(content="hello world") |
| on_llm_start | [model name] | | {'input': 'hello'} | |
| on_llm_stream | [model name] | 'Hello' | | |
| on_llm_end | [model name] | | | 'Hello human!' |
| on_chain_start | format_docs | | | |
| on_chain_stream | format_docs | "hello world!, goodbye world!" | | |
| on_chain_end | format_docs | | [Document(...)] | "hello world!, goodbye world!" |
| on_tool_start | some_tool | | {"x": 1, "y": "2"} | |
| on_tool_end | some_tool | | | {"x": 1, "y": "2"} |
| on_retriever_start | [retriever name] | | {"query": "hello"} | |
| on_retriever_end | [retriever name] | | {"query": "hello"} | [Document(...), ..] |
| on_prompt_start | [template_name] | | {"question": "hello"} | |
| on_prompt_end | [template_name] | | {"question": "hello"} | ChatPromptValue(messages: [SystemMessage, ...]) |
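
A small sketch of consuming these events: dispatch on the "event" key and read the documented payload fields (the runnable and query here are placeholders):

async def print_events(runnable, query):
    async for event in runnable.astream_events(query):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # stream events carry their payload under data["chunk"]
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind in ("on_tool_end", "on_retriever_end"):
            # end events carry outputs under data["output"]
            print(event["data"].get("output"))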

Chat Model

Let's start off by looking at the events produced by a chat model.

events = []
async for event in model.astream_events("hello"):
    events.append(event)
Note

For langchain-core<0.3.37, set the `version` kwarg explicitly (e.g., `model.astream_events("hello", version="v2")`).

Let's take a look at a few of the start events and a few of the end events.

events[:3]
[{'event': 'on_chat_model_start',
  'data': {'input': 'hello'},
  'name': 'ChatAnthropic',
  'tags': [],
  'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
  'metadata': {'ls_provider': 'anthropic',
   'ls_model_name': 'claude-3-sonnet-20240229',
   'ls_model_type': 'chat',
   'ls_temperature': 0.0,
   'ls_max_tokens': 1024},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {'ls_provider': 'anthropic',
   'ls_model_name': 'claude-3-sonnet-20240229',
   'ls_model_type': 'chat',
   'ls_temperature': 0.0,
   'ls_max_tokens': 1024},
  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 4, 'total_tokens': 12, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
  'parent_ids': []},
 {'event': 'on_chat_model_stream',
  'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {'ls_provider': 'anthropic',
   'ls_model_name': 'claude-3-sonnet-20240229',
   'ls_model_type': 'chat',
   'ls_temperature': 0.0,
   'ls_max_tokens': 1024},
  'data': {'chunk': AIMessageChunk(content='Hello! How can', additional_kwargs={}, response_metadata={}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66')},
  'parent_ids': []}]
events[-2:]
[{'event': 'on_chat_model_stream',
  'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {'ls_provider': 'anthropic',
   'ls_model_name': 'claude-3-sonnet-20240229',
   'ls_model_type': 'chat',
   'ls_temperature': 0.0,
   'ls_max_tokens': 1024},
  'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 0, 'output_tokens': 12, 'total_tokens': 12, 'input_token_details': {}})},
  'parent_ids': []},
 {'event': 'on_chat_model_end',
  'data': {'output': AIMessageChunk(content='Hello! How can I assist you today?', additional_kwargs={}, response_metadata={'stop_reason': 'end_turn', 'stop_sequence': None}, id='run-b18d016d-8b9b-49e7-a555-44db498fcf66', usage_metadata={'input_tokens': 8, 'output_tokens': 16, 'total_tokens': 24, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})},
  'run_id': 'b18d016d-8b9b-49e7-a555-44db498fcf66',
  'name': 'ChatAnthropic',
  'tags': [],
  'metadata': {'ls_provider': 'anthropic',
   'ls_model_name': 'claude-3-sonnet-20240229',
   'ls_model_type': 'chat',
   'ls_temperature': 0.0,
   'ls_max_tokens': 1024},
  'parent_ids': []}]

Chains

Let's revisit the example chain that parsed streaming JSON to explore the streaming events API.

chain = (
    model | JsonOutputParser()
)  # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
    event
    async for event in chain.astream_events(
        "output a list of the countries france, spain and japan and their populations in JSON format. "
        'Use a dict with an outer key of "countries" which contains a list of countries. '
        "Each country should have the key `name` and `population`",
    )
]

If you examine the first few events, you'll notice that there are 3 different start events rather than 2 start events.

The three start events correspond to:

  1. The chain (model + parser)
  2. The model
  3. The parser
events[:3]
[{'event': 'on_chain_start',
  'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'},
  'name': 'RunnableSequence',
  'tags': [],
  'run_id': '4765006b-16e2-4b1d-a523-edd9fd64cb92',
  'metadata': {}},
 {'event': 'on_chat_model_start',
  'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}},
  'name': 'ChatAnthropic',
  'tags': ['seq:step:1'],
  'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
  'metadata': {}},
 {'event': 'on_chat_model_stream',
  'data': {'chunk': AIMessageChunk(content='{', id='run-0320c234-7b52-4a14-ae4e-5f100949e589')},
  'run_id': '0320c234-7b52-4a14-ae4e-5f100949e589',
  'name': 'ChatAnthropic',
  'tags': ['seq:step:1'],
  'metadata': {}}]

What do you think you'd see if you looked at the last 3 events? What about the middle?

Let's use this API to take output of stream events from the model and the parser. We're ignoring start events, end events, and events from the chain.

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
...

Because both the model and the parser support streaming, we see streaming events from both components in real time! Kind of cool, isn't it? 🦜

Filtering Events

Because this API produces so many events, it is useful to be able to filter on events.

You can filter by either component `name`, component `tags` or component `type`.

By Name

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
    include_names=["my_parser"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_parser_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'my_parser', 'tags': ['seq:step:2'], 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'metadata': {}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
{'event': 'on_parser_stream', 'run_id': '37ee9e85-481c-415e-863b-c9e132d24948', 'name': 'my_parser', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}}, 'parent_ids': ['5a0bc625-09fd-4bdf-9932-54909a9a8c29']}
...

By Type

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
    {"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_types=["chat_model"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chat_model_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'model', 'tags': ['seq:step:1'], 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='name": "France', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='",\n "', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='population": 67', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='413', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='000\n },', additional_kwargs={}, response_metadata={}, id='run-156c3e40-82fb-49ff-8e41-9e998061be8c')}, 'run_id': '156c3e40-82fb-49ff-8e41-9e998061be8c', 'name': 'model', 'tags': ['seq:step:1'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['7b927055-bc1b-4b50-a34c-10d3cfcb3899']}
...

By Tags

Caution

Tags are inherited by child components of a given runnable.

If you're using tags to filter, make sure that this is what you want.

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
    'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
    include_tags=["my_chain"],
):
    print(event)
    max_events += 1
    if max_events > 10:
        # Truncate output
        print("...")
        break
{'event': 'on_chain_start', 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}, 'name': 'RunnableSequence', 'tags': ['my_chain'], 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'metadata': {}, 'parent_ids': []}
{'event': 'on_chat_model_start', 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`', additional_kwargs={}, response_metadata={})]]}}, 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b', usage_metadata={'input_tokens': 56, 'output_tokens': 1, 'total_tokens': 57, 'input_token_details': {'cache_creation': 0, 'cache_read': 0}})}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_start', 'data': {}, 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'metadata': {}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='{', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {}}, 'parent_ids': []}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='\n "countries', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chat_model_stream', 'data': {'chunk': AIMessageChunk(content='": [\n ', additional_kwargs={}, response_metadata={}, id='run-8222e8a1-d978-4f30-87fc-b2dba838774b')}, 'run_id': '8222e8a1-d978-4f30-87fc-b2dba838774b', 'name': 'ChatAnthropic', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {'ls_provider': 'anthropic', 'ls_model_name': 'claude-3-sonnet-20240229', 'ls_model_type': 'chat', 'ls_temperature': 0.0, 'ls_max_tokens': 1024}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_parser_stream', 'run_id': '75604c84-e1e6-494a-8b2a-950f45d932e8', 'name': 'JsonOutputParser', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': ['58d1302e-36ce-4df7-a3cb-47cb73d57e44']}
{'event': 'on_chain_stream', 'run_id': '58d1302e-36ce-4df7-a3cb-47cb73d57e44', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'chunk': {'countries': []}}, 'parent_ids': []}
...

Non-streaming components

Remember how some components don't stream well because they don't operate on input streams?

While such components can break streaming of the final output when using `astream`, `astream_events` will still yield streaming events from intermediate steps that support streaming!

# Function that does not support streaming.
# It operates on the finalized inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
    """A function that does not operate on input streams and breaks streaming."""
    if not isinstance(inputs, dict):
        return ""

    if "countries" not in inputs:
        return ""

    countries = inputs["countries"]

    if not isinstance(countries, list):
        return ""

    country_names = [
        country.get("name") for country in countries if isinstance(country, dict)
    ]
    return country_names


chain = (
    model | JsonOutputParser() | _extract_country_names
)  # This parser only works with OpenAI right now

As expected, the `astream` API doesn't work correctly because `_extract_country_names` doesn't operate on streams.

async for chunk in chain.astream(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    print(chunk, flush=True)
['France', 'Spain', 'Japan']

Now, let's confirm that with `astream_events` we're still seeing streaming output from the model and the parser.

num_events = 0

async for event in chain.astream_events(
    "output a list of the countries france, spain and japan and their populations in JSON format. "
    'Use a dict with an outer key of "countries" which contains a list of countries. '
    "Each country should have the key `name` and `population`",
):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(
            f"Chat model chunk: {repr(event['data']['chunk'].content)}",
            flush=True,
        )
    if kind == "on_parser_stream":
        print(f"Parser chunk: {event['data']['chunk']}", flush=True)
    num_events += 1
    if num_events > 30:
        # Truncate the output
        print("...")
        break
Chat model chunk: ''
Chat model chunk: '{'
Parser chunk: {}
Chat model chunk: '\n "countries'
Chat model chunk: '": [\n '
Parser chunk: {'countries': []}
Chat model chunk: '{\n "'
Parser chunk: {'countries': [{}]}
Chat model chunk: 'name": "France'
Parser chunk: {'countries': [{'name': 'France'}]}
Chat model chunk: '",\n "'
Chat model chunk: 'population": 67'
Parser chunk: {'countries': [{'name': 'France', 'population': 67}]}
Chat model chunk: '413'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413}]}
Chat model chunk: '000\n },'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}]}
Chat model chunk: '\n {'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {}]}
Chat model chunk: '\n "name":'
Chat model chunk: ' "Spain",'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain'}]}
Chat model chunk: '\n "population":'
Chat model chunk: ' 47'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47}]}
Chat model chunk: '351'
Parser chunk: {'countries': [{'name': 'France', 'population': 67413000}, {'name': 'Spain', 'population': 47351}]}
...

Propagating Callbacks

Caution

If you're invoking runnables inside your tools, you need to propagate callbacks to the runnable; otherwise, no stream events will be generated.

Note

When using `RunnableLambdas` or the `@chain` decorator, callbacks are propagated automatically behind the scenes.

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
    return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
    """Custom tool that doesn't propagate callbacks."""
    return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello"):
    print(event)
API Reference: RunnableLambda | tool
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'bad_tool', 'tags': [], 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '77b01284-0515-48f4-8d7c-eb27c1882f86', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'ea900472-a8f7-425d-b627-facdef936ee8', 'name': 'bad_tool', 'tags': [], 'metadata': {}}

Here's a re-implementation that propagates callbacks correctly. You'll notice that now we're getting events from the `reverse_word` runnable as well.

@tool
def correct_tool(word: str, callbacks):
    """A tool that correctly propagates callbacks."""
    return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello"):
    print(event)
{'event': 'on_tool_start', 'data': {'input': 'hello'}, 'name': 'correct_tool', 'tags': [], 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': 'hello'}, 'name': 'reverse_word', 'tags': [], 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': 'olleh', 'input': 'hello'}, 'run_id': '44dafbf4-2f87-412b-ae0e-9f71713810df', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_tool_end', 'data': {'output': 'olleh'}, 'run_id': 'd5ea83b9-9278-49cc-9f1d-aa302d671040', 'name': 'correct_tool', 'tags': [], 'metadata': {}}

If you're invoking runnables from within Runnable Lambdas or `@chain`s, then callbacks will be passed automatically on your behalf.

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)
API Reference: RunnableLambda
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '5cf26fc8-840b-4642-98ed-623dda28707a', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '03b0e6a1-3e60-42fc-8373-1e7829198d80', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

And with the `@chain` decorator:

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
    return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234"):
    print(event)
API Reference: chain
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_and_double', 'tags': [], 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'metadata': {}}
{'event': 'on_chain_start', 'data': {'input': '1234'}, 'name': 'reverse_word', 'tags': [], 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '4321', 'input': '1234'}, 'run_id': '64fc99f0-5d7d-442b-b4f5-4537129f67d1', 'name': 'reverse_word', 'tags': [], 'metadata': {}}
{'event': 'on_chain_stream', 'data': {'chunk': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}
{'event': 'on_chain_end', 'data': {'output': '43214321'}, 'run_id': '1bfcaedc-f4aa-4d8e-beee-9bba6ef17008', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}}

Next Steps

Now you've learned some ways to stream both final outputs and internal steps with LangChain.

To learn more, check out the other how-to guides in this section, or the conceptual guide on the LangChain Expression Language.