
A Long-Term Memory Agent

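This tutorial shows how to implement an agent with long-term memory using LangGraph. The agent can store, retrieve, and use memories to improve its interactions with users.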

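The graph is inspired by papers such as MemGPT and distilled from our own work on long-term memory. It extracts memories from chat interactions and persists them to a database. "Memory" in this tutorial is represented in two ways: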

  • a piece of text generated by the agent
  • structured information about entities extracted by the agent, in the shape of (subject, predicate, object) knowledge triples.

This information can later be read or queried semantically to provide personalized context when the bot responds to a particular user.

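The key idea is that, by saving memories, the agent retains information about a user that is shared across multiple conversations (threads). This differs from the memory of a single conversation, which LangGraph's persistence already provides.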
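The distinction can be sketched with two plain dictionaries. This is an illustration only, not LangGraph's API: thread_history, user_memories, and remember are hypothetical names. Conversation history is keyed by thread, while memories are keyed by user, so a new thread for the same user starts with no history but can still see earlier memories.

```python
from collections import defaultdict

# Hypothetical stand-ins: per-thread chat history vs. per-user long-term memory.
thread_history = defaultdict(list)  # keyed by (user_id, thread_id)
user_memories = defaultdict(list)   # keyed by user_id only


def remember(user_id: str, thread_id: str, message: str, memory: str) -> None:
    """Record a message in one thread and a memory for the user as a whole."""
    thread_history[(user_id, thread_id)].append(message)
    user_memories[user_id].append(memory)


remember("1", "1", "my name is John", "User's name is John.")

# A second thread for the same user: no history yet, but the memory is visible.
new_thread_history = thread_history[("1", "2")]
shared_memories = user_memories["1"]
print(new_thread_history, shared_memories)
```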

memory_graph.png

You can also check out a full implementation of this agent in this repo.

Install dependencies

%pip install -U --quiet langgraph langchain-openai langchain-community tiktoken
import getpass
import os


def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}: ")


_set_env("OPENAI_API_KEY")
_set_env("TAVILY_API_KEY")
OPENAI_API_KEY:  ········
TAVILY_API_KEY: ········
import json
from typing import List, Literal, Optional

import tiktoken
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.messages import get_buffer_string
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_openai import ChatOpenAI
from langchain_openai.embeddings import OpenAIEmbeddings
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode

Define the vectorstore for memories

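First, let's define the vectorstore where we will store our memories. Memories are stored as embeddings and later looked up based on the conversation context. We will use an in-memory vectorstore.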
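To make "stored as embeddings and looked up by context" concrete, here is a toy sketch that uses only the standard library. It is not how InMemoryVectorStore or OpenAIEmbeddings are implemented: the word-count "embedding" and the ToyVectorStore class are assumptions made for illustration. The point is only that each memory becomes a vector and a query returns the memories whose vectors are most similar.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (a stand-in for a real model)."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[word] * b[word] for word in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


class ToyVectorStore:
    """Keeps (text, vector) pairs in a list and ranks them by similarity."""

    def __init__(self) -> None:
        self._items = []

    def add(self, text: str) -> None:
        self._items.append((text, embed(text)))

    def search(self, query: str, k: int = 3) -> list:
        query_vec = embed(query)
        ranked = sorted(
            self._items, key=lambda item: cosine(query_vec, item[1]), reverse=True
        )
        return [text for text, _ in ranked[:k]]


store = ToyVectorStore()
store.add("john loves pizza")
store.add("john moved to new york")
top = store.search("what pizza does john like", k=1)
print(top)
```

A real embedding model captures meaning rather than exact word overlap, so it can match "dinner" to a memory about pizza, which this toy version cannot.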

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())

Define tools

Next, let's define our memory tools. We need one tool to store memories and another to search them for the most relevant ones.

import uuid


def get_user_id(config: RunnableConfig) -> str:
    user_id = config["configurable"].get("user_id")
    if user_id is None:
        raise ValueError("User ID needs to be provided to save a memory.")

    return user_id


@tool
def save_recall_memory(memory: str, config: RunnableConfig) -> str:
    """Save memory to vectorstore for later semantic retrieval."""
    user_id = get_user_id(config)
    document = Document(
        page_content=memory, id=str(uuid.uuid4()), metadata={"user_id": user_id}
    )
    recall_vector_store.add_documents([document])
    return memory


@tool
def search_recall_memories(query: str, config: RunnableConfig) -> List[str]:
    """Search for relevant memories."""
    user_id = get_user_id(config)

    def _filter_function(doc: Document) -> bool:
        return doc.metadata.get("user_id") == user_id

    documents = recall_vector_store.similarity_search(
        query, k=3, filter=_filter_function
    )
    return [document.page_content for document in documents]

Additionally, let's give our agent the ability to search the web using Tavily.

search = TavilySearchResults(max_results=1)
tools = [save_recall_memory, search_recall_memories, search]

Define state, nodes and edges

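Our graph state contains just two channels: messages, which keeps track of the chat history, and recall_memories, the contextual memories that are retrieved before the agent is called and passed into the agent's system prompt.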
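The two channels are updated differently, which a simplified model can show. The sketch below is an assumption-laden stand-in for LangGraph's state handling, and apply_update is a hypothetical helper: updates to messages are appended to the existing list, while recall_memories is simply overwritten by the latest value. The real message reducer also replaces messages with matching IDs, which is ignored here.

```python
def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into the state (simplified model)."""
    new_state = dict(state)
    for key, value in update.items():
        if key == "messages":
            # Appended, mimicking a message-list reducer.
            new_state["messages"] = state.get("messages", []) + value
        else:
            # Any other channel is overwritten by the latest value.
            new_state[key] = value
    return new_state


state = {"messages": ["user: my name is John"], "recall_memories": []}
# load_memories returns only the channel it changes.
state = apply_update(state, {"recall_memories": ["User's name is John."]})
# agent returns only its new message.
state = apply_update(state, {"messages": ["ai: Nice to meet you, John!"]})
print(state)
```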

class State(MessagesState):
    # add memories that will be retrieved based on the conversation context
    recall_memories: List[str]


# Define the prompt template for the agent
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant with advanced long-term memory"
            " capabilities. Powered by a stateless LLM, you must rely on"
            " external memory to store information between conversations."
            " Utilize the available memory tools to store and retrieve"
            " important details that will help you better attend to the user's"
            " needs and understand their context.\n\n"
            "Memory Usage Guidelines:\n"
            "1. Actively use memory tools (save_recall_memory,"
            " search_recall_memories) to build a comprehensive understanding"
            " of the user.\n"
            "2. Make informed suppositions and extrapolations based on stored"
            " memories.\n"
            "3. Regularly reflect on past interactions to identify patterns and"
            " preferences.\n"
            "4. Update your mental model of the user with each new piece of"
            " information.\n"
            "5. Cross-reference new information with existing memories for"
            " consistency.\n"
            "6. Prioritize storing emotional context and personal values"
            " alongside facts.\n"
            "7. Use memory to anticipate needs and tailor responses to the"
            " user's style.\n"
            "8. Recognize and acknowledge changes in the user's situation or"
            " perspectives over time.\n"
            "9. Leverage memories to provide personalized examples and"
            " analogies.\n"
            "10. Recall past challenges or successes to inform current"
            " problem-solving.\n\n"
            "## Recall Memories\n"
            "Recall memories are contextually retrieved based on the current"
            " conversation:\n{recall_memories}\n\n"
            "## Instructions\n"
            "Engage with the user naturally, as a trusted colleague or friend."
            " There's no need to explicitly mention your memory capabilities."
            " Instead, seamlessly incorporate your understanding of the user"
            " into your responses. Be attentive to subtle cues and underlying"
            " emotions. Adapt your communication style to match the user's"
            " preferences and current emotional state. Use tools to persist"
            " information you want to retain in the next conversation. If you"
            " do call tools, all text preceding the tool call is an internal"
            " message. Respond AFTER calling the tool, once you have"
            " confirmation that the tool completed successfully.\n\n",
        ),
        ("placeholder", "{messages}"),
    ]
)
model = ChatOpenAI(model_name="gpt-4o")
model_with_tools = model.bind_tools(tools)

tokenizer = tiktoken.encoding_for_model("gpt-4o")


def agent(state: State) -> State:
    """Process the current state and generate a response using the LLM.

    Args:
        state (State): The current state of the conversation.

    Returns:
        State: The updated state with the agent's response.
    """
    bound = prompt | model_with_tools
    # Wrap the retrieved memories in tags so the model can tell them apart
    # from the rest of the system prompt.
    recall_str = (
        "<recall_memory>\n" + "\n".join(state["recall_memories"]) + "\n</recall_memory>"
    )
    prediction = bound.invoke(
        {
            "messages": state["messages"],
            "recall_memories": recall_str,
        }
    )
    return {
        "messages": [prediction],
    }


def load_memories(state: State, config: RunnableConfig) -> State:
    """Load memories for the current conversation.

    Args:
        state (State): The current state of the conversation.
        config (RunnableConfig): The runtime configuration for the agent.

    Returns:
        State: The updated state with loaded memories.
    """
    convo_str = get_buffer_string(state["messages"])
    # Cap the search query at the first 2048 tokens of the conversation.
    convo_str = tokenizer.decode(tokenizer.encode(convo_str)[:2048])
    recall_memories = search_recall_memories.invoke(convo_str, config)
    return {
        "recall_memories": recall_memories,
    }


def route_tools(state: State):
    """Determine whether to use tools or end the conversation based on the last message.

    Args:
        state (State): The current state of the conversation.

    Returns:
        Literal["tools", "__end__"]: The next step in the graph.
    """
    msg = state["messages"][-1]
    if msg.tool_calls:
        return "tools"

    return END
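
The truncation step in load_memories can be illustrated without tiktoken. In the sketch below, whitespace-separated words stand in for model tokens, and truncate_tokens is a hypothetical helper rather than part of the tutorial's code. It mirrors the encode, slice, decode pattern: the slice keeps the first tokens of the rendered conversation, so with a very long history the most recent turns are the ones dropped from the search query.

```python
def truncate_tokens(text: str, max_tokens: int) -> str:
    """Keep only the first max_tokens whitespace-separated tokens of text."""
    tokens = text.split()                   # stand-in for tokenizer.encode
    return " ".join(tokens[:max_tokens])    # stand-in for tokenizer.decode


conversation = "Human: my name is John AI: Nice to meet you"
query = truncate_tokens(conversation, 4)
print(query)
```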

Build the graph

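Our agent graph is very similar to a simple ReAct agent. The only important modification is a node that loads memories before the agent is called for the first time.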
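The control flow this describes can be sketched in plain Python before wiring it up with LangGraph. Everything here is hypothetical scaffolding (run_graph and the fake_* stubs are not part of the tutorial): memories are loaded once, then the agent and the tools alternate until the agent replies without requesting a tool.

```python
def run_graph(user_message, load_memories, agent, run_tools):
    """Drive the load_memories -> agent <-> tools loop with plain functions."""
    state = {"messages": [user_message], "recall_memories": []}
    visited = ["load_memories"]
    state["recall_memories"] = load_memories(state)
    while True:
        visited.append("agent")
        reply = agent(state)
        state["messages"].append(reply)
        if not reply.get("tool_calls"):
            return state, visited  # no tool requested: the run ends
        visited.append("tools")
        state["messages"].extend(run_tools(reply["tool_calls"]))


def fake_load_memories(state):
    return []  # nothing saved yet


def fake_agent(state):
    # Request a tool on the first call; answer once a tool result exists.
    if any(m.get("role") == "tool" for m in state["messages"]):
        return {"role": "ai", "content": "Nice to meet you, John!"}
    return {"role": "ai", "content": "", "tool_calls": [{"name": "save_recall_memory"}]}


def fake_run_tools(tool_calls):
    return [{"role": "tool", "content": f"ran {call['name']}"} for call in tool_calls]


final_state, visited = run_graph(
    {"role": "user", "content": "my name is John"},
    fake_load_memories,
    fake_agent,
    fake_run_tools,
)
print(visited)
```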

# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))

# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")

# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
from IPython.display import Image, display

display(Image(graph.get_graph().draw_mermaid_png()))

Run the agent!

Let's run the agent for the first time and tell it some information about the user!

def pretty_print_stream_chunk(chunk):
    for node, updates in chunk.items():
        print(f"Update from node: {node}")
        if "messages" in updates:
            updates["messages"][-1].pretty_print()
        else:
            print(updates)

        print("\n")


# NOTE: we're specifying `user_id` to save memories for a given user
config = {"configurable": {"user_id": "1", "thread_id": "1"}}

for chunk in graph.stream({"messages": [("user", "my name is John")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_OqfbWodmrywjMnB1v3p19QLt)
Call ID: call_OqfbWodmrywjMnB1v3p19QLt
Args:
memory: User's name is John.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

User's name is John.


Update from node: agent
================================== Ai Message ==================================

Nice to meet you, John! How can I assist you today?

You can see that the agent saved a memory about the user's name. Let's add some more information about the user!

for chunk in graph.stream({"messages": [("user", "i love pizza")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_xxEivMuWCURJrGxMZb02Eh31)
Call ID: call_xxEivMuWCURJrGxMZb02Eh31
Args:
memory: John loves pizza.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John loves pizza.


Update from node: agent
================================== Ai Message ==================================

Pizza is amazing! Do you have a favorite type or topping?
for chunk in graph.stream(
    {"messages": [("user", "yes -- pepperoni!")]},
    config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.']}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_AFrtCVwIEr48Fim80zlhe6xg)
Call ID: call_AFrtCVwIEr48Fim80zlhe6xg
Args:
memory: John's favorite pizza topping is pepperoni.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John's favorite pizza topping is pepperoni.


Update from node: agent
================================== Ai Message ==================================

Pepperoni is a classic choice! Do you have a favorite pizza place, or do you enjoy making it at home?
for chunk in graph.stream(
    {"messages": [("user", "i also just moved to new york")]},
    config={"configurable": {"user_id": "1", "thread_id": "1"}},
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ["User's name is John.", 'John loves pizza.', "John's favorite pizza topping is pepperoni."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_Na86uY9eBzaJ0sS0GM4Z9tSf)
Call ID: call_Na86uY9eBzaJ0sS0GM4Z9tSf
Args:
memory: John just moved to New York.


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

John just moved to New York.


Update from node: agent
================================== Ai Message ==================================

Welcome to New York! That's a fantastic place for a pizza lover. Have you had a chance to explore any of the famous pizzerias there yet?

Now we can use the saved information about our user in a different thread. Let's try it out:

config = {"configurable": {"user_id": "1", "thread_id": "2"}}

for chunk in graph.stream(
    {"messages": [("user", "where should i go for dinner?")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', "User's name is John.", 'John just moved to New York.']}


Update from node: agent
================================== Ai Message ==================================

Considering you just moved to New York and love pizza, I'd recommend checking out some of the iconic pizza places in the city. Some popular spots include:

1. **Di Fara Pizza** in Brooklyn – Known for its classic New York-style pizza.
2. **Joe's Pizza** in Greenwich Village – A historic pizzeria with a great reputation.
3. **Lucali** in Carroll Gardens, Brooklyn – Often ranked among the best for its delicious thin-crust pies.

Would you like more recommendations or information about any of these places?

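Notice how the agent loads the most relevant memories before answering. In our case, it bases its dinner recommendations on both the user's food preferences and location.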

Finally, let's use the search tool together with the rest of the conversation context and memory to find the pizzeria's address:

for chunk in graph.stream(
    {"messages": [("user", "what's the address for joe's in greenwich village?")]},
    config=config,
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John loves pizza.', 'John just moved to New York.', "John's favorite pizza topping is pepperoni."]}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
tavily_search_results_json (call_aespiB28jpTFvaC4d0qpfY6t)
Call ID: call_aespiB28jpTFvaC4d0qpfY6t
Args:
query: Joe's Pizza Greenwich Village NYC address


Update from node: tools
================================= Tool Message =================================
Name: tavily_search_results_json

[{"url": "https://www.joespizzanyc.com/locations-1-1", "content": "Joe's Pizza Greenwich Village (Original Location) 7 Carmine Street New York, NY 10014 (212) 366-1182 Joe's Pizza Times Square 1435 Broadway New York, NY 10018 (646) 559-4878. TIMES SQUARE MENU. ORDER JOE'S TIMES SQUARE Joe's Pizza Williamsburg 216 Bedford Avenue Brooklyn, NY 11249"}]


Update from node: agent
================================== Ai Message ==================================

The address for Joe's Pizza in Greenwich Village is:

**7 Carmine Street, New York, NY 10014**

Enjoy your pizza!

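If you were to pass a different user ID, the agent's response would not be personalized, because we have not saved any information about other users.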
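The reason is the user_id filter applied during memory search. A minimal stand-in, assuming memories are plain dictionaries rather than Document objects and using a hypothetical memories_for helper, shows that a user with no saved memories gets an empty result, so nothing personal reaches the system prompt.

```python
# Hypothetical in-memory records; the tutorial stores these as Documents.
memories = [
    {"text": "User's name is John.", "user_id": "1"},
    {"text": "John loves pizza.", "user_id": "1"},
]


def memories_for(user_id: str) -> list:
    """Return only the memory texts whose metadata matches the given user."""
    return [m["text"] for m in memories if m["user_id"] == user_id]


known_user = memories_for("1")
other_user = memories_for("2")
print(known_user, other_user)
```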

Adding structured memories

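So far, we have represented memories as strings, e.g., "John loves pizza". This is a natural representation when persisting memories to a vector store. If your use case would benefit from other persistence backends, such as a graph database, we can update our application to generate memories with additional structure.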

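Below, we update the save_recall_memory tool to accept a list of "knowledge triples", or 3-tuples with a subject, predicate, and object, suitable for storage in a knowledge graph. Our model will then generate these representations as part of its tool calls.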

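For simplicity, we use the same vector database as before, but the save_recall_memory and search_recall_memories tools could be further updated to interact with a graph database. For now, we only need to update the save_recall_memory tool: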

recall_vector_store = InMemoryVectorStore(OpenAIEmbeddings())
from typing_extensions import TypedDict


class KnowledgeTriple(TypedDict):
    subject: str
    predicate: str
    object_: str


@tool
def save_recall_memory(
    memories: List[KnowledgeTriple], config: RunnableConfig
) -> List[KnowledgeTriple]:
    """Save memory to vectorstore for later semantic retrieval."""
    user_id = get_user_id(config)
    for memory in memories:
        # Flatten the triple into text for embedding; keep its fields as metadata.
        serialized = " ".join(memory.values())
        document = Document(
            serialized,
            id=str(uuid.uuid4()),
            metadata={
                "user_id": user_id,
                **memory,
            },
        )
        recall_vector_store.add_documents([document])
    return memories
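
The serialization step can be examined in isolation. The sketch below uses only the standard library; Triple and to_record are hypothetical names, and the returned dictionary merely mimics the page_content and metadata fields of a Document. The text form is what gets embedded and searched, while the metadata preserves the individual fields so that a graph edge can be rebuilt later.

```python
from typing import TypedDict


class Triple(TypedDict):
    subject: str
    predicate: str
    object_: str


def to_record(triple: Triple, user_id: str) -> dict:
    """Flatten a triple into searchable text plus metadata that keeps its fields."""
    return {
        # Joined in the dict's insertion order: subject, predicate, object_.
        "page_content": " ".join(triple.values()),
        "metadata": {"user_id": user_id, **triple},
    }


record = to_record({"subject": "John", "predicate": "likes", "object_": "Pizza"}, "3")
meta = record["metadata"]
edge = (meta["subject"], meta["predicate"], meta["object_"])
print(record["page_content"], edge)
```

Because the text is built from the values in insertion order, a triple whose keys arrive in a different order would serialize differently; reading the fields by name, as done for edge, avoids that dependency.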

We can then compile the graph exactly as before:

tools = [save_recall_memory, search_recall_memories, search]
model_with_tools = model.bind_tools(tools)


# Create the graph and add nodes
builder = StateGraph(State)
builder.add_node(load_memories)
builder.add_node(agent)
builder.add_node("tools", ToolNode(tools))

# Add edges to the graph
builder.add_edge(START, "load_memories")
builder.add_edge("load_memories", "agent")
builder.add_conditional_edges("agent", route_tools, ["tools", END])
builder.add_edge("tools", "agent")

# Compile the graph
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)
config = {"configurable": {"user_id": "3", "thread_id": "1"}}

for chunk in graph.stream({"messages": [("user", "Hi, I'm Alice.")]}, config=config):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================

Hello, Alice! How can I assist you today?

Note that the application elects to extract knowledge triples from the user's statements:

for chunk in graph.stream(
    {"messages": [("user", "My friend John likes Pizza.")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': []}


Update from node: agent
================================== Ai Message ==================================
Tool Calls:
save_recall_memory (call_EQSZlvZLZpPa0OGS5Kyzy2Yz)
Call ID: call_EQSZlvZLZpPa0OGS5Kyzy2Yz
Args:
memories: [{'subject': 'Alice', 'predicate': 'has a friend', 'object_': 'John'}, {'subject': 'John', 'predicate': 'likes', 'object_': 'Pizza'}]


Update from node: tools
================================= Tool Message =================================
Name: save_recall_memory

[{"subject": "Alice", "predicate": "has a friend", "object_": "John"}, {"subject": "John", "predicate": "likes", "object_": "Pizza"}]


Update from node: agent
================================== Ai Message ==================================

Got it! If you need any suggestions related to pizza or anything else, feel free to ask. What else is on your mind today?

As before, memories generated in one thread can be accessed in another thread by the same user:

config = {"configurable": {"user_id": "3", "thread_id": "2"}}

for chunk in graph.stream(
    {"messages": [("user", "What food should I bring to John's party?")]}, config=config
):
    pretty_print_stream_chunk(chunk)
Update from node: load_memories
{'recall_memories': ['John likes Pizza', 'Alice has a friend John']}


Update from node: agent
================================== Ai Message ==================================

Since John likes pizza, bringing some delicious pizza would be a great choice for the party. You might also consider asking if there are any specific toppings he prefers or if there are any dietary restrictions among the guests. This way, you can ensure everyone enjoys the food!

Optionally, for illustrative purposes, we can visualize the knowledge graph extracted by the model:

%pip install -U --quiet matplotlib networkx
import matplotlib.pyplot as plt
import networkx as nx

# Fetch records
records = recall_vector_store.similarity_search(
    "Alice", k=2, filter=lambda doc: doc.metadata["user_id"] == "3"
)


# Plot graph
plt.figure(figsize=(6, 4), dpi=80)
G = nx.DiGraph()

for record in records:
    G.add_edge(
        record.metadata["subject"],
        record.metadata["object_"],
        label=record.metadata["predicate"],
    )

pos = nx.spring_layout(G)
nx.draw(
    G,
    pos,
    with_labels=True,
    node_size=3000,
    node_color="lightblue",
    font_size=10,
    font_weight="bold",
    arrows=True,
)
edge_labels = nx.get_edge_attributes(G, "label")
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels, font_color="red")
plt.show()