Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页介绍了如何在 Cloud Composer 中启用数据谱系集成。
数据沿袭集成简介
数据沿袭是 Dataplex 的一项功能,可跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。
Cloud Composer 使用 apache-airflow-providers-openlineage
软件包生成要发送到 Data Lineage API 的谱系事件。
此软件包已安装在 Cloud Composer 环境中。如果您安装此软件包的其他版本,受支持的运营商列表可能会发生变化。我们建议您仅在必要时执行此操作,否则请保留预安装的软件包版本。
数据沿袭适用于与支持数据沿袭的 Dataplex 区域位于同一区域的环境。
如果 Cloud Composer 环境中启用了数据谱系,对于使用任何受支持的运算符的 DAG,Cloud Composer 会向 Data Lineage API 报告谱系信息。如果您想为不受支持的运营商报告谱系,还可以发送自定义谱系事件。
您可以通过以下方式访问谱系信息:
- Data Lineage API
- Dataplex 中受支持条目的沿袭图。 如需了解详情,请参阅 Dataplex 文档中的沿袭图。
创建环境时,如果满足以下条件,系统会自动启用数据源流水线集成:
您的项目已启用 Data Lineage API。如需了解详情,请参阅 Dataplex 文档中的启用 Data Lineage API。
Airflow 中未配置自定义谱系后端。
您可以在创建环境时停用数据沿袭集成。
Cloud Composer 中的功能注意事项
在以下情况下,Cloud Composer 会进行 RPC 调用以创建谱系事件:
- Airflow 任务的开始或结束时间
- DAG 运行的开始或结束时间
如需详细了解这些实体,请参阅 Dataplex 文档中的谱系信息模型和谱系 API 参考。
发出的谱系流量受 Data Lineage API 中的配额限制。Cloud Composer 会消耗写入配额。
与处理谱系数据相关的价格取决于谱系价格。 请参阅数据沿袭注意事项。
Cloud Composer 中的性能注意事项
系统会在 Airflow 任务执行结束时报告数据沿袭。平均而言,数据谱系报告大约需要 1-2 秒。
这不会影响任务本身的性能:即使未成功向 Lineage API 报告谱系,Airflow 任务也不会失败。这对主要运算符逻辑没有影响,但整个任务实例的执行时间会延长一些,以便报告谱系数据。
报告数据谱系的环境的相关费用会略微增加,因为报告数据谱系需要额外的时间。
合规性
数据源流可为 VPC Service Controls 等功能提供不同的支持级别。查看数据谱系注意事项,确保支持级别符合您的环境要求。
准备工作
此功能提供不同的合规性支持。请务必先查看特定于 Cloud Composer 的功能注意事项和数据谱系功能注意事项。
Cloud Composer 2.1.2 及更高版本与 Airflow 2.2.5 及更高版本支持数据沿袭集成。
Composer Worker (
roles/composer.worker
) 角色已包含数据传承所需的所有 IAM 权限。此角色是环境服务账号的必需角色。如需详细了解数据沿袭权限,请参阅 Dataplex 文档中的沿袭角色和权限。
检查运营商是否受支持
数据传承支持由包含运算符的提供程序软件包提供:
检查包含 operator 的提供程序软件包的更新日志,查看是否有添加 OpenLineage 支持的条目。
例如,BigQueryToBigQueryOperator 从
apache-airflow-providers-google
11.0.0 版开始支持 OpenLineage。检查您的环境使用的提供方软件包的版本。为此,请参阅您环境中使用的 Cloud Composer 版本的预安装软件包列表。您还可以在环境中安装其他版本的软件包。
此外,apache-airflow-providers-openlineage
文档中的支持的类页面列出了最新的受支持运算符。
配置数据沿袭集成
Cloud Composer 的数据沿袭集成是按环境进行管理的。这意味着,启用该功能需要完成以下两个步骤:
- 在项目中启用 Data Lineage API。
- 在特定 Cloud Composer 环境中启用数据沿袭集成。
在 Cloud Composer 中启用数据沿袭
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
选择环境配置标签页。
在 Dataplex 数据沿袭集成部分中,点击修改。
在 Dataplex 数据沿袭集成面板中,选择启用与 Dataplex 数据沿袭的集成,然后点击保存。
gcloud
使用 --enable-cloud-data-lineage-integration
参数。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。
示例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
在 Cloud Composer 中停用数据沿袭
在 Cloud Composer 环境中停用谱系集成不会停用 Data Lineage API。如果您想为项目彻底停用沿袭报告,请同时停用 Data Lineage API。请参阅停用服务。
控制台
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
选择环境配置标签页。
在 Dataplex 数据沿袭集成部分中,点击修改。
在 Dataplex 数据沿袭集成面板中,选择停用与 Dataplex 数据沿袭的集成,然后点击保存。
gcloud
使用 --disable-cloud-data-lineage-integration
参数。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
替换以下内容:
ENVIRONMENT_NAME
:您的环境的名称。LOCATION
:环境所在的区域。
示例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
在受支持的运营商中发送谱系事件
如果启用了数据沿袭,受支持的运营商会自动发送沿袭事件。您无需更改 DAG 代码。
例如,运行以下任务:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
这会在 Dataplex 界面中创建以下谱系图:

发送自定义谱系事件
如果您想为不支持自动生成谱系报告的运营商报告谱系,可以发送自定义谱系事件。
例如,如需使用以下代码发送自定义事件,请执行以下操作:
- BashOperator:修改任务定义中的
inlets
或outlets
参数。 - PythonOperator:修改任务定义中的
task.inlets
或task.outlets
参数。 - 您可以为
inlets
参数使用AUTO
。这会将其值设为其上游任务的outlets
。
以下示例展示了入口和出口的用法:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
因此,Dataplex 界面中会创建以下谱系图:

在 Cloud Composer 中查看谱系日志
您可以使用环境配置页面中 Dataplex 数据沿袭集成部分中的链接,检查与数据沿袭相关的日志。
问题排查
如果系统未向 Lineage API 报告谱系数据,或者您在 Dataplex 中看不到谱系数据,请尝试执行以下问题排查步骤:
- 确保已在 Cloud Composer 环境的项目中启用 Data Lineage API。
- 检查 Cloud Composer 环境中是否已启用数据谱系集成。
- 检查您使用的运算符是否包含在自动谱系报告支持范围内。请参阅支持的 Airflow 操作器。
- 在 Cloud Composer 中查看谱系日志,了解可能存在的问题。