Skip to main content
Open In ColabOpen on GitHub

Apache Cassandra

此页面提供了将Apache Cassandra®用作向量存储的快速入门。

Cassandra 是一个 NoSQL、行导向的、高度可扩展和可用的数据库。从 5.0 版本开始,该数据库自带 向量搜索功能

注: 除了数据库访问权限,还需要一个OpenAI API密钥才能运行完整示例。

Setup and general dependencies

使用集成需要以下Python包。

%pip install --upgrade --quiet langchain-community "cassio>=0.1.4"

注意:根据您的LangChain配置,您可能需要安装/升级此演示所需的其他依赖项。 (具体而言,需要版本datasetsopenaipypdftiktoken的最新版本,以及langchain-community。)

import os
from getpass import getpass

from datasets import (
load_dataset,
)
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY = ")
embe = OpenAIEmbeddings()

导入向量存储

from langchain_community.vectorstores import Cassandra
API 参考:Cassandra

连接参数

此页面展示的向量存储集成可以与Cassandra以及其他使用CQL(Cassandra查询语言)协议的派生数据库(例如Astra DB)一起使用。

DataStax Astra DB 是一种基于 Cassandra 构建的托管无服务器数据库,提供相同的接口和优势。

根据您是通过Cassandra集群的CQL连接还是通过Astra DB连接,创建向量存储对象时将提供不同的参数。

连接到Cassandra集群

您首先需要创建一个cassandra.cluster.Session对象,如Cassandra驱动程序文档中所述。详细信息会有所不同(例如网络设置和身份验证),但这可能类似于:

from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect()

您现在可以将会话设置为全局 CassIO 参数,同时指定所需的键空间名称:

import cassio

CASSANDRA_KEYSPACE = input("CASSANDRA_KEYSPACE = ")

cassio.init(session=session, keyspace=CASSANDRA_KEYSPACE)

现在你可以创建向量存储:

vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)

Note: 您也可以在创建向量存储时直接传递会话和密钥空间作为参数。然而,使用全局 cassio.init 设置在您的应用程序以多种方式使用 Cassandra(例如,用于向量存储、聊天记忆和LLM响应缓存)时非常方便,因为它允许在一个地方集中管理凭证和数据库连接。

连接到Astra DB 通过 CQL

在本例中,您使用以下连接参数初始化 CassIO:

  • the Database ID, e.g. 01234567-89ab-cdef-0123-456789abcdef
  • 该Token,例如AstraCS:6gBhNmsk135....(它必须是“数据库管理员”令牌)
  • 可选地指定一个Keyspace名称(如果省略,则将使用数据库的默认Namespace)
ASTRA_DB_ID = input("ASTRA_DB_ID = ")
ASTRA_DB_APPLICATION_TOKEN = getpass("ASTRA_DB_APPLICATION_TOKEN = ")

desired_keyspace = input("ASTRA_DB_KEYSPACE (optional, can be left empty) = ")
if desired_keyspace:
ASTRA_DB_KEYSPACE = desired_keyspace
else:
ASTRA_DB_KEYSPACE = None
import cassio

cassio.init(
database_id=ASTRA_DB_ID,
token=ASTRA_DB_APPLICATION_TOKEN,
keyspace=ASTRA_DB_KEYSPACE,
)

现在你可以创建向量存储:

vstore = Cassandra(
embedding=embe,
table_name="cassandra_vector_demo",
# session=None, keyspace=None # Uncomment on older versions of LangChain
)

加载数据集

将源数据集中的每个条目转换为Document,然后将其写入向量存储中:

philo_dataset = load_dataset("datastax/philosopher-quotes")["train"]

docs = []
for entry in philo_dataset:
metadata = {"author": entry["author"]}
doc = Document(page_content=entry["quote"], metadata=metadata)
docs.append(doc)

inserted_ids = vstore.add_documents(docs)
print(f"\nInserted {len(inserted_ids)} documents.")

在上面,metadata 个字典从源数据中创建,并且是 Document 的一部分。

