Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁面說明如何在環境中啟用可延遲運算子的支援功能,並在 DAG 中使用可延遲運算子。 Google Cloud
關於 Cloud Composer 中的可延後運算子
如果您至少有一個觸發器執行個體 (在高復原力環境中至少有兩個),即可在 DAG 中使用可延遲運算子和觸發條件。
對於可延後執行的運算子,Airflow 會將工作執行作業分成下列階段:
開始作業。在這個階段,工作會佔用 Airflow 工作站的空缺。工作會執行作業,將工作委派給其他服務。
舉例來說,執行 BigQuery 工作可能需要幾秒到幾小時的時間。建立工作後,作業會將工作 ID (BigQuery 工作 ID) 傳遞至 Airflow 觸發事件。
觸發事件會監控工作,直到工作完成為止。在這個階段,worker 未佔用任何位置。Airflow 觸發器採用非同步架構,可處理數百個這類工作。當觸發條件偵測到工作已完成時,就會傳送事件,觸發最後一個階段。
在最後一個階段,Airflow 工作站會執行回呼。舉例來說,這個回呼可以將工作標示為成功,或執行其他作業,並將工作重新設為由觸發器監控。
觸發器是無狀態的,因此可在中斷或重新啟動時復原。因此,長時間執行的工作可在 Pod 重新啟動時復原,除非重新啟動發生在最後階段,而這個階段預期會很短。
事前準備
- 可延後運算子和感應器可在 Cloud Composer 2 環境中使用,且需要以下項目:
- Cloud Composer 2.0.31 以上版本
- Airflow 2.2.5、2.3.3 以上版本
啟用延後運算子的支援功能
名為 Airflow 觸發器的環境元件會以非同步方式監控環境中的所有延後工作。這類工作完成延遲作業後,觸發器會將工作傳遞給 Airflow 工作站。
您必須在環境中至少有一個觸發器執行個體 (在高彈性環境中至少要有兩個),才能在 DAG 中使用可延後模式。您可以在建立環境時設定觸發條件,也可以調整現有環境的觸發條件和效能參數數量。
Google Cloud 支援延後模式的運算子
只有部分 Airflow 運算子已擴充,可支援延後模式。以下清單是 airflow.providers.google.operators.cloud
套件中支援延後模式的運算子參考資料。列出最低所需 airflow.providers.google.operators.cloud
套件版本的資料欄,代表該運算子支援延後模式的最早套件版本。
Cloud Composer 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudComposerCreateEnvironmentOperator | 6.4.0 |
CloudComposerDeleteEnvironmentOperator | 6.4.0 |
CloudComposerUpdateEnvironmentOperator | 6.4.0 |
BigQuery 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
BigQueryCheckOperator | 8.4.0 |
BigQueryValueCheckOperator | 8.4.0 |
BigQueryIntervalCheckOperator | 8.4.0 |
BigQueryGetDataOperator | 8.4.0 |
BigQueryInsertJobOperator | 8.4.0 |
BigQuery 資料移轉服務運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
BigQueryDataTransferServiceStartTransferRunsOperator | 8.9.0 |
Cloud Build 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudBuildCreateBuildOperator | 8.7.0 |
Cloud SQL 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudSQLExportInstanceOperator | 10.3.0 |
Dataflow 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
DataflowTemplatedJobStartOperator | 8.9.0 |
DataflowStartFlexTemplateOperator | 8.9.0 |
Cloud Data Fusion 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
CloudDataFusionStartPipelineOperator | 8.9.0 |
Dataproc 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
DataprocCreateClusterOperator | 8.9.0 |
DataprocDeleteClusterOperator | 8.9.0 |
DataprocJobBaseOperator | 8.4.0 |
DataprocInstantiateWorkflowTemplateOperator | 9.0.0 |
DataprocInstantiateInlineWorkflowTemplateOperator | 10.1.0 |
DataprocSubmitJobOperator | 8.4.0 |
DataprocUpdateClusterOperator | 8.9.0 |
DataprocCreateBatchOperator | 8.9.0 |
Google Kubernetes Engine 運算子
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
GKEDeleteClusterOperator | 9.0.0 |
GKECreateClusterOperator | 9.0.0 |
AI 平台營運人員
運算子名稱 | 所需的 apache-airflow-providers-google 版本 |
---|---|
MLEngineStartTrainingJobOperator | 8.9.0 |
在 DAG 中使用可延遲運算子
所有 Google Cloud 運算子的共同規則,就是使用 deferrable
布林參數啟用可延遲模式。如果 Google Cloud運算子沒有這個參數,就無法在延後模式下執行。其他運算子可能會採用不同的慣例。舉例來說,部分社群經營者會使用名稱中帶有 Async
後置字元的獨立類別。
以下 DAG 範例會在可延遲模式中使用 DataprocSubmitJobOperator
運算子:
PYSPARK_JOB = {
"reference": { "project_id": "PROJECT_ID" },
"placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
"pyspark_job": {
"main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
},
}
DataprocSubmitJobOperator(
task_id="dataproc-deferrable-example",
job=PYSPARK_JOB,
deferrable=True,
)
查看觸發器記錄檔
觸發器會產生記錄,可與其他環境元件的記錄一併使用。如要進一步瞭解如何查看環境記錄,請參閱「查看記錄」。
監控觸發器
如要進一步瞭解如何監控觸發器元件,請參閱「Airflow 指標」。
除了監控觸發器之外,您也可以在環境的 Monitoring 資訊主頁中,查看「Unfinished Task」指標中的延遲工作數量。