卡 夫 卡
Kafka 是一种分布式消息传递系统,用于发布和订阅记录流。
此演示展示了如何使用KafkaChatMessageHistory来存储和检索来自 Kafka 集群的聊天消息。
运行演示需要一个正在运行的 Kafka 集群。您可以按照此说明在本地创建 Kafka 集群。
from langchain_community.chat_message_histories import KafkaChatMessageHistory
chat_session_id = "chat-message-history-kafka"
bootstrap_servers = "localhost:64797" # host:port. `localhost:Plaintext Ports` if setup Kafka cluster locally
history = KafkaChatMessageHistory(
chat_session_id,
bootstrap_servers,
)
API 参考:KafkaChatMessageHistory
要构造的可选参数KafkaChatMessageHistory:
ttl_ms:聊天消息的生存时间(以毫秒为单位)。partition:用于存储聊天消息的主题的分区编号。replication_factor:用于存储聊天消息的主题的复制因子。
KafkaChatMessageHistory内部使用 Kafka consumer 读取聊天消息,并且具有持久标记消费位置的能力。它有以下方法来检索聊天消息:
messages:继续使用上一个聊天消息。messages_from_beginning:将 Consumer 重置为历史记录的开头并消费消息。可选参数:max_message_count:要阅读的最大消息数。max_time_sec:读取消息的最长时间(以秒为单位)。
messages_from_latest:将 Consumer 重置为聊天记录的末尾,并尝试消费消息。可选参数同上。messages_from_last_consumed:返回从最后使用的消息继续的消息,类似于messages,但带有可选参数。
max_message_count和max_time_sec用于避免在检索消息时无限期阻塞。
因此,messages等检索消息的方法可能无法返回聊天记录中的所有消息。您需要指定max_message_count和max_time_sec以批量检索所有聊天记录。
添加消息并检索。
history.add_user_message("hi!")
history.add_ai_message("whats up?")
history.messages
[HumanMessage(content='hi!'), AIMessage(content='whats up?')]
叫messages再次返回一个空列表,因为使用者位于聊天历史记录的末尾。
history.messages
[]
添加新消息并继续消费。
history.add_user_message("hi again!")
history.add_ai_message("whats up again?")
history.messages
[HumanMessage(content='hi again!'), AIMessage(content='whats up again?')]
要重置使用者并从头开始读取:
history.messages_from_beginning()
[HumanMessage(content='hi again!'),
AIMessage(content='whats up again?'),
HumanMessage(content='hi!'),
AIMessage(content='whats up?')]
将 consumer 设置为聊天记录的末尾,添加几条新消息,然后使用:
history.messages_from_latest()
history.add_user_message("HI!")
history.add_ai_message("WHATS UP?")
history.messages
[HumanMessage(content='HI!'), AIMessage(content='WHATS UP?')]