Skip to main content
Open In Colab在 GitHub 上打开

如何并行调用 runnables

先决条件

本指南假定您熟悉以下概念:

RunnableParallelprimitive 本质上是一个字典,其值是 runnables(或可以强制转换为 runnables 的东西,如函数)。它并行运行其所有值,并且每个值都使用RunnableParallel.最终返回值是一个 dict,每个值的结果都位于其相应的键下。

格式化RunnableParallels

RunnableParallels对于并行化作很有用,但也可用于处理一个 Runnable 的输出以匹配序列中下一个 Runnable 的输入格式。您可以使用它们来拆分或分叉链,以便多个组件可以并行处理输入。稍后,其他组件可以联接或合并结果以合成最终响应。这种类型的链会创建一个如下所示的计算图:

     Input
/ \
/ \
Branch1 Branch2
\ /
\ /
Combine

下面,prompt 的输入应该是带有键的 map"context""question".用户输入只是一个问题。因此,我们需要使用检索器获取上下文,并在"question"钥匙。

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}

Question: {question}
"""

# The prompt expects input with keys for "context" and "question"
prompt = ChatPromptTemplate.from_template(template)

model = ChatOpenAI()

retrieval_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)

retrieval_chain.invoke("where did harrison work?")
'Harrison worked at Kensho.'
提示

请注意,当将 RunnableParallel 与另一个 Runnable 组合在一起时,我们甚至不需要将字典包装在 RunnableParallel 类中 — 类型转换是为我们处理的。在链的上下文中,这些是等效的:

{"context": retriever, "question": RunnablePassthrough()}
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
RunnableParallel(context=retriever, question=RunnablePassthrough())

有关更多信息,请参阅有关强制的部分。

使用 itemgetter 作为简写

请注意,您可以使用 Python 的itemgetter作为 与 结合使用时从 map 中提取数据的简写RunnableParallel.您可以在 Python 文档中找到有关 itemgetter 的更多信息。

在下面的示例中,我们使用 itemgetter 从 map 中提取特定的 key:

from operator import itemgetter

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()

template = """Answer the question based only on the following context:
{context}

Question: {question}

Answer in the following language: {language}
"""
prompt = ChatPromptTemplate.from_template(template)

chain = (
{
"context": itemgetter("question") | retriever,
"question": itemgetter("question"),
"language": itemgetter("language"),
}
| prompt
| model
| StrOutputParser()
)

chain.invoke({"question": "where did harrison work", "language": "italian"})
'Harrison ha lavorato a Kensho.'

并行化步骤

RunnableParallels 可以轻松地并行执行多个 Runnable,并将这些 Runnables 的输出作为 map 返回。

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI

model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)

map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)

map_chain.invoke({"topic": "bear"})
{'joke': AIMessage(content="Why don't bears like fast food? Because they can't catch it!", response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 13, 'total_tokens': 28}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_d9767fc5b9', 'finish_reason': 'stop', 'logprobs': None}, id='run-fe024170-c251-4b7a-bfd4-64a3737c67f2-0'),
'poem': AIMessage(content='In the quiet of the forest, the bear roams free\nMajestic and wild, a sight to see.', response_metadata={'token_usage': {'completion_tokens': 24, 'prompt_tokens': 15, 'total_tokens': 39}, 'model_name': 'gpt-3.5-turbo', 'system_fingerprint': 'fp_c2295e73ad', 'finish_reason': 'stop', 'logprobs': None}, id='run-2707913e-a743-4101-b6ec-840df4568a76-0')}

排比

RunnableParallel 对于并行运行独立进程也很有用,因为 map 中的每个 Runnable 都是并行执行的。例如,我们可以看到joke_chain,poem_chainmap_chain它们都具有大致相同的运行时,即使map_chain执行其他两个

%%timeit

joke_chain.invoke({"topic": "bear"})
610 ms ± 64 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit

poem_chain.invoke({"topic": "bear"})
599 ms ± 73.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit

map_chain.invoke({"topic": "bear"})
643 ms ± 77.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

后续步骤

您现在知道了一些格式化和并行化链步骤的方法RunnableParallel.

要了解更多信息,请参阅本节中有关 runnables 的其他操作指南。