Skip to main content
Open In Colab在 GitHub 上打开

卡 夫 卡

Kafka 是一种分布式消息传递系统,用于发布和订阅记录流。 此演示展示了如何使用KafkaChatMessageHistory来存储和检索来自 Kafka 集群的聊天消息。

运行演示需要一个正在运行的 Kafka 集群。您可以按照此说明在本地创建 Kafka 集群。

from langchain_community.chat_message_histories import KafkaChatMessageHistory

chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)

要构造的可选参数KafkaChatMessageHistory:

  • ttl_ms:聊天消息的生存时间(以毫秒为单位)。
  • partition:用于存储聊天消息的主题的分区编号。
  • replication_factor:用于存储聊天消息的主题的复制因子。

KafkaChatMessageHistory内部使用 Kafka consumer 读取聊天消息,并且具有持久标记消费位置的能力。它有以下方法来检索聊天消息:

  • messages:继续使用上一个聊天消息。
  • messages_from_beginning:将 Consumer 重置为历史记录的开头并消费消息。可选参数:
    1. max_message_count:要阅读的最大消息数。
    2. max_time_sec:读取消息的最长时间(以秒为单位)。
  • messages_from_latest:将 Consumer 重置为聊天记录的末尾,并尝试消费消息。可选参数同上。
  • messages_from_last_consumed:返回从最后使用的消息继续的消息,类似于messages,但带有可选参数。

max_message_countmax_time_sec用于避免在检索消息时无限期阻塞。 因此,messages等检索消息的方法可能无法返回聊天记录中的所有消息。您需要指定max_message_countmax_time_sec以批量检索所有聊天记录。

添加消息并检索。

history.add_user_message("hi!")
history.add_ai_message("whats up?")

history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]

messages再次返回一个空列表,因为使用者位于聊天历史记录的末尾。

history.messages
[]

添加新消息并继续消费。

history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]

要重置使用者并从头开始读取:

history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]

将 consumer 设置为聊天记录的末尾,添加几条新消息,然后使用:

history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]