Skip to main content
Open In Colab在 GitHub 上打开

Airbyte Salesforce(已弃用)

注意:此特定于连接器的加载器已弃用。请使用AirbyteLoader相反。

Airbyte是一个用于ELT管道的数据集成平台,从API、数据库和文件到仓库和湖泊。它拥有最大的数据仓库和数据库 ELT 连接器目录。

此加载程序将 Salesforce 连接器公开为文档加载程序,允许您将各种 Salesforce 对象作为文档加载。

安装

首先,您需要安装airbyte-source-salesforcepython 软件包。

%pip install --upgrade --quiet  airbyte-source-salesforce

查看 Airbyte 文档页面,了解有关如何配置读取器的详细信息。 config 对象应遵循的 JSON 架构可在 Github 上找到:https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-salesforce/source_salesforce/spec.yaml

一般形状如下所示:

{
"client_id": "<oauth client id>",
"client_secret": "<oauth client secret>",
"refresh_token": "<oauth refresh token>",
"start_date": "<date from which to start retrieving records from in ISO format, e.g. 2020-10-20T00:00:00Z>",
"is_sandbox": False, # set to True if you're using a sandbox environment
"streams_criteria": [ # Array of filters for salesforce objects that should be loadable
{"criteria": "exacts", "value": "Account"}, # Exact name of salesforce object
{"criteria": "starts with", "value": "Asset"}, # Prefix of the name
# Other allowed criteria: ends with, contains, starts not with, ends not with, not contains, not exacts
],
}

默认情况下,所有字段都作为元数据存储在文档中,并且文本设置为空字符串。通过转换 Reader 返回的文档来构建文档的文本。

from langchain_community.document_loaders.airbyte import AirbyteSalesforceLoader

config = {
# your salesforce configuration
}

loader = AirbyteSalesforceLoader(
config=config, stream_name="asset"
) # check the documentation linked above for a list of all streams

现在,您可以按常规方式加载文档

docs = loader.load()

load返回一个列表,它将阻塞,直到加载所有文档。为了更好地控制此过程,您还可以将lazy_load方法,它返回一个迭代器:

docs_iterator = loader.lazy_load()

请记住,默认情况下,页面内容为空,并且 metadata 对象包含记录中的所有信息。要在不同的环境中创建文档,请在创建 loader 时传入 record_handler 函数:

from langchain_core.documents import Document


def handle_record(record, id):
return Document(page_content=record.data["title"], metadata=record.data)


loader = AirbyteSalesforceLoader(
config=config, record_handler=handle_record, stream_name="asset"
)
docs = loader.load()
API 参考:文档

增量负载

某些流允许增量加载,这意味着源会跟踪同步的记录,并且不会再次加载它们。这对于具有大量数据且经常更新的源非常有用。

要利用这一点,请将last_state属性,并在再次创建 loader 时传入。这将确保仅加载新记录。

last_state = loader.last_state  # store safely

incremental_loader = AirbyteSalesforceLoader(
config=config, stream_name="asset", state=last_state
)

new_docs = incremental_loader.load()