在 DAG 中使用可延遲運算子

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何在環境中啟用可延遲運算子的支援功能,並在 DAG 中使用可延遲運算子。 Google Cloud

關於 Cloud Composer 中的可延後運算子

如果您至少有一個觸發器執行個體 (在高復原力環境中至少有兩個),即可在 DAG 中使用可延遲運算子和觸發條件

對於可延後執行的運算子,Airflow 會將工作執行作業分成下列階段:

  1. 開始作業。在這個階段,工作會佔用 Airflow 工作站的空缺。工作會執行作業,將工作委派給其他服務。

    舉例來說,執行 BigQuery 工作可能需要幾秒到幾小時的時間。建立工作後,作業會將工作 ID (BigQuery 工作 ID) 傳遞至 Airflow 觸發事件。

  2. 觸發事件會監控工作,直到工作完成為止。在這個階段,worker 未佔用任何位置。Airflow 觸發器採用非同步架構,可處理數百個這類工作。當觸發條件偵測到工作已完成時,就會傳送事件,觸發最後一個階段。

  3. 在最後一個階段,Airflow 工作站會執行回呼。舉例來說,這個回呼可以將工作標示為成功,或執行其他作業,並將工作重新設為由觸發器監控。

觸發器是無狀態的,因此可在中斷或重新啟動時復原。因此,長時間執行的工作可在 Pod 重新啟動時復原,除非重新啟動發生在最後階段,而這個階段預期會很短。

事前準備

  • 可延後運算子和感應器可在 Cloud Composer 2 環境中使用,且需要以下項目:
    • Cloud Composer 2.0.31 以上版本
    • Airflow 2.2.5、2.3.3 以上版本

啟用延後運算子的支援功能

名為 Airflow 觸發器的環境元件會以非同步方式監控環境中的所有延後工作。這類工作完成延遲作業後,觸發器會將工作傳遞給 Airflow 工作站。

您必須在環境中至少有一個觸發器執行個體 (在高彈性環境中至少要有兩個),才能在 DAG 中使用可延後模式。您可以在建立環境時設定觸發條件,也可以調整現有環境的觸發條件和效能參數數量

Google Cloud 支援延後模式的運算子

只有部分 Airflow 運算子已擴充,可支援延後模式。以下清單是 airflow.providers.google.operators.cloud 套件中支援延後模式的運算子參考資料。列出最低所需 airflow.providers.google.operators.cloud 套件版本的資料欄,代表該運算子支援延後模式的最早套件版本。

Cloud Composer 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
CloudComposerCreateEnvironmentOperator 6.4.0
CloudComposerDeleteEnvironmentOperator 6.4.0
CloudComposerUpdateEnvironmentOperator 6.4.0

BigQuery 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
BigQueryCheckOperator 8.4.0
BigQueryValueCheckOperator 8.4.0
BigQueryIntervalCheckOperator 8.4.0
BigQueryGetDataOperator 8.4.0
BigQueryInsertJobOperator 8.4.0

BigQuery 資料移轉服務運算子

運算子名稱所需的 apache-airflow-providers-google 版本
BigQueryDataTransferServiceStartTransferRunsOperator 8.9.0

Cloud Build 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
CloudBuildCreateBuildOperator 8.7.0

Cloud SQL 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
CloudSQLExportInstanceOperator 10.3.0

Dataflow 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
DataflowTemplatedJobStartOperator 8.9.0
DataflowStartFlexTemplateOperator 8.9.0

Cloud Data Fusion 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
CloudDataFusionStartPipelineOperator 8.9.0

Google Kubernetes Engine 運算子

運算子名稱所需的 apache-airflow-providers-google 版本
GKEDeleteClusterOperator 9.0.0
GKECreateClusterOperator 9.0.0

AI 平台營運人員

運算子名稱所需的 apache-airflow-providers-google 版本
MLEngineStartTrainingJobOperator 8.9.0

在 DAG 中使用可延遲運算子

所有 Google Cloud 運算子的共同規則,就是使用 deferrable 布林參數啟用可延遲模式。如果 Google Cloud運算子沒有這個參數,就無法在延後模式下執行。其他運算子可能會採用不同的慣例。舉例來說,部分社群經營者會使用名稱中帶有 Async 後置字元的獨立類別。

以下 DAG 範例會在可延遲模式中使用 DataprocSubmitJobOperator 運算子:

PYSPARK_JOB = {
    "reference": { "project_id": "PROJECT_ID" },
    "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        deferrable=True,
    )

查看觸發器記錄檔

觸發器會產生記錄,可與其他環境元件的記錄一併使用。如要進一步瞭解如何查看環境記錄,請參閱「查看記錄」。

監控觸發器

如要進一步瞭解如何監控觸發器元件,請參閱「Airflow 指標」。

除了監控觸發器之外,您也可以在環境的 Monitoring 資訊主頁中,查看「Unfinished Task」指標中的延遲工作數量。

後續步驟