使用 Dataplex 通用目錄追蹤資料歷程

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在 Cloud Composer 中啟用資料系譜整合功能。

關於資料歷程整合

資料歷程Dataplex Universal Catalog 的功能,可追蹤資料在系統中的移動情形,包括來源、傳遞目的地和採用的轉換機制。

Cloud Composer 會使用 apache-airflow-providers-openlineage 套件產生傳送至 Data Lineage API 的系統樹狀結構事件。

這個套件已在 Cloud Composer 環境中安裝。如果您安裝此套件的其他版本,支援的運算子清單可能會變更。建議您只在必要時才這麼做,否則請保留預先安裝的套件版本。

  • 資料歷程可用於與 Dataplex Universal Catalog 支援資料歷程的相同區域中的環境。

  • 如果在 Cloud Composer 環境中啟用資料沿革,Cloud Composer 會針對使用任何支援運算子的 DAG,向資料沿革 API 回報沿革資訊。如果您想針對不支援的運算子回報階層,也可以傳送自訂階層事件

  • 您可以透過下列方式存取沿革資訊:

    • Data Lineage API
    • Dataplex Universal Catalog 中支援項目的歷程圖。詳情請參閱 Dataplex Universal Catalog 說明文件中的「歷程圖」。

建立環境時,如果符合下列條件,系統會自動啟用資料沿革整合:

  • 專案中已啟用 Data Lineage API。詳情請參閱 Dataplex 通用目錄說明文件中的「啟用資料系譜 API」。

  • 在 Airflow 中未設定自訂的Lineage 後端

您可以在建立環境時停用資料沿革整合功能。

對於現有環境,您隨時可以啟用停用資料歷程整合。

Cloud Composer 中的功能考量

在下列情況下,Cloud Composer 會發出 RPC 呼叫來建立譜系事件:

  • Airflow 工作開始或結束時
  • DAG 執行作業開始或結束時

如要進一步瞭解這些實體,請參閱 Dataplex Universal Catalog 說明文件中的歷程資訊模型歷程 API 參考資料

傳送的歷程流量必須遵守 Data Lineage API 中的配額。Cloud Composer 會消耗寫入配額。

處理沿革資料的相關價格會依據沿革定價。請參閱資料沿革考量事項

Cloud Composer 的效能考量

資料歷程會在 Airflow 工作執行結束時回報。資料系譜報表的平均產生時間約為 1 到 2 秒。

這不會影響工作本身的效能:如果無法順利將系統系譜回報給 Lineage API,Airflow 工作也不會失敗。這不會影響主要運算子邏輯,但為了回報血統資料,整個工作例項的執行時間會稍微延長。

回報資料系譜的環境會產生額外時間,因此相關成本會略微增加。

法規遵循

資料系譜可為 VPC Service Controls 等功能提供不同層級的支援。請詳閱資料沿革考量事項,確保支援層級符合環境需求。

事前準備

檢查是否支援特定電信業者

資料系譜支援功能由運算子所在的供應商套件提供:

  1. 請查看運算子所在的供應器套件變更記錄,找出新增 OpenLineage 支援的項目。

    舉例來說,BigQueryToBigQueryOperator 支援 apache-airflow-providers-google 11.0.0 版以上的 OpenLineage。

  2. 請檢查環境使用的供應器套件版本。如要這麼做,請參閱預先安裝套件清單,瞭解環境中使用的 Cloud Composer 版本。您也可以在環境中安裝其他版本的套件。

此外,apache-airflow-providers-openlineage 說明文件中的「Supported classes」頁面會列出最新的支援運算子。

設定資料歷程整合

Cloud Composer 的資料歷程整合功能會依環境個別管理。也就是說,啟用這項功能需要兩個步驟:

  1. 在專案中啟用 Data Lineage API。
  2. 在特定 Cloud Composer 環境中啟用資料系譜整合功能。

在 Cloud Composer 中啟用資料系譜

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 選取「環境設定」分頁。

  4. 在「Dataplex 資料歷程整合」部分中,按一下「編輯」

  5. 在「Dataplex 資料歷程整合」面板中,選取「啟用 Dataplex 資料歷程整合」,然後按一下「儲存」

