Skip to main content
Open In ColabOpen on GitHub

Airbyte CDK(已弃用)

注意:AirbyteCDKLoader 已弃用。请改用 AirbyteLoader

LangChain AI开发框架是一个用于从API、数据库和文件到数据仓库和湖的ELT管道的数据集成平台。它拥有最大的ELT连接器目录,可以连接到数据仓库和数据库。

A lot of source connectors are implemented using the Airbyte CDK. This loader allows to run any of these connectors and return the data as documents.

安装

首先,你需要安装LangChain Python 包。

%pip install --upgrade --quiet  airbyte-cdk

然后,您可以通过Airbyte Github仓库安装现有的连接器,或者使用Airbyte CDK创建自己的连接器。

例如,要安装Github连接器,请运行

%pip install --upgrade --quiet  "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

一些源代码还以常规包的形式发布在PyPI上

示例

现在你可以基于导入的源创建一个AirbyteCDKLoader。它接收一个被传递给连接器的config对象。你还必须通过名称(stream_name)选择要检索记录的流。有关配置对象和可用流的更多信息,请参阅连接器文档页面和规范定义。对于Github连接器,这些是:

from langchain_community.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub # plug in your own source here

config = {
# your github configuration
"credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
"repository": "<repo>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}

issues_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues"
)
API 参考:AirbyteCDKLoader

现在您可以通过常规方式加载文档

docs = issues_loader.load()

As load 返回一个列表,它将在所有文档加载完成后才会阻塞。为了更好地控制这个过程,你也可以使用 lazy_load 方法,该方法返回的是一个迭代器:

docs_iterator = issues_loader.lazy_load()

请注意,默认情况下页面内容为空,而元数据对象包含记录中的所有信息。若要以不同方式创建文档,在创建加载器时传入一个 record_handler 函数:

from langchain_core.documents import Document


def handle_record(record, id):
return Document(
page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
metadata=record.data,
)


issues_loader = AirbyteCDKLoader(
source_class=SourceGithub,
config=config,
stream_name="issues",
record_handler=handle_record,
)

docs = issues_loader.load()
API 参考:文档

增量加载

一些流允许增量加载,这意味着源会跟踪已同步的记录并不会再加载这些记录。这对于数据量大且经常更新的数据源非常有用。

要利用这一点,请存储加载器的last_state属性,并在再次创建加载器时传递该值。这将确保仅加载新记录。

last_state = issues_loader.last_state  # store safely

incremental_issue_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)

new_docs = incremental_issue_loader.load()