添加一些更多的条目,这次使用add_texts:

texts = ["I think, therefore I am.", "To the things themselves!"]
metadatas = [{"author": "descartes"}, {"author": "husserl"}]
ids = ["desc_01", "huss_xy"]

inserted_ids_2 = vstore.add_texts(texts=texts, metadatas=metadatas, ids=ids)
print(f"\nInserted {len(inserted_ids_2)} documents.")

注:您可能希望通过增加并发级别来加快add_textsadd_documents的执行速度。 这些批量操作,请查看方法的batch_size参数 对于更多详细信息。根据网络情况和客户端机器的规格,您最佳的选择的参数可能会有所不同。

运行搜索

这节内容展示了元数据过滤以及获取相似度评分:

results = vstore.similarity_search("Our life is what we make of it", k=3)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
results_filtered = vstore.similarity_search(
"Our life is what we make of it",
k=3,
filter={"author": "plato"},
)
for res in results_filtered:
print(f"* {res.page_content} [{res.metadata}]")
results = vstore.similarity_search_with_score("Our life is what we make of it", k=3)
for res, score in results:
print(f"* [SIM={score:3f}] {res.page_content} [{res.metadata}]")
results = vstore.max_marginal_relevance_search(
"Our life is what we make of it",
k=3,
filter={"author": "aristotle"},
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")

删除已存储的文档

delete_1 = vstore.delete(inserted_ids[:3])
print(f"all_succeed={delete_1}") # True, all documents deleted
delete_2 = vstore.delete(inserted_ids[2:5])
print(f"some_succeeds={delete_2}") # True, though some IDs were gone already

最小的RAG链

接下来的单元格将实现一个简单的RAG管道:

  • download a sample PDF file and load it onto the store;
  • 使用LCEL(LangChain 表达语言)创建一个基于向量存储的RAG链;
  • 运行问答链路。
!curl -L \
"https://github.com/awesome-astra/datasets/blob/main/demo-resources/what-is-philosophy/what-is-philosophy.pdf?raw=true" \
-o "what-is-philosophy.pdf"
pdf_loader = PyPDFLoader("what-is-philosophy.pdf")
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
docs_from_pdf = pdf_loader.load_and_split(text_splitter=splitter)

print(f"Documents from PDF: {len(docs_from_pdf)}.")
inserted_ids_from_pdf = vstore.add_documents(docs_from_pdf)
print(f"Inserted {len(inserted_ids_from_pdf)} documents.")
retriever = vstore.as_retriever(search_kwargs={"k": 3})

philo_template = """
You are a philosopher that draws inspiration from great thinkers of the past
to craft well-thought answers to user questions. Use the provided context as the basis
for your answers and do not make up new reasoning paths - just mix-and-match what you are given.
Your answers must be concise and to the point, and refrain from answering about other topics than philosophy.

CONTEXT:
{context}

QUESTION: {question}

YOUR ANSWER:"""

philo_prompt = ChatPromptTemplate.from_template(philo_template)

llm = ChatOpenAI()

chain = (
{"context": retriever, "question": RunnablePassthrough()}
| philo_prompt
| llm
| StrOutputParser()
)
chain.invoke("How does Russel elaborate on Peirce's idea of the security blanket?")

要了解更多,请查看通过CQL使用Astra DB的完整RAG模板 这里

清理

以下内容本质上是从 CassIO 中检索 Session 对象,并使用它运行一个 CQL DROP TABLE 语句:

(您将丢失其中存储的所有数据。)

cassio.config.resolve_session().execute(
f"DROP TABLE {cassio.config.resolve_keyspace()}.cassandra_vector_demo;"
)

学习更多

对于更多信息、扩展快速开始指南和额外的使用示例,请访问CassIO 文档,了解更多关于使用 LangChain Cassandra 向量存储的信息。

署名声明

Apache Cassandra, Cassandra 和 Apache 是Apache软件基金会在美国及其他国家的注册商标或商标。