如何使用 LangChain 索引 API
在这里,我们将介绍使用 LangChain 索引 API 的基本索引工作流程。
索引 API 允许您将来自任何来源的文档加载并保持同步到矢量存储中。具体来说,它有助于:
- 避免将重复的内容写入 vector store
- 避免重写未更改的内容
- 避免对未更改的内容重新计算嵌入
所有这些都可以节省您的时间和金钱,并改善您的矢量搜索结果。
至关重要的是,索引 API 甚至可以处理已经经历过多次的文档 相对于原始源文档的转换步骤(例如,通过文本分块)。
运作方式
LangChain 索引使用记录管理器 (RecordManager) 跟踪文档写入向量存储。
为内容编制索引时,将计算每个文档的哈希值,并将以下信息存储在记录管理器中:
- 文档哈希值(页面内容和元数据的哈希值)
- 写入时间
- 源 ID -- 每个文档的元数据中都应该包含信息,以便我们确定此文档的最终来源
删除模式
将文档索引到向量存储中时,可能会删除向量存储中的一些现有文档。在某些情况下,您可能希望删除与正在编制索引的新文档派生自相同来源的任何现有文档。在其他情况下,您可能希望批量删除所有现有文档。索引 API 删除模式允许您选择所需的行为:
| 清理模式 | 删除重复内容 | 可并行化 | 清理已删除的源文档 | 清理源文档和/或派生文档的更改 | 清理计时 |
|---|---|---|---|---|---|
| None | ✅ | ✅ | ❌ | ❌ | - |
| Incremental | ✅ | ✅ | ❌ | ✅ | Continuously |
| Full | ✅ | ❌ | ✅ | ✅ | At end of indexing |
| Scoped_Full | ✅ | ✅ | ❌ | ✅ | At end of indexing |
None不执行任何自动清理,允许用户手动清理旧内容。
incremental,full和scoped_full提供以下自动清理功能:
- 如果源文档或派生文档的内容已更改,则所有 3 种模式都将清理(删除)以前版本的内容。
- 如果源文档已被删除(意味着它未包含在当前正在编制索引的文档中),则
full清理模式会正确地将其从 Vector Store 中删除,但incremental和scoped_fullmode 则不会。
当内容发生更改时(例如,源 PDF 文件被修改),在索引期间将有一段时间,此时新版本和旧版本都可能返回给用户。这发生在写入新内容之后,但在删除旧版本之前。
incremental索引可以最大限度地减少此时间段,因为它能够在写入时连续进行清理。full和scoped_fullmode 会在写入所有批处理后进行清理。
要求
- 请勿与已预先填充独立于索引 API 的内容的存储一起使用,因为记录管理器不会知道之前已插入记录。
- 仅适用于 LangChain
vectorstore的 support 为:- 按 ID 添加文档 (
add_documentsmethod 替换为ids参数) - 按 ID 删除 (
deletemethod 替换为ids参数)
- 按 ID 添加文档 (
兼容的 Vectorstore:Aerospike,AnalyticDB,AstraDB,AwaDB,AzureCosmosDBNoSqlVectorSearch,AzureCosmosDBVectorSearch,AzureSearch,Bagel,Cassandra,Chroma,CouchbaseVectorStore,DashVector,DatabricksVectorSearch,DeepLake,Dingo,ElasticVectorSearch,ElasticsearchStore,FAISS,HanaDB,Milvus,MongoDBAtlasVectorSearch,MyScale,OpenSearchVectorSearch,PGVector,Pinecone,Qdrant,Redis,Rockset,ScaNN,SingleStoreDB,SupabaseVectorStore,SurrealDBStore,TimescaleVector,Vald,VDMS,Vearch,VespaStore,Weaviate,Yellowbrick,ZepVectorStore,TencentVectorDB,OpenSearchVectorSearch.
谨慎
记录管理器依赖于基于时间的机制来确定可以清理的内容(在使用full或incremental或scoped_full清理模式)。
如果两个任务背靠背运行,并且第一个任务在时钟时间更改之前完成,则第二个任务可能无法清理内容。
这在实际设置中不太可能成为问题,原因如下:
- RecordManager 使用更高分辨率的时间戳。
- 数据需要在第一次和第二次任务运行之间发生变化,如果任务之间的时间间隔很小,这不太可能。
- 索引任务通常需要数毫秒以上的时间。
快速入门
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
初始化 vector store 并设置嵌入:
collection_name = "test_index"
embedding = OpenAIEmbeddings()
vectorstore = ElasticsearchStore(
es_url="http://localhost:9200", index_name="test_index", embedding=embedding
)
使用适当的命名空间初始化记录管理器。
建议:使用同时考虑向量存储和向量存储中的集合名称的命名空间;例如,'redis/my_docs'、'chromadb/my_docs' 或 'postgres/my_docs'。
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
namespace, db_url="sqlite:///record_manager_cache.sql"
)
在使用记录管理器之前创建架构。
record_manager.create_schema()
让我们索引一些测试文档:
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
索引到空向量存储中:
def _clear():
"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
index([], record_manager, vectorstore, cleanup="full", source_id_key="source")
None删除模式
此模式不会自动清理旧版本的内容;但是,它仍然负责内容重复数据删除。
_clear()
index(
[doc1, doc1, doc1, doc1, doc1],
record_manager,
vectorstore,
cleanup=None,
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
_clear()
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
第二次,将跳过所有内容:
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
"incremental"删除模式
_clear()
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
再次索引应该会导致两个文档都被跳过 —— 也跳过了嵌入作!
index(
[doc1, doc2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
如果我们没有提供具有增量索引模式的文档,则不会发生任何变化。
index([], record_manager, vectorstore, cleanup="incremental", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
如果我们更改一个文档,将写入新版本,并删除共享同一源的所有旧版本。
changed_doc_2 = Document(page_content="puppy", metadata={"source": "doggy.txt"})
index(
[changed_doc_2],
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}
"full"删除模式
在full模式中,用户应将full应索引到 indexing 函数中的内容的 universe。
任何未传递到索引函数且存在于 vectorstore 中的文档都将被删除!
此行为对于处理源文档的删除非常有用。
_clear()
all_docs = [doc1, doc2]
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
假设有人删除了第一个文档:
del all_docs[0]
all_docs
[Document(page_content='doggy', metadata={'source': 'doggy.txt'})]
使用完整模式也会清理已删除的内容。
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
源
metadata 属性包含一个名为source.这个来源应该指向与给定文档相关的最终出处。
例如,如果这些文档表示某个父文档的块,则source对于两个文档,应相同并引用父文档。
通常source应始终指定。仅使用None,如果您从未打算使用incrementalmode 的 API 中,并且由于某种原因无法指定source字段。
from langchain_text_splitters import CharacterTextSplitter
doc1 = Document(
page_content="kitty kitty kitty kitty kitty", metadata={"source": "kitty.txt"}
)
doc2 = Document(page_content="doggy doggy the doggy", metadata={"source": "doggy.txt"})
new_docs = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
).split_documents([doc1, doc2])
new_docs
[Document(page_content='kitty kit', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='doggy doggy', metadata={'source': 'doggy.txt'}),
Document(page_content='the doggy', metadata={'source': 'doggy.txt'})]
_clear()
index(
new_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 5, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
changed_doggy_docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
这应该会删除与doggy.txtsource 并将它们替换为新版本。
index(
changed_doggy_docs,
record_manager,
vectorstore,
cleanup="incremental",
source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),
Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),
Document(page_content='kitty kit', metadata={'source': 'kitty.txt'})]
与 loader 一起使用
索引可以接受文档的可迭代对象或任何 loader。
注意力:加载程序必须正确设置 source key。
from langchain_core.document_loaders import BaseLoader
class MyCustomLoader(BaseLoader):
def lazy_load(self):
text_splitter = CharacterTextSplitter(
separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
)
docs = [
Document(page_content="woof woof", metadata={"source": "doggy.txt"}),
Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
yield from text_splitter.split_documents(docs)
def load(self):
return list(self.lazy_load())
_clear()
loader = MyCustomLoader()
loader.load()
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]
index(loader, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),
Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]