
Migrating from MapReduceDocumentsChain

MapReduceDocumentsChain implements a map-reduce strategy over (potentially long) texts. The strategy is as follows:

  • Split the text into smaller documents;
  • Map a process onto the smaller documents;
  • Reduce or consolidate the results of that process into a final result.

Note that the map step is typically parallelized over the input documents.

A common process applied in this context is summarization, in which the map step summarizes individual documents and the reduce step generates a summary of the summaries.

In the reduce step, MapReduceDocumentsChain supports recursive "collapsing" of the summaries: the inputs are partitioned based on a token limit, and summaries are generated of the partitions. This step is repeated until the total length of the summaries is within a desired limit, allowing for the summarization of arbitrary-length text. This is particularly useful for models with smaller context windows.
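Conceptually, the collapse step is just a loop. Below is a minimal, self-contained sketch of the idea; total_tokens, split_by_token_limit, and the summarize callable are illustrative stand-ins, not part of the LangChain API:

def total_tokens(texts: list[str]) -> int:
    # Stand-in token counter: whitespace word count.
    return sum(len(text.split()) for text in texts)


def split_by_token_limit(texts: list[str], token_max: int) -> list[list[str]]:
    # Greedily group texts into partitions that fit within the token limit.
    partitions, current = [], []
    for text in texts:
        if current and total_tokens(current + [text]) > token_max:
            partitions.append(current)
            current = []
        current.append(text)
    if current:
        partitions.append(current)
    return partitions


def collapse(summaries: list[str], summarize, token_max: int) -> str:
    # Repeatedly summarize partitions until the total fits within the limit,
    # then produce the final consolidated summary.
    while total_tokens(summaries) > token_max:
        partitions = split_by_token_limit(summaries, token_max)
        summaries = [summarize(partition) for partition in partitions]
    return summarize(summaries)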

LangGraph supports map-reduce workflows and confers a number of advantages for this problem:

  • LangGraph allows individual steps (such as successive summarizations) to be streamed, enabling finer-grained control over execution;
  • LangGraph's checkpointing supports error recovery, extends to human-in-the-loop workflows, and makes it easier to incorporate into conversational applications (a minimal sketch follows this list);
  • The LangGraph implementation is easier to extend, as we will see below.
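As one illustration of the checkpointing point above, a compiled graph can be given an in-memory checkpointer so that state is persisted per conversation thread. This is a minimal sketch only, assuming a graph built as in the sections below; it is not required for the rest of this guide:

from langgraph.checkpoint.memory import MemorySaver

# Compile with a checkpointer; each thread_id gets its own persisted state.
app = graph.compile(checkpointer=MemorySaver())
config = {"configurable": {"thread_id": "summarize-thread-1"}}
result = await app.ainvoke({"contents": ["..."]}, config)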

Below we will go through both MapReduceDocumentsChain and a corresponding LangGraph implementation, first on a simple example for illustrative purposes, and second on a longer example text to demonstrate the recursive reduction step.

Let's first load a chat model:

pip install -qU "langchain[openai]"
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain.chat_models import init_chat_model

llm = init_chat_model("gpt-4o-mini", model_provider="openai")

Basic example (short documents)

Let's use the following 3 documents as an example.

from langchain_core.documents import Document

documents = [
    Document(page_content="Apples are red", metadata={"title": "apple_book"}),
    Document(page_content="Blueberries are blue", metadata={"title": "blueberry_book"}),
    Document(page_content="Bananas are yellow", metadata={"title": "banana_book"}),
]
API Reference: Document

Legacy

Details

Below we show an implementation with MapReduceDocumentsChain. We define prompt templates for the map and reduce steps, instantiate separate chains for these steps, and finally instantiate the MapReduceDocumentsChain:

from langchain.chains import MapReduceDocumentsChain, ReduceDocumentsChain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter

# Map
map_template = "Write a concise summary of the following: {docs}."
map_prompt = ChatPromptTemplate([("human", map_template)])
map_chain = LLMChain(llm=llm, prompt=map_prompt)


# Reduce
reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
reduce_chain = LLMChain(llm=llm, prompt=reduce_prompt)


# Takes a list of documents, combines them into a single string, and passes this to an LLMChain
combine_documents_chain = StuffDocumentsChain(
    llm_chain=reduce_chain, document_variable_name="docs"
)

# Combines and iteratively reduces the mapped documents
reduce_documents_chain = ReduceDocumentsChain(
    # This is the final chain that is called.
    combine_documents_chain=combine_documents_chain,
    # If documents exceed context for `StuffDocumentsChain`
    collapse_documents_chain=combine_documents_chain,
    # The maximum number of tokens to group documents into.
    token_max=1000,
)

# Combining documents by mapping a chain over them, then combining results
map_reduce_chain = MapReduceDocumentsChain(
    # Map chain
    llm_chain=map_chain,
    # Reduce chain
    reduce_documents_chain=reduce_documents_chain,
    # The variable name in the llm_chain to put the documents in
    document_variable_name="docs",
    # Return the results of the map steps in the output
    return_intermediate_steps=False,
)
result = map_reduce_chain.invoke(documents)

print(result["output_text"])
Fruits come in a variety of colors, with apples being red, blueberries being blue, and bananas being yellow.

In the LangSmith trace, we observe four LLM calls: one summarizing each of the three input documents, and one summarizing those summaries.

LangGraph

Below we show a LangGraph implementation, using the same prompt templates as above. The graph includes a node for generating summaries that is mapped across a list of input documents. This node then flows into a second node that generates the final summary.

Details

We will need to install langgraph:

%pip install -qU langgraph
import operator
from typing import Annotated, List, TypedDict

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

map_template = "Write a concise summary of the following: {context}."

reduce_template = """
The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes.
"""

map_prompt = ChatPromptTemplate([("human", map_template)])
reduce_prompt = ChatPromptTemplate([("human", reduce_template)])

map_chain = map_prompt | llm | StrOutputParser()
reduce_chain = reduce_prompt | llm | StrOutputParser()

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain the input document contents, corresponding
# summaries, and a final summary.
class OverallState(TypedDict):
    # Notice here we use the operator.add
    # This is because we want to combine all the summaries we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    contents: List[str]
    summaries: Annotated[list, operator.add]
    final_summary: str


# This will be the state of the node that we will "map" all
# documents to in order to generate summaries
class SummaryState(TypedDict):
    content: str


# Here we generate a summary, given a document
async def generate_summary(state: SummaryState):
    response = await map_chain.ainvoke(state["content"])
    return {"summaries": [response]}


# Here we define the logic to map out over the documents
# We will use this as an edge in the graph
def map_summaries(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [
        Send("generate_summary", {"content": content}) for content in state["contents"]
    ]


# Here we will generate the final summary
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["summaries"])
    return {"final_summary": response}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("generate_final_summary", generate_final_summary)
graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "generate_final_summary")
graph.add_edge("generate_final_summary", END)
app = graph.compile()
from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

Note that invoking the graph in streaming mode allows us to monitor the individual steps and potentially take action on them during execution.

# Call the graph:
async for step in app.astream({"contents": [doc.page_content for doc in documents]}):
    print(step)
{'generate_summary': {'summaries': ['Apples are typically red in color.']}}
{'generate_summary': {'summaries': ['Bananas are yellow in color.']}}
{'generate_summary': {'summaries': ['Blueberries are a type of fruit that are blue in color.']}}
{'generate_final_summary': {'final_summary': 'The main themes are the colors of different fruits: apples are red, blueberries are blue, and bananas are yellow.'}}

In the LangSmith trace, we recover the same four LLM calls as before.
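If we do not need to observe intermediate steps, we can also invoke the compiled graph directly and read the summary off the final state. A minimal usage sketch:

# Invoke the graph directly; the result is the final state dict.
result = await app.ainvoke({"contents": [doc.page_content for doc in documents]})
print(result["final_summary"])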

Summarizing long documents

Map-reduce flows are particularly useful when texts are long relative to the context window of an LLM. MapReduceDocumentsChain supports recursive "collapsing" of the summaries: the inputs are partitioned based on a token limit, and summaries are generated of the partitions. This step is repeated until the total length of the summaries is within a desired limit, allowing for the summarization of arbitrary-length text.

This "collapse" step is implemented as a while loop within MapReduceDocumentsChain. We can demonstrate it on a longer text: Lilian Weng's blog post on "LLM Powered Autonomous Agents" (as featured in the RAG tutorial and other documentation).

First we load the post and split it into smaller "sub-documents":

from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import CharacterTextSplitter

loader = WebBaseLoader("https://lilianweng.github.io/posts/2023-06-23-agent/")
documents = loader.load()

text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=1000, chunk_overlap=0
)
split_docs = text_splitter.split_documents(documents)
print(f"Generated {len(split_docs)} documents.")
USER_AGENT environment variable not set, consider setting it to identify your requests.
Created a chunk of size 1003, which is longer than the specified 1000
Generated 14 documents.

Legacy

Details

We can invoke MapReduceDocumentsChain as before:

result = map_reduce_chain.invoke(split_docs)

print(result["output_text"])
The article discusses the use of Large Language Models (LLMs) to power autonomous agents in various tasks, showcasing their capabilities in problem-solving beyond generating written content. Key components such as planning, memory optimization, and tool use are explored, with proof-of-concept demos like AutoGPT and GPT-Engineer demonstrating the potential of LLM-powered agents. Challenges include limitations in historical information retention and natural language interface reliability, while the potential of LLMs in enhancing reasoning, problem-solving, and planning proficiency for autonomous agents is highlighted. Overall, the article emphasizes the versatility and power of LLMs in creating intelligent agents for tasks like scientific discovery and experiment design.

Consider the LangSmith trace for the above invocation. When instantiating our ReduceDocumentsChain, we set token_max to 1,000 tokens. This results in a total of 17 LLM calls:

  • 14 calls are for summarizing the 14 sub-documents generated by our text splitter.
  • These summaries totaled roughly 1,000–2,000 tokens. Because we set token_max to 1,000, two more calls were needed to summarize (or "collapse") them.
  • One final call generated the ultimate summary of the two "collapsed" summaries, for a total of 14 + 2 + 1 = 17 calls.

LangGraph

Details

We can extend our original map-reduce implementation in LangGraph to implement the same recursive collapsing step. We make the following changes:

  • Add a collapsed_summaries key to the state to store the collapsed summaries;
  • Update the final summarization node to summarize the collapsed summaries;
  • Add a collapse_summaries node that partitions a list of documents based on token length (1,000 tokens here, as before), generates summaries of each partition, and stores the result in collapsed_summaries.

We add a conditional edge from collapse_summaries to itself to form a loop: if the collapsed summaries total more than token_max, we re-run the node.

from typing import Literal

from langchain.chains.combine_documents.reduce import (
    acollapse_docs,
    split_list_of_docs,
)


def length_function(documents: List[Document]) -> int:
    """Get number of tokens for input contents."""
    return sum(llm.get_num_tokens(doc.page_content) for doc in documents)


token_max = 1000


class OverallState(TypedDict):
    contents: List[str]
    summaries: Annotated[list, operator.add]
    collapsed_summaries: List[Document]  # add key for collapsed summaries
    final_summary: str


# Add node to store summaries for collapsing
def collect_summaries(state: OverallState):
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }


# Modify final summary to read off collapsed summaries
async def generate_final_summary(state: OverallState):
    response = await reduce_chain.ainvoke(state["collapsed_summaries"])
    return {"final_summary": response}


graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary) # same as before
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("generate_final_summary", generate_final_summary)


# Add node to collapse summaries
async def collapse_summaries(state: OverallState):
    doc_lists = split_list_of_docs(
        state["collapsed_summaries"], length_function, token_max
    )
    results = []
    for doc_list in doc_lists:
        results.append(await acollapse_docs(doc_list, reduce_chain.ainvoke))

    return {"collapsed_summaries": results}


graph.add_node("collapse_summaries", collapse_summaries)


def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"


graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)
app = graph.compile()

LangGraph allows the graph structure to be plotted to help visualize its function:

from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

As before, we can stream the graph to observe its sequence of steps. Below, we will simply print out the name of each step.

Note that because we have a loop in the graph, it can be helpful to specify a recursion_limit on its execution. This is analogous to ReduceDocumentsChain.token_max in that it will raise a specific error when the specified limit is exceeded.

async for step in app.astream(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
):
    print(list(step.keys()))
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['generate_summary']
['collect_summaries']
['collapse_summaries']
['generate_final_summary']
print(step)
{'generate_final_summary': {'final_summary': 'The summaries discuss the use of Large Language Models (LLMs) to power autonomous agents in various tasks such as problem-solving, planning, and tool use. Key components like planning, memory, and task decomposition are highlighted, along with challenges such as inefficient planning and hallucination. Techniques like Algorithm Distillation and Maximum Inner Product Search are explored for optimization, while frameworks like ReAct and Reflexion show improvements in knowledge-intensive tasks. The importance of accurate interpretation of user input and well-structured code for functional autonomy is emphasized, along with the potential of LLMs in prompting, reasoning, and emergent social behavior in simulation environments. Challenges in real-world scenarios and the use of LLMs with expert-designed tools for tasks like organic synthesis and drug discovery are also discussed.'}}

In the corresponding LangSmith trace, we can see the same 17 LLM calls as before, this time grouped under their respective nodes.
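If the recursion limit is exceeded before the collapsed summaries fit within token_max, the stream raises langgraph's GraphRecursionError, which can be handled explicitly. A minimal sketch:

from langgraph.errors import GraphRecursionError

try:
    async for step in app.astream(
        {"contents": [doc.page_content for doc in split_docs]},
        {"recursion_limit": 10},
    ):
        print(list(step.keys()))
except GraphRecursionError:
    # The limit was hit before a final summary was produced;
    # raise the limit or shrink the input and retry.
    print("Recursion limit reached before the summaries were fully collapsed.")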

Next steps

Check out the LangGraph documentation for detail on building with LangGraph, including this guide on the details of map-reduce in LangGraph.

See this tutorial for more LLM-based summarization strategies.