gcloud

使用 --enable-cloud-data-lineage-integration 引數。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --enable-cloud-data-lineage-integration

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

範例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --enable-cloud-data-lineage-integration

在 Cloud Composer 中停用資料歷程

在 Cloud Composer 環境中停用歷程整合功能,並不會停用 Data Lineage API。如果您想完全停用專案的歷程回報功能,請一併停用 Data Lineage API。請參閱「停用服務」。

主控台

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

  3. 選取「環境設定」分頁。

  4. 在「Dataplex 資料歷程整合」部分中,按一下「編輯」

  5. 在「Dataplex 資料歷程整合」面板中,選取「停用 Dataplex 資料歷程整合」,然後按一下「儲存」

gcloud

使用 --disable-cloud-data-lineage-integration 引數。

gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --disable-cloud-data-lineage-integration

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

範例:

gcloud composer environments update example-environment \
    --location us-central1 \
    --disable-cloud-data-lineage-integration

在支援的運算子中傳送系譜事件

如果啟用資料沿革,支援的運算子會自動傳送沿革事件。您不需要變更 DAG 程式碼。

例如執行下列工作:

task = BigQueryInsertJobOperator(
    task_id='snapshot_task',
    dag=dag,
    location='<dataset-location>',
    configuration={
        'query': {
            'query': 'SELECT * FROM dataset.tableA',
            'useLegacySql': False,
            'destinationTable': {
                'project_id': 'example-project',
                'dataset_id': 'dataset',
                'table_id': 'tableB',
            },
        }
    },
)

結果會在 Dataplex Universal Catalog UI 中建立下列歷程圖:

Dataplex UI 中的歷程圖示例。
圖 1. Dataplex Universal Catalog UI 中 BigQuery 資料表的歷程圖示例。

傳送自訂歷程事件

如果您想為不支援自動血統回報功能的運算子回報血統,可以傳送自訂血統事件。

舉例來說,如要傳送自訂事件,請使用以下方式:

  • BashOperator:修改工作定義中的 inletsoutlets 參數。
  • PythonOperator:修改工作定義中的 task.inletstask.outlets 參數。
  • 您可以將 AUTO 用於 inlets 參數。這會將其值設為上游任務的 outlets

以下範例說明如何使用入口和出口:

from airflow.composer.data_lineage.entities import BigQueryTable
from airflow.lineage import AUTO

...

bash_task = BashOperator(
    task_id="bash_task",
    dag=dag,
    bash_command="sleep 0",
    inlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table1",
        )
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table2",
        )
    ],
)


def _python_task(task):
    print("Python task")


python_task = PythonOperator(
    task_id="python_task",
    dag=dag,
    python_callable=_python_task,
    inlets=[
        AUTO,
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table3",
        ),
    ],
    outlets=[
        BigQueryTable(
            project_id="example-project",
            dataset_id="dataset",
            table_id="table4",
        )
    ],
)

bash_task >> python_task

因此,在 Dataplex Universal Catalog UI 中建立了下列歷程圖:

Dataplex UI 中自訂事件的歷程圖範例。
圖 2. Dataplex Universal Catalog UI 中多個 BigQuery 資料表的歷代圖表範例。

在 Cloud Composer 中查看譜系記錄

您可以使用 Dataplex Universal Catalog 資料歷程整合專區的 環境設定頁面連結,查看與資料歷程相關的記錄。

疑難排解

如果歷程資料未回報至 Lineage API,或是您無法在 Dataplex Universal Catalog 中看到歷程資料,請嘗試下列疑難排解步驟:

  • 請確認 Cloud Composer 環境的專案已啟用 Data Lineage API。
  • 檢查 Cloud Composer 環境中是否已啟用資料歷程整合功能。
  • 請檢查您使用的運算子是否包含在自動追溯記錄回報支援功能中。請參閱「支援的 Airflow 運算子」。
  • 檢查 Cloud Composer 中的lineage 記錄,找出可能的問題。

後續步驟