Skip to main content
Open In Colab在 GitHub 上打开

Airbyte CDK(已弃用)

注意:AirbyteCDKLoader已弃用。请使用AirbyteLoader相反。

Airbyte是一个用于ELT管道的数据集成平台,从API、数据库和文件到仓库和湖泊。它拥有最大的数据仓库和数据库 ELT 连接器目录。

许多源连接器都是使用 Airbyte CDK 实现的。此加载器允许运行这些连接器中的任何一个并将数据作为文档返回。

安装

首先,您需要安装airbyte-cdkpython 软件包。

%pip install --upgrade --quiet  airbyte-cdk

然后,从 Airbyte Github 存储库安装现有连接器或使用 Airbyte CDK 创建自己的连接器。

例如,要安装 Github 连接器,请运行

%pip install --upgrade --quiet  "source_github@git+https://github.com/airbytehq/airbyte.git@master#subdirectory=airbyte-integrations/connectors/source-github"

一些源也作为常规包在 PyPI 上发布

现在,您可以创建一个AirbyteCDKLoader基于导入的源。它需要一个config对象。您还必须按名称 (stream_name).查看连接器文档页面和规范定义,以了解有关 config 对象和可用流的更多信息。对于 Github 连接器,这些连接器包括:

from langchain_community.document_loaders.airbyte import AirbyteCDKLoader
from source_github.source import SourceGithub # plug in your own source here

config = {
# your github configuration
"credentials": {"api_url": "api.github.com", "personal_access_token": "<token>"},
"repository": "<repo>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
}

issues_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues"
)
API 参考:AirbyteCDKLoader

现在,您可以按常规方式加载文档

docs = issues_loader.load()

load返回一个列表,它将阻塞,直到加载所有文档。为了更好地控制此过程,您还可以将lazy_load方法,它返回一个迭代器:

docs_iterator = issues_loader.lazy_load()

请记住,默认情况下,页面内容为空,并且 metadata 对象包含记录中的所有信息。要在不同的环境中创建文档,请在创建 loader 时传入 record_handler 函数:

from langchain_core.documents import Document


def handle_record(record, id):
return Document(
page_content=record.data["title"] + "\n" + (record.data["body"] or ""),
metadata=record.data,
)


issues_loader = AirbyteCDKLoader(
source_class=SourceGithub,
config=config,
stream_name="issues",
record_handler=handle_record,
)

docs = issues_loader.load()
API 参考:文档

增量负载

某些流允许增量加载,这意味着源会跟踪同步的记录,并且不会再次加载它们。这对于具有大量数据且经常更新的源非常有用。

要利用这一点,请将last_state属性,并在再次创建 loader 时传入。这将确保仅加载新记录。

last_state = issues_loader.last_state  # store safely

incremental_issue_loader = AirbyteCDKLoader(
source_class=SourceGithub, config=config, stream_name="issues", state=last_state
)

new_docs = incremental_issue_loader.load()