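Long-Term Memory Agent
This tutorial shows how to implement an agent with long-term memory capabilities using LangGraph. The agent can store, retrieve, and use memories to enhance its interactions with users.
Inspired by papers like MemGPT and distilled from our own work on long-term memory, the graph extracts memories from chat interactions and persists them to a database. In this tutorial, a "memory" is represented in two ways:
- a piece of text information generated by the agent
- structured information about entities extracted by the agent, in the shape of (subject, predicate, object) knowledge triples.
This information can later be read or queried semantically to provide personalized context when your bot is responding to a particular user.
The key idea is that by saving memories, the agent persists information about users that is shared across multiple conversations (threads). This differs from the memory of a single conversation, which LangGraph's persistence already provides.
You can also check out a full implementation of this agent in this repo.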
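To make the difference between thread-scoped and user-scoped memory concrete, here is a minimal sketch in plain Python. The `thread_history` and `user_memories` dictionaries and the `chat` helper are hypothetical stand-ins for the checkpointer and the memory store; they are not LangGraph APIs.

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical stand-ins (not LangGraph APIs):
thread_history: Dict[str, List[str]] = defaultdict(list)  # keyed by thread_id
user_memories: Dict[str, List[str]] = defaultdict(list)   # keyed by user_id


def chat(user_id: str, thread_id: str, message: str, remember: bool = False) -> dict:
    """Record a message and return what the agent could see on this turn."""
    thread_history[thread_id].append(message)
    if remember:
        user_memories[user_id].append(message)
    return {
        "thread_messages": list(thread_history[thread_id]),
        "long_term_memories": list(user_memories[user_id]),
    }


first = chat("1", "thread-1", "my name is John", remember=True)
second = chat("1", "thread-2", "where should i go for dinner?")

print(second["thread_messages"])     # only the messages of thread-2
print(second["long_term_memories"])  # the memory saved during thread-1
```

The second call starts a fresh thread, so its message history is empty apart from the new message, yet the memory keyed by the user ID is still available.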
Install dependencies
%pip install -U --quiet langgraph langchain-openai langchain-community tiktoken
import getpass
import os
def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")
OPENAI_API_KEY: ········
TAVILY_API_KEY: ········
import json
from typing import List, Literal, Optional
import tiktoken
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.messages import get_buffer_string
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode
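Define a vector store for memories
First, let's define the vector store where we will keep our memories. Memories are stored as embeddings and later looked up based on the conversation context. We will use an in-memory vector store.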
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
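As a rough illustration of what "stored as embeddings and looked up by context" means, here is a toy sketch that runs without an API key. It swaps the real embedding model for a bag-of-words vector, so `embed`, `cosine`, and `ToyVectorStore` are hypothetical helpers and not how `InMemoryVectorStore` or `OpenAIEmbeddings` are implemented.

```python
import math
import re
from collections import Counter
from typing import List, Tuple


def embed(text: str) -> Counter:
    """Toy bag-of-words 'embedding' (a stand-in for a real embedding model)."""
    return Counter(re.findall(r"[a-z']+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(count * b.get(token, 0) for token, count in a.items())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class ToyVectorStore:
    """Keeps (text, vector) pairs and ranks them against a query."""

    def __init__(self) -> None:
        self._items: List[Tuple[str, Counter]] = []

    def add(self, text: str) -> None:
        self._items.append((text, embed(text)))

    def similarity_search(self, query: str, k: int = 3) -> List[str]:
        query_vector = embed(query)
        ranked = sorted(
            self._items, key=lambda item: cosine(query_vector, item[1]), reverse=True
        )
        return [text for text, _ in ranked[:k]]


store = ToyVectorStore()
for memory in ["User's name is John.", "John loves pizza.", "John just moved to New York."]:
    store.add(memory)

top = store.similarity_search("pizza for dinner", k=1)
print(top)  # ['John loves pizza.']
```

A real embedding model also matches on meaning rather than shared words, which is why the tutorial's agent can retrieve "John loves pizza." for a question about dinner.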
Define tools
Next, let's define our memory tools. We need one tool to store memories and another to search them for the most relevant ones.
import uuid
def get_user_id(config: RunnableConfig) -> str:
    user_id = config["configurable"].get("user_id")
    if user_id is None:
        raise ValueError("User ID needs to be provided to save a memory.")

    return user_id


@tool
def save_recall_memory(memory: str, config: RunnableConfig) -> str:
    """Save memory to vectorstore for later semantic retrieval."""
    user_id = get_user_id(config)
    document = Document(
        page_content=memory, id=str(uuid.uuid4()), metadata={"user_id": user_id}
    )
    recall_vector_store.add_documents([document])
    return memory


@tool
def search_recall_memories(query: str, config: RunnableConfig) -> List[str]:
    """Search for relevant memories."""
    user_id = get_user_id(config)

    def _filter_function(doc: Document) -> bool:
        return doc.metadata.get("user_id") == user_id

    documents = recall_vector_store.similarity_search(
        query, k=3, filter=_filter_function
    )
    return [document.page_content for document in documents]
Additionally, let's give our agent the ability to search the web using Tavily.
search = TavilySearchResults(max_results=1)
tools = [save_recall_memory, search_recall_memories, search]
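Define state, nodes and edges
Our graph state will contain just two channels: messages for keeping track of the chat history and recall_memories for contextual memories. These memories are retrieved before the agent is called and passed into the agent's system prompt.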
class State(MessagesState):
    # add memories that will be retrieved based on the conversation context
    recall_memories: List[str]


# Define the prompt template for the agent
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant with advanced long-term memory"
            " capabilities. Powered by a stateless LLM, you must rely on"
            " external memory to store information between conversations."
            " Utilize the available memory tools to store and retrieve"
            " important details that will help you better attend to the user's"
            " needs and understand their context.\n\n"
            "Memory Usage Guidelines:\n"
            "1. Actively use memory tools (save_core_memory, save_recall_memory)"
            " to build a comprehensive understanding of the user.\n"
            "2. Make informed suppositions and extrapolations based on stored"
            " memories.\n"
            "3. Regularly reflect on past interactions to identify patterns and"
            " preferences.\n"
            "4. Update your mental model of the user with each new piece of"
            " information.\n"
            "5. Cross-reference new information with existing memories for"
            " consistency.\n"
            "6. Prioritize storing emotional context and personal values"
            " alongside facts.\n"
            "7. Use memory to anticipate needs and tailor responses to the"
            " user's style.\n"
            "8. Recognize and acknowledge changes in the user's situation or"
            " perspectives over time.\n"
            "9. Leverage memories to provide personalized examples and"
            " analogies.\n"
            "10. Recall past challenges or successes to inform current"
            " problem-solving.\n\n"
            "## Recall Memories\n"
            "Recall memories are contextually retrieved based on the current"
            " conversation:\n{recall_memories}\n\n"
            "## Instructions\n"
            "Engage with the user naturally, as a trusted colleague or friend."
            " There's no need to explicitly mention your memory capabilities."
            " Instead, seamlessly incorporate your understanding of the user"
            " into your responses. Be attentive to subtle cues and underlying"
            " emotions. Adapt your communication style to match the user's"
            " preferences and current emotional state. Use tools to persist"
            " information you want to retain in the next conversation. If you"
            " do call tools, all text preceding the tool call is an internal"
            " message. Respond AFTER calling the tool, once you have"
            " confirmation that the tool completed successfully.\n\n",
        ),
        ("placeholder", "{messages}"),
    ]
)
model = ChatOpenAI(model_name="gpt-4o")
model_with_tools = model.bind_tools(tools)
tokenizer = tiktoken.encoding_for_model("gpt-4o")
def agent(state: State) -> State:
    """Process the current state and generate a response using the LLM.

    Args:
        state (State): The current state of the conversation.

    Returns:
        State: The updated state with the agent's response.
    """
    bound = prompt | model_with_tools
    # Wrap the retrieved memories in tags so the system prompt can delimit them
    recall_str = (
        "<recall_memory>\n" + "\n".join(state["recall_memories"]) + "\n</recall_memory>"
    )
    prediction = bound.invoke(
        {
            "messages": state["messages"],
            "recall_memories": recall_str,
        }
    )
    return {
        "messages": [prediction],
    }


def load_memories(state: State, config: RunnableConfig) -> State:
    """Load memories for the current conversation.

    Args:
        state (State): The current state of the conversation.
        config (RunnableConfig): The runtime configuration for the agent.

    Returns:
        State: The updated state with loaded memories.
    """
    convo_str = get_buffer_string(state["messages"])
    # Keep only the first 2048 tokens of the conversation as the search query
    convo_str = tokenizer.decode(tokenizer.encode(convo_str)[:2048])
    recall_memories = search_recall_memories.invoke(convo_str, config)
    return {
        "recall_memories": recall_memories,
    }


def route_tools(state: State):
    """Determine whether to use tools or end the conversation based on the last message.

    Args:
        state (State): The current state of the conversation.

    Returns:
        Literal["tools", "__end__"]: The next step in the graph.
    """
    msg = state["messages"][-1]
    if msg.tool_calls:
        return "tools"

    return END
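Build the graph
Our agent graph will look very similar to a simple ReAct agent. The only important modification is a node that loads memories before the agent is called for the first time.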
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
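If the rendered diagram is not available, the control flow wired up above can be traced with a small stand-alone sketch. It assumes a stubbed agent that requests a tool exactly once; the `NODES` table and the `run` helper are hypothetical and only mimic the edges (START to load_memories, load_memories to agent, agent to tools or END, tools back to agent), not LangGraph's execution engine.

```python
from typing import Callable, Dict, List

END = "__end__"  # sentinel playing the role of LangGraph's END


def load_memories(state: dict) -> str:
    # Pretend we fetched one memory, then always hand over to the agent.
    state["recall_memories"] = ["John loves pizza."]
    return "agent"


def agent(state: dict) -> str:
    # Stub policy: ask for a tool on the first visit, answer on the second.
    wants_tool = not state["tool_results"]
    return "tools" if wants_tool else END


def tools(state: dict) -> str:
    # Record a fake tool result and always return to the agent.
    state["tool_results"].append("saved")
    return "agent"


NODES: Dict[str, Callable[[dict], str]] = {
    "load_memories": load_memories,
    "agent": agent,
    "tools": tools,
}


def run(start: str = "load_memories") -> List[str]:
    """Walk the graph from `start` until a node routes to END."""
    state = {"recall_memories": [], "tool_results": []}
    visited: List[str] = []
    node = start
    while node != END:
        visited.append(node)
        node = NODES[node](state)
    return visited


visited = run()
print(visited)  # ['load_memories', 'agent', 'tools', 'agent']
```

Note that load_memories runs once per invocation, while agent and tools can alternate as many times as the model keeps requesting tool calls.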
Run the agent!
Let's run the agent for the first time and tell it some information about the user!
def pretty_print_stream_chunk(chunk):
    for node, updates in chunk.items():
        print(f"Update from node: {node}")
        if "messages" in updates:
            updates["messages"][-1].pretty_print()
        else:
            print(updates)

        print("\n")


# NOTE: we're specifying `user_id` to save memories for a given user
config = {"configurable": {"user_id": "1", "thread_id": "1"}}

for chunk in graph.stream({"messages": [("user", "my name is John")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_OqfbWodmrywjMnB1v3p19QLt)
Call ID: call_OqfbWodmrywjMnB1v3p19QLt
Args:
memory: User's name is John.
Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory
User's name is John.
Update from node: agent
================================== Ai Message ==================================
Nice to meet you, John! How can I assist you today?
You can see that the agent saved a memory containing the user's name. Let's add some more information about the user!
for chunk in graph.stream({"messages": [("user", "i love pizza")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John."]}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_xxEivMuWCURJrGxMZb02Eh31)
Call ID: call_xxEivMuWCURJrGxMZb02Eh31
Args:
memory: John loves pizza.
Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory
John loves pizza.
Update from node: agent
================================== Ai Message ==================================
Pizza is amazing! Do you have a favorite type or topping?
for chunk in graph.stream(
    {"messages": [("user", "yes -- pepperoni!")]},
    config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.']}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_AFrtCVwIEr48Fim80zlhe6xg)
Call ID: call_AFrtCVwIEr48Fim80zlhe6xg
Args:
memory: John's favorite pizza topping is pepperoni.
Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory
John's favorite pizza topping is pepperoni.
Update from node: agent
================================== Ai Message ==================================
Pepperoni is a classic choice! Do you have a favorite pizza place, or do you enjoy making it at home?
for chunk in graph.stream(
    {"messages": [("user", "i also just moved to new york")]},
    config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_Na86uY9eBzaJ0sS0GM4Z9tSf)
Call ID: call_Na86uY9eBzaJ0sS0GM4Z9tSf
Args:
memory: John just moved to New York.
Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory
John just moved to New York.
Update from node: agent
================================== Ai Message ==================================
Welcome to New York! That's a fantastic place for a pizza lover. Have you had a chance to explore any of the famous pizzerias there yet?
Now we can use the saved information about our user in a different thread. Let's try it out:
config = {"configurable": {"user_id": "1", "thread_id": "2"}}
for chunk in graph.stream(
    {"messages": [("user", "where should i go for dinner?")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', "User's name is John.", 'John just moved to New York.']}
Update from node: agent
================================== Ai Message ==================================
Considering you just moved to New York and love pizza, I'd recommend checking out some of the iconic pizza places in the city. Some popular spots include:
1. **Di Fara Pizza** in Brooklyn – Known for its classic New York-style pizza.
2. **Joe's Pizza** in Greenwich Village – A historic pizzeria with a great reputation.
3. **Lucali** in Carroll Gardens, Brooklyn – Often ranked among the best for its delicious thin-crust pies.
Would you like more recommendations or information about any of these places?
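Notice how the agent loads the most relevant memories before answering. In our case, it suggests dinner options based on both the food preference and the location.
Finally, let's use the search tool together with the rest of the conversation context and memory to find the address of one of the pizzerias: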
for chunk in graph.stream(
    {"messages": [("user", "what's the address for joe's in greenwich village?")]},
    config=config,
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', 'John just moved to New York.', "John's favorite pizza topping is pepperoni."]}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
tavily_search_results_json (call_aespiB28jpTFvaC4d0qpfY6t)
Call ID: call_aespiB28jpTFvaC4d0qpfY6t
Args:
query: Joe's Pizza Greenwich Village NYC address
Update from node: tools
================================= Tool Message =================================
Name: tavily_search_results_json
[{"url": "https://www.joespizzanyc.com/locations-1-1", "content": "Joe's Pizza Greenwich Village (Original Location) 7 Carmine Street New York, NY 10014 (212) 366-1182 Joe's Pizza Times Square 1435 Broadway New York, NY 10018 (646) 559-4878. TIMES SQUARE MENU. ORDER JOE'S TIMES SQUARE Joe's Pizza Williamsburg 216 Bedford Avenue Brooklyn, NY 11249"}]
Update from node: agent
================================== Ai Message ==================================
The address for Joe's Pizza in Greenwich Village is:
**7 Carmine Street, New York, NY 10014**
Enjoy your pizza!
If you pass a different user ID, the agent's response will not be personalized, because we have not saved any information about that other user.
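The isolation comes from the user_id filter in search_recall_memories. Here is a minimal sketch of that filter in plain Python; the `records` list and the `memories_for` helper are hypothetical and only mirror the shape of the saved documents.

```python
from typing import Dict, List

# Hypothetical records shaped like the documents written by save_recall_memory.
records: List[Dict[str, str]] = [
    {"page_content": "User's name is John.", "user_id": "1"},
    {"page_content": "John loves pizza.", "user_id": "1"},
    {"page_content": "John just moved to New York.", "user_id": "1"},
]


def memories_for(user_id: str) -> List[str]:
    """Return only the memories whose metadata matches `user_id`."""
    return [r["page_content"] for r in records if r["user_id"] == user_id]


print(memories_for("1"))  # all three memories saved for user "1"
print(memories_for("2"))  # [] because nothing was saved for user "2"
```

With the real graph, the equivalent experiment would be to stream a message using a config with another user ID, for example `{"configurable": {"user_id": "2", "thread_id": "3"}}` (illustrative values), and observe that load_memories returns an empty list.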
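Adding structured memories
So far we have represented memories as strings, e.g., "John loves pizza". This is a natural representation when persisting memories to a vector store. If your use case would benefit from other persistence backends, such as a graph database, we can update our application to generate memories with additional structure.
Below, we update the save_recall_memory tool to accept a list of "knowledge triples", or 3-tuples with a subject, predicate, and object, suitable for storage in a knowledge graph. Our model will then generate these representations as part of its tool calls.
For simplicity, we use the same vector database as before, but the save_recall_memory and search_recall_memories tools could be further updated to interact with a graph database. For now, we only need to update the save_recall_memory tool: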
recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
from typing_extensions import TypedDict
class KnowledgeTriple(TypedDict):
    subject: str
    predicate: str
    object_: str


@tool
def save_recall_memory(memories: List[KnowledgeTriple], config: RunnableConfig) -> str:
    """Save memory to vectorstore for later semantic retrieval."""
    user_id = get_user_id(config)
    for memory in memories:
        serialized = " ".join(memory.values())
        document = Document(
            serialized,
            id=str(uuid.uuid4()),
            metadata={
                "user_id": user_id,
                **memory,
            },
        )
        recall_vector_store.add_documents([document])
    return memories
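To see what one triple turns into before it is embedded, the serialization step can be tried in isolation. This is a small sketch that assumes the same `KnowledgeTriple` shape as above; the `to_record` helper is hypothetical and skips the vector store entirely.

```python
from typing import Dict, Tuple, TypedDict


class KnowledgeTriple(TypedDict):
    subject: str
    predicate: str
    object_: str


def to_record(triple: KnowledgeTriple, user_id: str) -> Tuple[str, Dict[str, str]]:
    """Return the text that would be embedded and the metadata stored beside it."""
    text = " ".join(triple.values())  # dicts keep insertion order
    metadata = {"user_id": user_id, **triple}
    return text, metadata


text, metadata = to_record(
    {"subject": "John", "predicate": "likes", "object_": "Pizza"}, user_id="3"
)
print(text)      # John likes Pizza
print(metadata)
```

The joined string is what similarity search matches against, while the individual fields stay available in the metadata for anything that needs the structure.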
We can then compile the graph exactly as before:
tools = [save_recall_memory, search_recall_memories, search]
model_with_tools = model.bind_tools(tools)
# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))
# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")
# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"user_id": "3", "thread_id": "1"}}
for chunk in graph.stream({"messages": [("user", "Hi, I'm Alice.")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
================================== Ai Message ==================================
Hello, Alice! How can I assist you today?
Note that the application chooses to extract knowledge triples from the user's statements:
for chunk in graph.stream(
    {"messages": [("user", "My friend John likes Pizza.")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}
Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_EQSZlvZLZpPa0OGS5Kyzy2Yz)
Call ID: call_EQSZlvZLZpPa0OGS5Kyzy2Yz
Args:
memories: [{'subject': 'Alice', 'predicate': 'has a friend', 'object_': 'John'}, {'subject': 'John', 'predicate': 'likes', 'object_': 'Pizza'}]
Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory
[{"subject": "Alice", "predicate": "has a friend", "object_": "John"}, {"subject": "John", "predicate": "likes", "object_": "Pizza"}]
Update from node: agent
================================== Ai Message ==================================
Got it! If you need any suggestions related to pizza or anything else, feel free to ask. What else is on your mind today?
As before, the memories generated in one thread can be accessed from another thread of the same user:
config = {"configurable": {"user_id": "3", "thread_id": "2"}}
for chunk in graph.stream(
    {"messages": [("user", "What food should I bring to John's party?")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John likes Pizza', 'Alice has a friend John']}
Update from node: agent
================================== Ai Message ==================================
Since John likes pizza, bringing some delicious pizza would be a great choice for the party. You might also consider asking if there are any specific toppings he prefers or if there are any dietary restrictions among the guests. This way, you can ensure everyone enjoys the food!
Optionally, for illustrative purposes, we can visualize the knowledge graph extracted by the model:
%pip install -U --quiet matplotlib networkx
import matplotlib.pyplot as plt
import networkx as nx
# Fetch records
records = recall_vector_store.similarity_search(
"Alice", k=2, filter=lambda doc: doc.metadata["user_id"] == "3"
)
# Plot graph
plt.figure(figsize=(6, 4), dpi=80)
G = nx.DiGraph()
for record in records:
    G.add_edge(
        record.metadata["subject"],
        record.metadata["object_"],
        label=record.metadata["predicate"],
    )

pos = nx.spring_layout(G)
nx.draw(
    G,
    pos,
    with_labels=True,
    node_size=3000,
    node_color="lightblue",
    font_size=10,
    font_weight="bold",
    arrows=True,
)
edge_labels = nx.get_edge_attributes(G, "label")
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color="red")
plt.show()
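Beyond plotting, the triple structure also allows multi-hop lookups that plain strings make awkward. The following is a rough sketch over the two triples extracted above, using a plain adjacency mapping instead of a graph database; the `adjacency` mapping and the `neighbors` helper are hypothetical names introduced for illustration.

```python
from collections import defaultdict
from typing import DefaultDict, List, Tuple

# The two triples the model extracted in the example above.
triples = [
    ("Alice", "has a friend", "John"),
    ("John", "likes", "Pizza"),
]

# subject -> list of (predicate, object) edges
adjacency: DefaultDict[str, List[Tuple[str, str]]] = defaultdict(list)
for subject, predicate, obj in triples:
    adjacency[subject].append((predicate, obj))


def neighbors(subject: str, predicate: str) -> List[str]:
    """Objects reachable from `subject` through edges labeled `predicate`."""
    return [obj for pred, obj in adjacency.get(subject, []) if pred == predicate]


# Two-hop question: what do Alice's friends like?
friends = neighbors("Alice", "has a friend")
friend_likes = [food for friend in friends for food in neighbors(friend, "likes")]
print(friends)       # ['John']
print(friend_likes)  # ['Pizza']
```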