Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在 Cloud Composer 中啟用資料系譜整合功能。
關於資料歷程整合
資料歷程是 Dataplex Universal Catalog 的功能,可追蹤資料在系統中的移動情形,包括來源、傳遞目的地和採用的轉換機制。
Cloud Composer 會使用 apache-airflow-providers-openlineage
套件產生傳送至 Data Lineage API 的系統樹狀結構事件。
這個套件已在 Cloud Composer 環境中安裝。如果您安裝此套件的其他版本,支援的運算子清單可能會變更。建議您只在必要時才這麼做,否則請保留預先安裝的套件版本。
資料歷程可用於與 Dataplex Universal Catalog 支援資料歷程的相同區域中的環境。
如果在 Cloud Composer 環境中啟用資料沿革,Cloud Composer 會針對使用任何支援運算子的 DAG,向資料沿革 API 回報沿革資訊。如果您想針對不支援的運算子回報階層,也可以傳送自訂階層事件。
您可以透過下列方式存取沿革資訊:
- Data Lineage API
- Dataplex Universal Catalog 中支援項目的歷程圖。詳情請參閱 Dataplex Universal Catalog 說明文件中的「歷程圖」。
建立環境時,如果符合下列條件,系統會自動啟用資料沿革整合:
專案中已啟用 Data Lineage API。詳情請參閱 Dataplex 通用目錄說明文件中的「啟用資料系譜 API」。
在 Airflow 中未設定自訂的Lineage 後端。
您可以在建立環境時停用資料沿革整合功能。
Cloud Composer 中的功能考量
在下列情況下,Cloud Composer 會發出 RPC 呼叫來建立譜系事件:
- Airflow 工作開始或結束時
- DAG 執行作業開始或結束時
如要進一步瞭解這些實體,請參閱 Dataplex Universal Catalog 說明文件中的歷程資訊模型和歷程 API 參考資料。
傳送的歷程流量必須遵守 Data Lineage API 中的配額。Cloud Composer 會消耗寫入配額。
處理沿革資料的相關價格會依據沿革定價。請參閱資料沿革考量事項。
Cloud Composer 的效能考量
資料歷程會在 Airflow 工作執行結束時回報。資料系譜報表的平均產生時間約為 1 到 2 秒。
這不會影響工作本身的效能:如果無法順利將系統系譜回報給 Lineage API,Airflow 工作也不會失敗。這不會影響主要運算子邏輯,但為了回報血統資料,整個工作例項的執行時間會稍微延長。
回報資料系譜的環境會產生額外時間,因此相關成本會略微增加。
法規遵循
資料系譜可為 VPC Service Controls 等功能提供不同層級的支援。請詳閱資料沿革考量事項,確保支援層級符合環境需求。
事前準備
這項功能提供不同的法規遵循支援。請務必先查看 Cloud Composer 專屬功能的考量事項和 資料系譜功能的考量事項。
Composer Worker (
roles/composer.worker
) 角色已包含資料系譜所需的所有 IAM 權限。這個角色是環境服務帳戶的必要角色。如要進一步瞭解資料系譜權限,請參閱 Dataplex Universal Catalog 說明文件中的系譜角色和權限。
檢查是否支援特定電信業者
資料系譜支援功能由運算子所在的供應商套件提供:
請查看運算子所在的供應器套件變更記錄,找出新增 OpenLineage 支援的項目。
舉例來說,BigQueryToBigQueryOperator 支援
apache-airflow-providers-google
11.0.0 版以上的 OpenLineage。請檢查環境使用的供應器套件版本。如要這樣做,請參閱您環境中使用的 Airflow 版本的預先安裝套件清單。您也可以在環境中安裝其他版本的套件。
此外,apache-airflow-providers-openlineage
說明文件中的「Supported classes」頁面會列出最新的支援運算子。
設定資料歷程整合
Cloud Composer 的資料歷程整合功能會依環境個別管理。也就是說,啟用這項功能需要兩個步驟:
- 在專案中啟用 Data Lineage API。
- 在特定 Cloud Composer 環境中啟用資料系譜整合功能。
在 Cloud Composer 中啟用資料系譜
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁。
在「Dataplex 資料歷程整合」部分中,按一下「編輯」。
在「Dataplex 資料歷程整合」面板中,選取「啟用 Dataplex 資料歷程整合」,然後按一下「儲存」。
gcloud
使用 --enable-cloud-data-lineage-integration
引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--enable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的地區。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--enable-cloud-data-lineage-integration
在 Cloud Composer 中停用資料歷程
在 Cloud Composer 環境中停用歷程整合功能,並不會停用 Data Lineage API。如果您想完全停用專案的歷程回報功能,請一併停用 Data Lineage API。請參閱「停用服務」。
主控台
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。
選取「環境設定」分頁。
在「Dataplex 資料歷程整合」部分中,按一下「編輯」。
在「Dataplex 資料歷程整合」面板中,選取「停用 Dataplex 資料歷程整合」,然後按一下「儲存」。
gcloud
使用 --disable-cloud-data-lineage-integration
引數。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--disable-cloud-data-lineage-integration
更改下列內容:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的地區。
範例:
gcloud composer environments update example-environment \
--location us-central1 \
--disable-cloud-data-lineage-integration
在支援的運算子中傳送系譜事件
如果啟用資料沿革,支援的運算子會自動傳送沿革事件。您不需要變更 DAG 程式碼。
例如執行下列工作:
task = BigQueryInsertJobOperator(
task_id='snapshot_task',
dag=dag,
location='<dataset-location>',
configuration={
'query': {
'query': 'SELECT * FROM dataset.tableA',
'useLegacySql': False,
'destinationTable': {
'project_id': 'example-project',
'dataset_id': 'dataset',
'table_id': 'tableB',
},
}
},
)
結果會在 Dataplex Universal Catalog UI 中建立下列歷程圖:

傳送自訂歷程事件
如果您想為不支援自動血統回報功能的運算子回報血統,可以傳送自訂血統事件。
舉例來說,如要傳送自訂事件,請使用以下方式:
- BashOperator:修改工作定義中的
inlets
或outlets
參數。 - PythonOperator:修改工作定義中的
task.inlets
或task.outlets
參數。 - 您可以將
AUTO
用於inlets
參數。這會將其值設為上游任務的outlets
。
以下範例說明如何使用入口和出口:
from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO
...
bash_task = BashOperator(
task_id="bash_task",
dag=dag,
bash_command="sleep 0",
inlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table1",
)
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table2",
)
],
)
def _python_task(task):
print("Python task")
python_task = PythonOperator(
task_id="python_task",
dag=dag,
python_callable=_python_task,
inlets=[
AUTO,
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table3",
),
],
outlets=[
BigQueryTable(
project_id="example-project",
dataset_id="dataset",
table_id="table4",
)
],
)
bash_task >> python_task
因此,在 Dataplex Universal Catalog UI 中建立了下列歷程圖:

在 Cloud Composer 中查看譜系記錄
您可以使用 Dataplex Universal Catalog 資料歷程整合專區的 環境設定頁面連結,查看與資料歷程相關的記錄。
疑難排解
如果歷程資料未回報至 Lineage API,或是您無法在 Dataplex Universal Catalog 中看到歷程資料,請嘗試下列疑難排解步驟:
- 請確認 Cloud Composer 環境的專案已啟用 Data Lineage API。
- 檢查 Cloud Composer 環境中是否已啟用資料歷程整合功能。
- 請檢查您使用的運算子是否包含在自動追溯記錄回報支援功能中。請參閱「支援的 Airflow 運算子」。
- 檢查 Cloud Composer 中的lineage 記錄,找出可能的問題。