更新 Cloud Composer 環境

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 提供指令列介面 (CLI),可用於執行各種工作,例如觸發及管理 DAG、取得 DAG 執行作業和工作相關資訊、新增及刪除連線和使用者。

支援的 Airflow CLI 指令

Airflow 使用 Airflow 2 CLI 語法,相關說明請參閱 Airflow 說明文件。

如需支援的 Airflow CLI 指令完整清單,請參閱 gcloud composer environments run 指令的參考資料。

事前準備

  • 您必須具備權限,才能在 Cloud Composer 中使用 Google Cloud CLI 並執行 Airflow CLI 指令。

  • 在 2.4.0 以下版本的 Cloud Composer 中,您必須存取環境叢集的控制層,才能執行 Airflow CLI 指令。

執行 Airflow CLI 指令

如要在環境中執行 Airflow CLI 指令,請使用 gcloud CLI:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

請使用下列內容取代:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • SUBCOMMAND支援的 Airflow CLI 指令之一。
  • SUBCOMMAND_ARGUMENTS 搭配 Airflow CLI 指令的引數。

子指令引數分隔符

請使用 -- 分隔指定 Airflow CLI 指令的引數:

  • 將複合 CLI 指令指定為子指令。
  • -- 分隔符後方,指定複合指令的任何引數做為子指令引數。

範例:

gcloud composer environments run example-environment \
    dags list -- --output=json

預設位置

大部分的 gcloud composer 指令都需要位置。您可以使用 --location 標記指定位置,也可以設定預設位置

舉例來說,如要在 Cloud Composer 環境中觸發名稱為 sample_quickstart 且 ID 為 5077 的 DAG,請使用以下指令:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

在私人 IP 環境中執行指令

在 2.4.0 以下的 Cloud Composer 版本中:

如要在私人 IP 環境中執行 Airflow CLI 指令,請在可存取 GKE 叢集控制層端點的機器上執行指令。可用的選項可能會因私人叢集設定而異。

如果您環境的叢集中停用公開端點存取權,就無法使用 gcloud composer 指令執行 Airflow CLI。如要執行 Airflow CLI 指令,請執行下列步驟:

  1. 在虛擬私有雲網路中建立 VM
  2. 取得叢集憑證。執行下列指令:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

使用 kubectl 執行 Airflow 指令。例如:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACE 替換為類似以下的命名空間:composer-2-0-28-airflow-2-3-394zxc12411。您可以在工作負載清單中找到 Cloud Composer,也可以使用 kubectl get namespaces 指令。

如果環境叢集中已啟用公開端點存取權,您也可以透過具有已加入授權網路的外部 IP 位址的機器,執行 Airflow CLI 指令。如要啟用機器的存取權,請將機器的外部位址新增至環境的授權網路清單

執行 gcloud composer environments runkubectl 指令時,您可能會遇到下列錯誤:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

症狀:這則錯誤訊息表示執行這些指令的電腦沒有網路連線。

解決方法:請按照「在私人 IP 環境中執行指令」一節中的規範操作,或參考「kubectl 指令逾時」一節的操作說明。

透過 Cloud Composer API 執行 Airflow CLI 指令

自 Cloud Composer 2.4.0 版起,您可以透過 Cloud Composer API 執行 Airflow CLI 指令。

執行指令

建構 environments.executeAirflowCommand API 要求:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

更改下列內容:

  • PROJECT_ID專案 ID
  • LOCATION:環境所在的地區。
  • ENVIRONMENT_NAME:環境名稱。
  • AIRFLOW_COMMAND:您要執行的 Airflow CLI 指令,例如 dags
  • AIRFLOW_SUBCOMMAND:您要執行的 Airflow CLI 指令的子指令,例如 list
  • (選用) SUBCOMMAND_PARAMETER:子指令的參數。如果您想使用多個參數,請在清單中新增更多項目。

範例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

輪詢指令狀態

透過 Cloud Composer API 執行 Airflow CLI 指令後,請發出 PollAirflowCommand 要求,並檢查 exitInfo 中的欄位是否有錯誤和狀態代碼,確認指令是否已順利完成。output 欄位包含記錄行。

如要取得指令執行狀態並擷取記錄,請提供 ExecuteAirflowCommandRequest 傳回的 executionIdpodpodNamespace 值:

範例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

後續步驟