Cassandra 数据库工具包
Apache Cassandra®是广泛使用的数据库,用于存储事务应用程序的数据。大型语言模型中的函数和工具的引入为现有数据在生成式AI应用中的使用打开了许多令人兴奋的应用场景。
The
Cassandra Database工具包使AI工程师能够高效地将代理与Cassandra数据集成,提供以下功能:
- 快速数据访问,通过优化查询实现。大多数查询应在单个毫秒或更短的时间内完成。
- Schema introspection 以增强大语言模型的推理能力
- 兼容各种Cassandra部署,包括Apache Cassandra®、DataStax Enterprise™和DataStax Astra™
- 当前开发工具包仅支持SELECT查询和模式内省操作。(安全第一)
对于创建Cassandra数据库代理的更多信息,请参见CQL代理烹饪书
快速开始
- 安装
cassio库 - 设置与您连接的Cassandra数据库相关的环境变量
- 初始化
CassandraDatabase - 将工具传递给您的代理,请使用
toolkit.get_tools() - 坐下来放松,看着它为你完成所有工作
理论运行原理
Cassandra Query Language (CQL) 是与 Cassandra 数据库交互的主要 以人为本 方式。虽然在生成查询时提供了一定的灵活性,但它需要了解 Cassandra 数据建模的最佳实践知识。通过 LLM 函数调用,代理能够推理并选择合适的工具来满足请求。使用 LLM 的代理应在选择合适的工具或工具链时采用特定于 Cassandra 的逻辑。这减少了当 LLM 被强制提供自顶向下的解决方案时引入的随机性。您希望让一个 LLM 完全无限制地访问您的数据库吗?是的。可能不会。为此,我们提供了在构建问题提示时使用的提示:
您是一位Apache Cassandra专家查询分析机器人,具备以下功能和规则:
- 您将从最终用户那里获取一个问题,用于在数据库中查找特定数据。
- 您将检查数据库模式并创建查询路径。
- 您将为用户提供正确的查询以找到他们正在寻找的数据,并展示由查询路径提供的步骤。
- 您将使用最佳实践来查询Apache Cassandra,利用分区键和聚簇列。
- 避免在查询中使用ALLOW FILTERING。
- 目标是找到一个查询路径,因此可能需要查询其他表才能得到最终答案。
The following is an example of a query path in JSON format:
{
"query_paths": [
{
"description": "Direct query to users table using email",
"steps": [
{
"table": "user_credentials",
"query":
"SELECT userid FROM user_credentials WHERE email = 'example@example.com';"
},
{
"table": "users",
"query": "SELECT * FROM users WHERE userid = ?;"
}
]
}
]
}
工具提供
cassandra_db_schema
连接的数据库或特定模式的所有架构信息汇总。对于代理确定行动至关重要。
cassandra_db_select_table_data
从特定的键空间和表中选择数据。代理可以传递谓词参数以及返回记录的数量限制。
cassandra_db_query
实验性的替代方案cassandra_db_select_table_data,它完全由代理生成查询字符串而不是参数。警告: 这可能会导致一些不寻常的查询,这些查询可能不会表现良好(甚至无法工作)。这在未来版本中可能会被移除。如果它真的做了一些酷的事情,我们也想了解。你永远不知道!
环境设置
安装以下Python模块:
pip install ipykernel python-dotenv cassio langchain_openai langchain langchain-community langchainhub
.env文件
连接通过使用 cassio 参数的 auto=True 进行,笔记本使用 OpenAI。你应该相应地创建一个 .env 文件。
对 Cassandra,请设置如下:
CASSANDRA_CONTACT_POINTS
CASSANDRA_USERNAME
CASSANDRA_PASSWORD
CASSANDRA_KEYSPACE
对于 Astra,设置:
ASTRA_DB_APPLICATION_TOKEN
ASTRA_DB_DATABASE_ID
ASTRA_DB_KEYSPACE
例如:
# Connection to Astra:
ASTRA_DB_DATABASE_ID=a1b2c3d4-...
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
ASTRA_DB_KEYSPACE=notebooks
# Also set
OPENAI_API_KEY=sk-....
(您也可以修改以下代码以直接连接到cassio)
from dotenv import load_dotenv
load_dotenv(override=True)
# Import necessary libraries
import os
import cassio
from langchain import hub
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.agent_toolkits.cassandra_database.toolkit import (
CassandraDatabaseToolkit,
)
from langchain_community.tools.cassandra_database.prompt import QUERY_PATH_PROMPT
from langchain_community.utilities.cassandra_database import CassandraDatabase
from langchain_openai import ChatOpenAI
连接到Cassandra数据库
cassio.init(auto=True)
session = cassio.config.resolve_session()
if not session:
raise Exception(
"Check environment configuration or manually configure cassio connection parameters"
)
# Test data pep
session = cassio.config.resolve_session()
session.execute("""DROP KEYSPACE IF EXISTS langchain_agent_test; """)
session.execute(
"""
CREATE KEYSPACE if not exists langchain_agent_test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_credentials (
user_email text PRIMARY KEY,
user_id UUID,
password TEXT
);
"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.users (
id UUID PRIMARY KEY,
name TEXT,
email TEXT
);"""
)
session.execute(
"""
CREATE TABLE IF NOT EXISTS langchain_agent_test.user_videos (
user_id UUID,
video_id UUID,
title TEXT,
description TEXT,
PRIMARY KEY (user_id, video_id)
);
"""
)
user_id = "522b1fe2-2e36-4cef-a667-cd4237d08b89"
video_id = "27066014-bad7-9f58-5a30-f63fe03718f6"
session.execute(
f"""
INSERT INTO langchain_agent_test.user_credentials (user_id, user_email)
VALUES ({user_id}, 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.users (id, name, email)
VALUES ({user_id}, 'Patrick McFadin', 'patrick@datastax.com');
"""
)
session.execute(
f"""
INSERT INTO langchain_agent_test.user_videos (user_id, video_id, title)
VALUES ({user_id}, {video_id}, 'Use Langflow to Build a LangChain LLM Application in 5 Minutes');
"""
)
session.set_keyspace("langchain_agent_test")
# Create a CassandraDatabase instance
# Uses the cassio session to connect to the database
db = CassandraDatabase()
# Choose the LLM that will drive the agent
# Only certain models support this
llm = ChatOpenAI(temperature=0, model="gpt-4-1106-preview")
toolkit = CassandraDatabaseToolkit(db=db)
tools = toolkit.get_tools()
print("Available tools:")
for tool in tools:
print(tool.name + "\t- " + tool.description)
Available tools:
cassandra_db_schema -
Input to this tool is a keyspace name, output is a table description
of Apache Cassandra tables.
If the query is not correct, an error message will be returned.
If an error is returned, report back to the user that the keyspace
doesn't exist and stop.
cassandra_db_query -
Execute a CQL query against the database and get back the result.
If the query is not correct, an error message will be returned.
If an error is returned, rewrite the query, check the query, and try again.
cassandra_db_select_table_data -
Tool for getting data from a table in an Apache Cassandra database.
Use the WHERE clause to specify the predicate for the query that uses the
primary key. A blank predicate will return all rows. Avoid this if possible.
Use the limit to specify the number of rows to return. A blank limit will
return all rows.
prompt = hub.pull("hwchase17/openai-tools-agent")
# Construct the OpenAI Tools agent
agent = create_openai_tools_agent(llm, tools, prompt)
input = (
QUERY_PATH_PROMPT
+ "\n\nHere is your task: Find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the langchain_agent_test keyspace."
)
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
response = agent_executor.invoke({"input": input})
print(response["output"])
[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `cassandra_db_schema` with `{'keyspace': 'langchain_agent_test'}`
[0m[36;1m[1;3mTable Name: user_credentials
- Keyspace: langchain_agent_test
- Columns
- password (text)
- user_email (text)
- user_id (uuid)
- Partition Keys: (user_email)
- Clustering Keys:
Table Name: user_videos
- Keyspace: langchain_agent_test
- Columns
- description (text)
- title (text)
- user_id (uuid)
- video_id (uuid)
- Partition Keys: (user_id)
- Clustering Keys: (video_id asc)
Table Name: users
- Keyspace: langchain_agent_test
- Columns
- email (text)
- id (uuid)
- name (text)
- Partition Keys: (id)
- Clustering Keys:
[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_credentials', 'predicate': "user_email = 'patrick@datastax.com'", 'limit': 1}`
[0m[38;5;200m[1;3mRow(user_email='patrick@datastax.com', password=None, user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'))[0m[32;1m[1;3m
Invoking: `cassandra_db_select_table_data` with `{'keyspace': 'langchain_agent_test', 'table': 'user_videos', 'predicate': 'user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89', 'limit': 10}`
[0m[38;5;200m[1;3mRow(user_id=UUID('522b1fe2-2e36-4cef-a667-cd4237d08b89'), video_id=UUID('27066014-bad7-9f58-5a30-f63fe03718f6'), description='DataStax Academy is a free resource for learning Apache Cassandra.', title='DataStax Academy')[0m[32;1m[1;3mTo find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.[0m
[1m> Finished chain.[0m
To find all the videos that the user with the email address 'patrick@datastax.com' has uploaded to the `langchain_agent_test` keyspace, we can follow these steps:
1. Query the `user_credentials` table to find the `user_id` associated with the email 'patrick@datastax.com'.
2. Use the `user_id` obtained from the first step to query the `user_videos` table to retrieve all the videos uploaded by the user.
Here is the query path in JSON format:
\`\`\`json
{
"query_paths": [
{
"description": "Find user_id from user_credentials and then query user_videos for all videos uploaded by the user",
"steps": [
{
"table": "user_credentials",
"query": "SELECT user_id FROM user_credentials WHERE user_email = 'patrick@datastax.com';"
},
{
"table": "user_videos",
"query": "SELECT * FROM user_videos WHERE user_id = 522b1fe2-2e36-4cef-a667-cd4237d08b89;"
}
]
}
]
}
\`\`\`
Following this query path, we found that the user with the user_id `522b1fe2-2e36-4cef-a667-cd4237d08b89` has uploaded at least one video with the title 'DataStax Academy' and the description 'DataStax Academy is a free resource for learning Apache Cassandra.' The video_id for this video is `27066014-bad7-9f58-5a30-f63fe03718f6`. If there are more videos, the same query can be used to retrieve them, possibly with an increased limit if necessary.