Elasticsearch
Elasticsearch 是一个分布式 RESTful 搜索和分析引擎,能够执行向量和词法搜索。它构建在 Apache Lucene 库之上。
此笔记本展示了如何使用与Elasticsearch向量存储。
设置
要使用Elasticsearchvector search 中,您必须安装langchain-elasticsearch包。
%pip install -qU langchain-elasticsearch
凭据
有两种主要方法可以设置 Elasticsearch 实例以用于:
- Elastic Cloud:Elastic Cloud 是一项托管式 Elasticsearch 服务。注册免费试用。
连接到不需要的 Elasticsearch 实例 登录凭证(在启用安全性的情况下启动 Docker 实例),传递 Elasticsearch URL 和索引名称以及 embedding 对象添加到构造函数中。
- 本地安装 Elasticsearch:通过在本地运行 Elasticsearch 来开始使用它。最简单的方法是使用官方的 Elasticsearch Docker 镜像。有关更多信息,请参阅 Elasticsearch Docker 文档。
通过 Docker 运行 Elasticsearch
示例:在禁用安全性的情况下运行单节点 Elasticsearch 实例。不建议将其用于生产用途。
%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1
使用身份验证运行
对于生产环境,我们建议您在启用安全性的情况下运行。要使用登录凭证进行连接,您可以使用参数es_api_key或es_user和es_password.
pip install -qU langchain-openai
import getpass
import os
if not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")
from langchain_openai import OpenAIEmbeddings
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore
elastic_vector_search = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="langchain_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如何获取默认 elastic 用户的密码?
要获取默认 “elastic” 用户的 Elastic Cloud 密码,请执行以下作:
- 在 https://cloud.elastic.co 登录到 Elastic Cloud 控制台
- 转到“安全”>“用户”
- 找到 “elastic” 用户并单击 “Edit”
- 点击“重置密码”
- 按照提示重置密码
如何获取 API 密钥?
要获取 API 密钥,请执行以下作:
- 在 https://cloud.elastic.co 登录到 Elastic Cloud 控制台
- 打开 Kibana 并转到 Stack Management > API Keys
- 点击“Create API key”
- 输入 API 密钥的名称,然后单击 “Create”
- 复制 API 密钥并将其粘贴到
api_key参数
弹性云
要连接到 Elastic Cloud 上的 Elasticsearch 实例,您可以使用es_cloud_idparameter 或es_url.
elastic_vector_search = ElasticsearchStore(
es_cloud_id="<cloud_id>",
index_name="test_index",
embedding=embeddings,
es_user="elastic",
es_password="changeme",
)
如果您想获得一流的模型调用自动跟踪,您还可以通过取消下面的注释来设置 LangSmith API 密钥:
# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"
初始化
Elasticsearch 正在使用 docker 在 localhost:9200 上本地运行。有关如何从 Elastic Cloud 连接到 Elasticsearch 的更多详细信息,请参阅上面的使用身份验证进行连接。
from langchain_elasticsearch import ElasticsearchStore
vector_store = ElasticsearchStore(
"langchain-demo", embedding=embeddings, es_url="http://localhost:9201"
)
管理矢量存储
将项目添加到向量存储
from uuid import uuid4
from langchain_core.documents import Document
document_1 = Document(
page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
metadata={"source": "tweet"},
)
document_2 = Document(
page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
metadata={"source": "news"},
)
document_3 = Document(
page_content="Building an exciting new project with LangChain - come check it out!",
metadata={"source": "tweet"},
)
document_4 = Document(
page_content="Robbers broke into the city bank and stole $1 million in cash.",
metadata={"source": "news"},
)
document_5 = Document(
page_content="Wow! That was an amazing movie. I can't wait to see it again.",
metadata={"source": "tweet"},
)
document_6 = Document(
page_content="Is the new iPhone worth the price? Read this review to find out.",
metadata={"source": "website"},
)
document_7 = Document(
page_content="The top 10 soccer players in the world right now.",
metadata={"source": "website"},
)
document_8 = Document(
page_content="LangGraph is the best framework for building stateful, agentic applications!",
metadata={"source": "tweet"},
)
document_9 = Document(
page_content="The stock market is down 500 points today due to fears of a recession.",
metadata={"source": "news"},
)
document_10 = Document(
page_content="I have a bad feeling I am going to get deleted :(",
metadata={"source": "tweet"},
)
documents = [
document_1,
document_2,
document_3,
document_4,
document_5,
document_6,
document_7,
document_8,
document_9,
document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]
vector_store.add_documents(documents=documents, ids=uuids)
['21cca03c-9089-42d2-b41c-3d156be2b519',
'a6ceb967-b552-4802-bb06-c0e95fce386e',
'3a35fac4-e5f0-493b-bee0-9143b41aedae',
'176da099-66b1-4d6a-811b-dfdfe0808d30',
'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
'489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
'408c6503-9ba4-49fd-b1cc-95584cd914c5',
'5248c899-16d5-4377-a9e9-736ca443ad4f',
'ca182769-c4fc-4e25-8f0a-8dd0a525955c']
从 vector store 中删除项目
vector_store.delete(ids=[uuids[-1]])
True
查询向量存储
创建矢量存储并添加相关文档后,您很可能希望在链或代理运行期间对其进行查询。这些示例还显示了如何在搜索时使用筛选。
直接查询
相似性搜索
可以按如下方式执行对元数据进行筛选的简单相似性搜索:
results = vector_store.similarity_search(
query="LangChain provides abstractions to make working with LLMs easy",
k=2,
filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]
带分数的相似性搜索
如果要执行相似性搜索并接收相应的分数,可以运行:
results = vector_store.similarity_search_with_score(
query="Will it be hot tomorrow",
k=1,
filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]
通过转换为 retriever 进行查询
您还可以将 vector store 转换为检索器,以便在您的链中更轻松地使用。
retriever = vector_store.as_retriever(
search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]
距离相似性算法
Elasticsearch 支持以下向量距离相似度算法:
- 余弦
- 欧 氏
- dot_product
余弦相似度算法是默认算法。
您可以通过 similarity 参数指定所需的 similarity Algorithm。
注意:根据检索策略,无法在查询时更改相似性算法。在为 field 创建索引映射时需要设置它。如果需要更改相似性算法,则需要删除索引并使用正确的distance_strategy重新创建索引。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
distance_strategy="COSINE",
# distance_strategy="EUCLIDEAN_DISTANCE"
# distance_strategy="DOT_PRODUCT"
)
检索策略
与其他纯矢量数据库相比,Elasticsearch 具有很大的优势,因为它能够支持广泛的检索策略。在此 Notebook 中,我们将配置ElasticsearchStore以支持一些最常见的检索策略。
默认情况下,ElasticsearchStore使用DenseVectorStrategy(被称为ApproxRetrievalStrategy0.2.0 之前的版本)。
DenseVectorStrategy
这将返回与查询向量最相似的前 k 个向量。这k参数在ElasticsearchStore已初始化。默认值为 10。
from langchain_elasticsearch import DenseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(),
)
docs = db.similarity_search(
query="What did the president say about Ketanji Brown Jackson?", k=10
)
示例:使用密集向量和关键字搜索进行混合检索
此示例将展示如何配置 ElasticsearchStore 以结合使用近似语义搜索和基于关键字的搜索来执行混合检索。
我们使用 RRF 来平衡来自不同检索方法的两个分数。
要启用混合检索,我们需要将hybrid=True在DenseVectorStrategy构造 函数。
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorStrategy(hybrid=True),
)
启用混合后,执行的查询将是近似语义搜索和基于关键字的搜索的组合。
它将使用 rrf (Reciprocal Rank Fusion) 来平衡来自不同检索方法的两个分数。
注意:RRF 需要 Elasticsearch 8.9.0 或更高版本。
{
"retriever": {
"rrf": {
"retrievers": [
{
"standard": {
"query": {
"bool": {
"filter": [],
"must": [{"match": {"text": {"query": "foo"}}}],
}
},
},
},
{
"knn": {
"field": "vector",
"filter": [],
"k": 1,
"num_candidates": 50,
"query_vector": [1.0, ..., 0.0],
},
},
]
}
}
}
示例:在 Elasticsearch 中使用 Embedding Model 进行密集向量搜索
此示例将演示如何配置ElasticsearchStore使用 Elasticsearch 中部署的嵌入模型进行密集向量检索。
要使用此选项model_id,请在DenseVectorStrategy构造函数通过query_model_id论点。
注意:这需要在 Elasticsearch ML 节点中部署和运行模型。请参阅笔记本示例,了解如何使用eland.
DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"
# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
es_cloud_id="<your cloud id>",
es_user="elastic",
es_password="<your password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Setup a Ingest Pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
id="test_pipeline",
processors=[
{
"inference": {
"model_id": "sentence-transformers__all-minilm-l6-v2",
"field_map": {"query_field": "text_field"},
"target_field": "vector_query_field",
}
}
],
)
# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
index=DENSE_SELF_DEPLOYED_INDEX_NAME,
mappings={
"properties": {
"text_field": {"type": "text"},
"vector_query_field": {
"properties": {
"predicted_value": {
"type": "dense_vector",
"dims": 384,
"index": True,
"similarity": "l2_norm",
}
}
},
}
},
settings={"index": {"default_pipeline": "test_pipeline"}},
)
db.from_texts(
["hello world"],
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
query_field="text_field",
vector_query_field="vector_query_field.predicted_value",
strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)
# Perform search
db.similarity_search("hello world", k=10)
稀疏矢量策略 (ELSER)
此策略使用 Elasticsearch 的稀疏向量检索来检索 top-k 结果。我们目前只支持我们自己的 “ELSER” 嵌入模型。
注意:这需要在 Elasticsearch ml 节点中部署和运行 ELSER 模型。
要使用此选项,请指定SparseVectorStrategy(被称为SparseVectorRetrievalStrategy0.2.0 之前的版本)在ElasticsearchStore构造 函数。您需要提供模型 ID。
from langchain_elasticsearch import SparseVectorStrategy
# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
docs,
es_cloud_id="<cloud id>",
es_user="elastic",
es_password="<cloud password>",
index_name="test-elser",
strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)
db.client.indices.refresh(index="test-elser")
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])
DenseVectorScriptScoreStrategy
此策略使用 Elasticsearch 的脚本分数查询来执行精确向量检索(也称为蛮力)以检索前 k 个结果。(这个策略叫做ExactRetrievalStrategy0.2.0 之前的版本。
要使用此选项,请指定DenseVectorScriptScoreStrategy在ElasticsearchStore构造 函数。
from langchain_elasticsearch import SparseVectorStrategy
db = ElasticsearchStore.from_documents(
docs,
embeddings,
es_url="http://localhost:9200",
index_name="test",
strategy=DenseVectorScriptScoreStrategy(),
)
BM25策略
最后,您可以使用全文关键字搜索。
要使用此选项,请指定BM25Strategy在ElasticsearchStore构造 函数。
from langchain_elasticsearch import BM25Strategy
db = ElasticsearchStore.from_documents(
docs,
es_url="http://localhost:9200",
index_name="test",
strategy=BM25Strategy(),
)
BM25检索策略
此策略允许用户使用纯 BM25 执行搜索,而无需进行矢量搜索。
要使用此选项,请指定BM25RetrievalStrategy在ElasticsearchStore构造 函数。
请注意,在下面的示例中,未指定 embedding 选项,表示在不使用嵌入的情况下执行搜索。
from langchain_elasticsearch import ElasticsearchStore
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)
db.add_texts(
["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)
results = db.similarity_search(query="foo", k=10)
print(results)
自定义查询
跟custom_query参数,您可以调整用于从 Elasticsearch 检索文档的查询。如果要使用更复杂的查询来支持字段的线性提升,这将非常有用。
# Example of a custom query thats just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
"""Custom query to be used in Elasticsearch.
Args:
query_body (dict): Elasticsearch query body.
query (str): Query string.
Returns:
dict: Elasticsearch query body.
"""
print("Query Retriever created by the retrieval strategy:")
print(query_body)
print()
new_query_body = {"query": {"match": {"text": query}}}
print("Query thats actually used in Elasticsearch:")
print(new_query_body)
print()
return new_query_body
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
custom_query=custom_query,
)
print("Results:")
print(results[0])
自定义 Document Builder
跟doc_builder参数,您可以调整使用从 Elasticsearch 检索的数据构建文档的方式。如果您的索引不是使用 Langchain 创建的,这将特别有用。
from typing import Dict
from langchain_core.documents import Document
def custom_document_builder(hit: Dict) -> Document:
src = hit.get("_source", {})
return Document(
page_content=src.get("content", "Missing content!"),
metadata={
"page_number": src.get("page_number", -1),
"original_filename": src.get("original_filename", "Missing filename!"),
},
)
results = db.similarity_search(
"What did the president say about Ketanji Brown Jackson",
k=4,
doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
用于检索增强生成
有关如何使用此向量存储进行检索增强生成 (RAG) 的指南,请参阅以下部分:
常见问题
问题:在 Elasticsearch 中索引文档时出现超时错误。我该如何解决这个问题?
一个可能的问题是,您的文档可能需要更长的时间才能索引到 Elasticsearch 中。ElasticsearchStore 使用 Elasticsearch 批量 API,该 API 具有一些默认值,您可以调整这些 API 以减少出现超时错误的可能性。
当您使用 SparseVectorRetrievalStrategy 时,这也是一个好主意。
默认值为:
chunk_size: 500max_chunk_bytes: 100兆字节
要调整这些参数,您可以传入chunk_size和max_chunk_bytes参数添加到 ElasticsearchStoreadd_texts方法。
vector_store.add_texts(
texts,
bulk_kwargs={
"chunk_size": 50,
"max_chunk_bytes": 200000000
}
)
升级到 ElasticsearchStore
如果您已经在基于 langchain 的项目中使用 Elasticsearch,则可能正在使用旧的实现:ElasticVectorSearch和ElasticKNNSearch这些组件现已弃用。我们引入了一个名为ElasticsearchStore这更灵活,更易于使用。此笔记本将指导您完成升级到新实施的过程。
新增功能
新的实现现在是一个名为ElasticsearchStore通过策略,可用于近似密集向量、精确密集向量、稀疏向量 (ELSER)、BM25 检索和混合检索。
我正在使用 ElasticKNNSearch
旧实现:
from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch
db = ElasticKNNSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
新实现:
from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
# if you use the model_id
# strategy=DenseVectorStrategy(model_id="test_model")
# if you use hybrid search
# strategy=DenseVectorStrategy(hybrid=True)
)
我正在使用 ElasticVectorSearch
旧实现:
from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch
db = ElasticVectorSearch(
elasticsearch_url="http://localhost:9200",
index_name="test_index",
embedding=embedding
)
新实现:
from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy
db = ElasticsearchStore(
es_url="http://localhost:9200",
index_name="test_index",
embedding=embedding,
strategy=DenseVectorScriptScoreStrategy()
)
db.client.indices.delete(
index="test-metadata, test-elser, test-basic",
ignore_unavailable=True,
allow_no_indices=True,
)
API 参考
有关所有ElasticSearchStore功能和配置前往 API 参考:https://python.langchain.com/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html