
Elasticsearch

Elasticsearch is a distributed, RESTful search and analytics engine, capable of performing both vector and lexical search. It is built on top of the Apache Lucene library.

This notebook shows how to use functionality related to the Elasticsearch vector store.

Setup

In order to use Elasticsearch vector search you must install the langchain-elasticsearch package.

%pip install -qU langchain-elasticsearch

Credentials

There are two main ways to set up an Elasticsearch instance for use with LangChain:

  1. Elastic Cloud: Elastic Cloud is a managed Elasticsearch service. You can sign up for a free trial.

To connect to an Elasticsearch instance that does not require login credentials (starting the docker instance with security disabled), pass the Elasticsearch URL and index name along with the embedding object to the constructor.

  2. Local install of Elasticsearch: Get started with Elasticsearch by running it locally. The easiest way is to use the official Elasticsearch Docker image. See the Elasticsearch Docker documentation for more information.

Running Elasticsearch via Docker

Example: Run a single-node Elasticsearch instance with security disabled. This is not recommended for production use.

%docker run -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.12.1

Running with authentication

For production, we recommend you run with security enabled. To connect with login credentials, you can use the parameters es_api_key, or es_user and es_password.

pip install -qU langchain-openai
import getpass
import os

if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter API key for OpenAI: ")

from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
from langchain_elasticsearch import ElasticsearchStore

elastic_vector_search = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="langchain_index",
    embedding=embeddings,
    es_user="elastic",
    es_password="changeme",
)

How to obtain a password for the default "elastic" user?

To obtain your Elastic Cloud password for the default "elastic" user:

  1. Log in to the Elastic Cloud console at https://cloud.elastic.co
  2. Go to "Security" > "Users"
  3. Locate the "elastic" user and click "Edit"
  4. Click "Reset password"
  5. Follow the prompts to reset the password

How to obtain an API key?

To obtain an API key:

  1. Log in to the Elastic Cloud console at https://cloud.elastic.co
  2. Open Kibana and go to Stack Management > API Keys
  3. Click "Create API key"
  4. Enter a name for the API key and click "Create"
  5. Copy the API key and paste it into the api_key parameter
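The key from the steps above can replace es_user/es_password in the constructor. A minimal sketch of that call (es_api_key is the actual parameter name; the embeddings object is the one created earlier, and the key value is a placeholder):

```python
from langchain_elasticsearch import ElasticsearchStore

elastic_vector_search = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="langchain_index",
    embedding=embeddings,  # embeddings object from the earlier cell
    es_api_key="<your api key>",  # placeholder: paste your key here
)
```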

Elastic Cloud

To connect to an Elasticsearch instance on Elastic Cloud, you can use either the es_cloud_id parameter or es_url.

elastic_vector_search = ElasticsearchStore(
    es_cloud_id="<cloud_id>",
    index_name="test_index",
    embedding=embeddings,
    es_user="elastic",
    es_password="changeme",
)

If you want to get best in-class automated tracing of your model calls you can also set your LangSmith API key by uncommenting below:

# os.environ["LANGSMITH_API_KEY"] = getpass.getpass("Enter your LangSmith API key: ")
# os.environ["LANGSMITH_TRACING"] = "true"

Initialization

Elasticsearch is running locally on localhost:9200 via docker. For more details on how to connect to Elasticsearch from Elastic Cloud, see Running with authentication above.

from langchain_elasticsearch import ElasticsearchStore

vector_store = ElasticsearchStore(
    "langchain-demo", embedding=embeddings, es_url="http://localhost:9200"
)

Manage vector store

Add items to vector store

from uuid import uuid4

from langchain_core.documents import Document

document_1 = Document(
    page_content="I had chocolate chip pancakes and scrambled eggs for breakfast this morning.",
    metadata={"source": "tweet"},
)

document_2 = Document(
    page_content="The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees.",
    metadata={"source": "news"},
)

document_3 = Document(
    page_content="Building an exciting new project with LangChain - come check it out!",
    metadata={"source": "tweet"},
)

document_4 = Document(
    page_content="Robbers broke into the city bank and stole $1 million in cash.",
    metadata={"source": "news"},
)

document_5 = Document(
    page_content="Wow! That was an amazing movie. I can't wait to see it again.",
    metadata={"source": "tweet"},
)

document_6 = Document(
    page_content="Is the new iPhone worth the price? Read this review to find out.",
    metadata={"source": "website"},
)

document_7 = Document(
    page_content="The top 10 soccer players in the world right now.",
    metadata={"source": "website"},
)

document_8 = Document(
    page_content="LangGraph is the best framework for building stateful, agentic applications!",
    metadata={"source": "tweet"},
)

document_9 = Document(
    page_content="The stock market is down 500 points today due to fears of a recession.",
    metadata={"source": "news"},
)

document_10 = Document(
    page_content="I have a bad feeling I am going to get deleted :(",
    metadata={"source": "tweet"},
)

documents = [
    document_1,
    document_2,
    document_3,
    document_4,
    document_5,
    document_6,
    document_7,
    document_8,
    document_9,
    document_10,
]
uuids = [str(uuid4()) for _ in range(len(documents))]

vector_store.add_documents(documents=documents, ids=uuids)
API Reference: Document
['21cca03c-9089-42d2-b41c-3d156be2b519',
 'a6ceb967-b552-4802-bb06-c0e95fce386e',
 '3a35fac4-e5f0-493b-bee0-9143b41aedae',
 '176da099-66b1-4d6a-811b-dfdfe0808d30',
 'ecfa1a30-3c97-408b-80c0-5c43d68bf5ff',
 'c0f08baa-e70b-4f83-b387-c6e0a0f36f73',
 '489b2c9c-1925-43e1-bcf0-0fa94cf1cbc4',
 '408c6503-9ba4-49fd-b1cc-95584cd914c5',
 '5248c899-16d5-4377-a9e9-736ca443ad4f',
 'ca182769-c4fc-4e25-8f0a-8dd0a525955c']

Delete items from vector store

vector_store.delete(ids=[uuids[-1]])
True

Query vector store

Once your vector store has been created and the relevant documents have been added, you will most likely wish to query it during the running of your chain or agent. These examples also show how to use filtering when searching.

Query directly

Performing a simple similarity search with filtering on metadata can be done as follows:

results = vector_store.similarity_search(
    query="LangChain provides abstractions to make working with LLMs easy",
    k=2,
    filter=[{"term": {"metadata.source.keyword": "tweet"}}],
)
for res in results:
    print(f"* {res.page_content} [{res.metadata}]")
* Building an exciting new project with LangChain - come check it out! [{'source': 'tweet'}]
* LangGraph is the best framework for building stateful, agentic applications! [{'source': 'tweet'}]
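The filter argument accepts native Elasticsearch query DSL clauses, so richer conditions can be composed. As a hedged sketch, a small helper (the name source_filter is made up) that builds a term or terms clause for the metadata.source.keyword field used above:

```python
def source_filter(*sources: str) -> list:
    """Build a metadata filter for one or more sources.

    Uses a `term` clause for a single value and a `terms`
    clause (match any of the values) for several.
    """
    if len(sources) == 1:
        return [{"term": {"metadata.source.keyword": sources[0]}}]
    return [{"terms": {"metadata.source.keyword": list(sources)}}]

print(source_filter("tweet"))
print(source_filter("tweet", "news"))
```

The returned list can be passed directly as filter=source_filter("tweet", "news") in the searches above.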

Similarity search with score

If you want to execute a similarity search and receive the corresponding scores you can run:

results = vector_store.similarity_search_with_score(
    query="Will it be hot tomorrow",
    k=1,
    filter=[{"term": {"metadata.source.keyword": "news"}}],
)
for doc, score in results:
    print(f"* [SIM={score:3f}] {doc.page_content} [{doc.metadata}]")
* [SIM=0.765887] The weather forecast for tomorrow is cloudy and overcast, with a high of 62 degrees. [{'source': 'news'}]

Query by turning into retriever

You can also transform the vector store into a retriever for easier usage in your chains.

retriever = vector_store.as_retriever(
    search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.2}
)
retriever.invoke("Stealing from the bank is a crime")
[Document(metadata={'source': 'news'}, page_content='Robbers broke into the city bank and stole $1 million in cash.'),
 Document(metadata={'source': 'news'}, page_content='The stock market is down 500 points today due to fears of a recession.'),
 Document(metadata={'source': 'website'}, page_content='Is the new iPhone worth the price? Read this review to find out.'),
 Document(metadata={'source': 'tweet'}, page_content='Building an exciting new project with LangChain - come check it out!')]

Distance similarity algorithm

Elasticsearch supports the following vector distance similarity algorithms:

  • cosine
  • euclidean
  • dot_product

The cosine similarity algorithm is the default.

You can specify the similarity algorithm needed via the distance_strategy parameter.

NOTE: Depending on the retrieval strategy, the similarity algorithm cannot be changed at query time. It needs to be set when creating the index mapping for the field. If you need to change the similarity algorithm, you need to delete the index and recreate it with the correct distance_strategy.

db = ElasticsearchStore.from_documents(
    docs,
    embeddings,
    es_url="http://localhost:9200",
    index_name="test",
    distance_strategy="COSINE",
    # distance_strategy="EUCLIDEAN_DISTANCE"
    # distance_strategy="DOT_PRODUCT"
)

Retrieval strategies

Elasticsearch has big advantages over other vector-only databases from its ability to support a wide range of retrieval strategies. In this notebook we will configure ElasticsearchStore to support some of the most common retrieval strategies.

By default, ElasticsearchStore uses the DenseVectorStrategy (which was called ApproxRetrievalStrategy prior to version 0.2.0).

DenseVectorStrategy

This will return the top k most similar vectors to the query vector. The k parameter is set when running a search; it defaults to 10.

from langchain_elasticsearch import DenseVectorStrategy

db = ElasticsearchStore.from_documents(
    docs,
    embeddings,
    es_url="http://localhost:9200",
    index_name="test",
    strategy=DenseVectorStrategy(),
)

docs = db.similarity_search(
    query="What did the president say about Ketanji Brown Jackson?", k=10
)

Example: Hybrid retrieval with dense vector and keyword search

This example will show how to configure ElasticsearchStore to perform a hybrid retrieval, using a combination of approximate semantic search and keyword based search.

We use RRF to balance the two scores from different retrieval methods.

To enable hybrid retrieval, we need to set hybrid=True in the DenseVectorStrategy constructor.

db = ElasticsearchStore.from_documents(
    docs,
    embeddings,
    es_url="http://localhost:9200",
    index_name="test",
    strategy=DenseVectorStrategy(hybrid=True),
)

When hybrid is enabled, the query performed will be a combination of approximate semantic search and keyword based search.

It will use rrf (Reciprocal Rank Fusion) to balance the two scores from different retrieval methods.

Note: RRF requires Elasticsearch 8.9.0 or above.

{
    "retriever": {
        "rrf": {
            "retrievers": [
                {
                    "standard": {
                        "query": {
                            "bool": {
                                "filter": [],
                                "must": [{"match": {"text": {"query": "foo"}}}],
                            }
                        },
                    },
                },
                {
                    "knn": {
                        "field": "vector",
                        "filter": [],
                        "k": 1,
                        "num_candidates": 50,
                        "query_vector": [1.0, ..., 0.0],
                    },
                },
            ]
        }
    }
}
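The rrf fusion step above boils down to a simple formula: each document's fused score is the sum of 1 / (rank_constant + rank) over the result lists it appears in. A minimal pure-Python sketch of that formula, assuming Elasticsearch's default rank_constant of 60 (the document IDs are made up):

```python
def rrf(result_lists, rank_constant=60):
    """Fuse ranked result lists with Reciprocal Rank Fusion.

    Each inner list is ordered best-first; ranks are 1-based.
    Returns document IDs sorted by fused score, best first.
    """
    scores = {}
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (rank_constant + rank)
    return sorted(scores, key=scores.get, reverse=True)

# "a" is ranked first by both the keyword and the kNN list, so it wins;
# "c" appears in both lists and beats the single-list hits "b" and "d".
print(rrf([["a", "b", "c"], ["a", "c", "d"]]))  # -> ['a', 'c', 'b', 'd']
```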

Example: Dense vector search with embedding model in Elasticsearch

This example will show how to configure ElasticsearchStore to use the embedding model deployed in Elasticsearch for dense vector retrieval.

To use this, specify the model ID via the model_id argument in the DenseVectorStrategy constructor.

NOTE: This requires the model to be deployed and running on an Elasticsearch ML node. See the example notebook on how to deploy the model with eland.

DENSE_SELF_DEPLOYED_INDEX_NAME = "test-dense-self-deployed"

# Note: This does not have an embedding function specified
# Instead, we will use the embedding model deployed in Elasticsearch
db = ElasticsearchStore(
    es_cloud_id="<your cloud id>",
    es_user="elastic",
    es_password="<your password>",
    index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
    query_field="text_field",
    vector_query_field="vector_query_field.predicted_value",
    strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Set up an ingest pipeline to perform the embedding
# of the text field
db.client.ingest.put_pipeline(
    id="test_pipeline",
    processors=[
        {
            "inference": {
                "model_id": "sentence-transformers__all-minilm-l6-v2",
                "field_map": {"query_field": "text_field"},
                "target_field": "vector_query_field",
            }
        }
    ],
)

# creating a new index with the pipeline,
# not relying on langchain to create the index
db.client.indices.create(
    index=DENSE_SELF_DEPLOYED_INDEX_NAME,
    mappings={
        "properties": {
            "text_field": {"type": "text"},
            "vector_query_field": {
                "properties": {
                    "predicted_value": {
                        "type": "dense_vector",
                        "dims": 384,
                        "index": True,
                        "similarity": "l2_norm",
                    }
                }
            },
        }
    },
    settings={"index": {"default_pipeline": "test_pipeline"}},
)

db.from_texts(
    ["hello world"],
    es_cloud_id="<cloud id>",
    es_user="elastic",
    es_password="<cloud password>",
    index_name=DENSE_SELF_DEPLOYED_INDEX_NAME,
    query_field="text_field",
    vector_query_field="vector_query_field.predicted_value",
    strategy=DenseVectorStrategy(model_id="sentence-transformers__all-minilm-l6-v2"),
)

# Perform search
db.similarity_search("hello world", k=10)

SparseVectorStrategy (ELSER)

This strategy uses Elasticsearch's sparse vector retrieval to retrieve the top-k results. We only support our own "ELSER" embedding model for now.

NOTE: This requires the ELSER model to be deployed and running on an Elasticsearch ML node.

To use this, specify SparseVectorStrategy (which was called SparseVectorRetrievalStrategy prior to version 0.2.0) in the ElasticsearchStore constructor. You will need to provide a model ID.

from langchain_elasticsearch import SparseVectorStrategy

# Note that this example doesn't have an embedding function. This is because we infer the tokens at index time and at query time within Elasticsearch.
# This requires the ELSER model to be loaded and running in Elasticsearch.
db = ElasticsearchStore.from_documents(
    docs,
    es_cloud_id="<cloud id>",
    es_user="elastic",
    es_password="<cloud password>",
    index_name="test-elser",
    strategy=SparseVectorStrategy(model_id=".elser_model_2"),
)

db.client.indices.refresh(index="test-elser")

results = db.similarity_search(
    "What did the president say about Ketanji Brown Jackson", k=4
)
print(results[0])

DenseVectorScriptScoreStrategy

This strategy uses Elasticsearch's script score query to perform exact vector retrieval (also known as brute force) to retrieve the top-k results. (This strategy was called ExactRetrievalStrategy prior to version 0.2.0.)

To use this, specify DenseVectorScriptScoreStrategy in the ElasticsearchStore constructor.

from langchain_elasticsearch import DenseVectorScriptScoreStrategy

db = ElasticsearchStore.from_documents(
    docs,
    embeddings,
    es_url="http://localhost:9200",
    index_name="test",
    strategy=DenseVectorScriptScoreStrategy(),
)

BM25Strategy

Finally, you can use full-text keyword search.

To use this, specify BM25Strategy in the ElasticsearchStore constructor.

from langchain_elasticsearch import BM25Strategy

db = ElasticsearchStore.from_documents(
    docs,
    es_url="http://localhost:9200",
    index_name="test",
    strategy=BM25Strategy(),
)
API Reference: BM25Strategy

BM25RetrievalStrategy

This strategy allows a user to perform searches using pure BM25 without vector search.

To use this, specify BM25RetrievalStrategy in the ElasticsearchStore constructor.

Note that in the example below, the embedding option is not specified, indicating that the search is conducted without using embeddings.

from langchain_elasticsearch import ElasticsearchStore

db = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="test_index",
    strategy=ElasticsearchStore.BM25RetrievalStrategy(),
)

db.add_texts(
    ["foo", "foo bar", "foo bar baz", "bar", "bar baz", "baz"],
)

results = db.similarity_search(query="foo", k=10)
print(results)

Customise the query

With the custom_query parameter at search, you are able to adjust the query that is used to retrieve documents from Elasticsearch. This is useful if you want to use a more complex query or to support linear boosting of fields.

# Example of a custom query that's just doing a BM25 search on the text field.
def custom_query(query_body: dict, query: str):
    """Custom query to be used in Elasticsearch.

    Args:
        query_body (dict): Elasticsearch query body.
        query (str): Query string.

    Returns:
        dict: Elasticsearch query body.
    """
    print("Query Retriever created by the retrieval strategy:")
    print(query_body)
    print()

    new_query_body = {"query": {"match": {"text": query}}}

    print("Query that's actually used in Elasticsearch:")
    print(new_query_body)
    print()

    return new_query_body


results = db.similarity_search(
    "What did the president say about Ketanji Brown Jackson",
    k=4,
    custom_query=custom_query,
)
print("Results:")
print(results[0])
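For the field-boosting use case mentioned above, a custom_query can swap in a multi_match with per-field weights. A hedged sketch (the title field and the 2x weight are made-up assumptions for illustration; multi_match and the ^boost syntax are standard Elasticsearch query DSL):

```python
def boosted_query(query_body: dict, query: str) -> dict:
    """Replace the strategy-generated query with a boosted multi_match."""
    return {
        "query": {
            "multi_match": {
                "query": query,
                # matches in "text" count twice as much as matches in "title"
                "fields": ["text^2", "title"],
            }
        }
    }

print(boosted_query({}, "Ketanji Brown Jackson"))
```

Pass it the same way as above: db.similarity_search(..., custom_query=boosted_query).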

Customize the document builder

With the doc_builder parameter at search, you are able to adjust how a Document is being built using data retrieved from Elasticsearch. This is especially useful if you have indices which were not created using LangChain.

from typing import Dict

from langchain_core.documents import Document


def custom_document_builder(hit: Dict) -> Document:
    src = hit.get("_source", {})
    return Document(
        page_content=src.get("content", "Missing content!"),
        metadata={
            "page_number": src.get("page_number", -1),
            "original_filename": src.get("original_filename", "Missing filename!"),
        },
    )


results = db.similarity_search(
    "What did the president say about Ketanji Brown Jackson",
    k=4,
    doc_builder=custom_document_builder,
)
print("Results:")
print(results[0])
API Reference: Document

Usage for retrieval-augmented generation

For guides on how to use this vector store for retrieval-augmented generation (RAG), see the following sections:

FAQ

Question: I'm getting timeout errors when indexing documents into Elasticsearch. How do I fix this?

One possible issue is that your documents might take longer to index into Elasticsearch. ElasticsearchStore uses the Elasticsearch bulk API, which has a few defaults that you can adjust to reduce the chance of timeout errors.

This is also a good idea when you use SparseVectorRetrievalStrategy.

The defaults are:

  • chunk_size: 500
  • max_chunk_bytes: 100MB

To adjust these, you can pass the chunk_size and max_chunk_bytes parameters (via bulk_kwargs) to the ElasticsearchStore add_texts method.

vector_store.add_texts(
    texts,
    bulk_kwargs={
        "chunk_size": 50,
        "max_chunk_bytes": 200000000
    }
)

Upgrading to ElasticsearchStore

If you are already using Elasticsearch in your LangChain-based project, you may be using the old implementations ElasticVectorSearch and ElasticKNNSearch, which are now deprecated. We have introduced a new implementation called ElasticsearchStore which is more flexible and easier to use. This notebook will guide you through the process of upgrading to the new implementation.

What's new?

The new implementation is now one class called ElasticsearchStore which can be used for approximate dense vector, exact dense vector, sparse vector (ELSER), BM25 retrieval and hybrid retrieval, via strategies.

I am using ElasticKNNSearch

Old implementation:


from langchain_community.vectorstores.elastic_vector_search import ElasticKNNSearch

db = ElasticKNNSearch(
    elasticsearch_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding
)

New implementation:


from langchain_elasticsearch import ElasticsearchStore, DenseVectorStrategy

db = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding,
    # if you use the model_id
    # strategy=DenseVectorStrategy(model_id="test_model")
    # if you use hybrid search
    # strategy=DenseVectorStrategy(hybrid=True)
)

I am using ElasticVectorSearch

Old implementation:


from langchain_community.vectorstores.elastic_vector_search import ElasticVectorSearch

db = ElasticVectorSearch(
    elasticsearch_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding
)

New implementation:


from langchain_elasticsearch import ElasticsearchStore, DenseVectorScriptScoreStrategy

db = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding,
    strategy=DenseVectorScriptScoreStrategy()
)

db.client.indices.delete(
    index="test-metadata, test-elser, test-basic",
    ignore_unavailable=True,
    allow_no_indices=True,
)

API reference

For detailed documentation of all ElasticsearchStore features and configurations head to the API reference: https://python.langchain.com/api_reference/elasticsearch/vectorstores/langchain_elasticsearch.vectorstores.ElasticsearchStore.html