使用 KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何使用 KubernetesPodOperator,將 Cloud Composer 中的 Kubernetes Pods 部署至 Google Kubernetes Engine 叢集。

KubernetesPodOperator 會在環境叢集中啟動 Kubernetes Pods。相比之下,Google Kubernetes Engine 操作員會在指定叢集中執行 Kubernetes Pod,該叢集可以是與環境無關的獨立叢集。您也可以使用 Google Kubernetes Engine 運算子建立及刪除叢集。

如果您需要下列功能,KubernetesPodOperator 會是理想選擇:

  • 無法透過公開 PyPI 存放區取得的自訂 Python 依附元件。
  • 在原始 Cloud Composer 工作站映像檔中無法使用的二進位依附元件。

事前準備

  • 如果使用 CNCF Kubernetes Provider 的 5.0.0 版,請按照 CNCF Kubernetes Provider 專節中的操作說明進行。

  • Cloud Composer 2 不支援 Pod 關聯設定。如果您想使用 Pod 相依性,請改用 GKE 運算子,在其他叢集中啟動 Pod。

關於 Cloud Composer 2 中的 KubernetesPodOperator

本節說明 KubernetesPodOperator 在 Cloud Composer 2 中的運作方式。

資源使用情況

在 Cloud Composer 2 中,環境的叢集會自動調整資源配置。您使用 KubernetesPodOperator 執行的額外工作負載,會獨立於環境進行擴充。

環境不會受到資源需求增加的影響,但環境的叢集會根據資源需求進行升降。

在環境叢集中執行的額外工作負載,其定價會採用 Cloud Composer 2 定價模式,並使用 Cloud Composer 運算 SKU

Cloud Composer 2 使用 Autopilot 叢集,其中引入了運算類別的概念:

  • Cloud Composer 僅支援 general-purpose 運算類別。

  • 根據預設,如果您未選取任何類別,則在使用 KubernetesPodOperator 建立 Pod 時,系統會假設為 general-purpose 類別。

  • 每個類別都與特定屬性和資源限制相關聯,您可以參閱 Autopilot 說明文件瞭解相關資訊。舉例來說,在 general-purpose 類別中執行的 Pod 最多可使用 110 GiB 記憶體。

存取專案資源

Cloud Composer 2 會使用 GKE 叢集,並搭配 Workload Identity Federation for GKE。在 composer-user-workloads 命名空間中執行的 Pod 可以存取專案中的 Google Cloud 資源,無須額外設定。環境的服務帳戶會用於存取這些資源。

如果您想使用自訂命名空間,則與此命名空間相關聯的 Kubernetes 服務帳戶必須對應至 Google Cloud 服務帳戶,才能為 Google API 和其他服務的要求啟用服務身分授權。如果您在環境叢集中的自訂命名空間中執行 Pod,則不會建立 Kubernetes 和Google Cloud 服務帳戶之間的 IAM 繫結,這些 Pod 也無法存取您的 Google Cloud 專案資源。

如果您使用自訂命名空間,且希望 Pod 能存取Google Cloud 資源,請按照「Workload Identity Federation for GKE」中的指示設定自訂命名空間的繫結:

  1. 在環境叢集中建立不同的命名空間。
  2. 在自訂命名空間 Kubernetes 服務帳戶和環境的服務帳戶之間建立繫結。
  3. 將環境的服務帳戶註解加入 Kubernetes 服務帳戶。
  4. 使用 KubernetesPodOperator 時,請在 namespaceservice_account_name 參數中指定命名空間和 Kubernetes 服務帳戶。

希望將設定需求減至最低

如要建立 KubernetesPodOperator,只需要 Pod 的 name、要使用的 image,以及 task_id 參數。/home/airflow/composer_kube_config 包含用於向 GKE 進行驗證的憑證。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

額外設定

本例顯示可在 KubernetesPodOperator 中設定的其他參數。

詳情請參閱下列資源:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

使用 Jinja 範本

Airflow 支援 DAG 中的 Jinja 範本

您必須使用運算子宣告必要的 Airflow 參數 (task_idnameimage)。如以下範例所示,您可以使用 Jinja 建立所有其他參數的範本,包括 cmdsargumentsenv_varsconfig_file

範例中的 env_vars 參數是從名為 my_valueAirflow 變數設定。範例 DAG 會從 Airflow 中的 vars 範本變數取得值。Airflow 提供更多變數,可存取不同類型的資訊。舉例來說,您可以使用 conf 範本變數存取 Airflow 設定選項的值。如需詳細資訊和 Airflow 中可用的變數清單,請參閱 Airflow 說明文件中的「範本參考資料」。

如果不變更 DAG 或建立 env_vars 變數,範例中的 ex-kube-templates 工作會失敗,因為變數不存在。您可以在 Airflow 使用者介面或 Google Cloud CLI 中建立這個變數:

Airflow UI

  1. 前往 Airflow UI

  2. 在工具列中,依序選取「管理」>「變數」

  3. 在「清單變數」頁面中,按一下「新增記錄」

  4. 在「新增變數」頁面中輸入下列資訊:

    • 鍵:my_value
    • Val:example_value
  5. 按一下 [儲存]

gcloud

輸入下列指令:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

取代:

  • ENVIRONMENT 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。

以下範例說明如何搭配 KubernetesPodOperator 使用 Jinja 範本:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

使用 Kubernetes Secret 和 ConfigMap

Kubernetes Secret 是包含機密資料的物件。Kubernetes ConfigMap 是物件,可在鍵/值組合中包含非機密資料。

在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform 建立機密資料和 ConfigMap,然後透過 KubernetesPodOperator 存取。

關於 YAML 設定檔

使用 Google Cloud CLI 和 API 建立 Kubernetes 密鑰或 ConfigMap 時,您必須提供 YAML 格式的檔案。這個檔案必須遵循 Kubernetes 密鑰和 ConfigMap 使用的格式。Kubernetes 說明文件提供許多 ConfigMap 和 Secret 的程式碼範例。如要開始使用,請參閱「使用 Secret 安全地發布憑證」頁面和 ConfigMaps

與 Kubernetes 密鑰相同,在密鑰中定義值時,請使用 Base64 表示法。

如要編碼值,您可以使用下列指令 (這是取得 Base64 編碼值的眾多方法之一):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

輸出:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下兩個 YAML 檔案範例會在本指南後續的範例中使用。Kubernetes 機密資料的 YAML 設定檔範例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下是另一個示例,說明如何納入檔案。與前一個範例相同,請先對檔案內容進行編碼 (cat ./key.json | base64),然後在 YAML 檔案中提供這個值:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap 的 YAML 設定檔範例。您不需要在 ConfigMap 中使用 base64 表示法:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

管理 Kubernetes 密鑰

在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl 建立機密:

  1. 取得環境叢集的相關資訊:

    1. 執行下列指令:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      取代:

      • ENVIRONMENT 改為環境名稱。
      • LOCATION 與 Cloud Composer 環境所在的地區。

      這個指令的輸出格式如下: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>

    2. 如要取得 GKE 叢集 ID,請複製 /clusters/ 後方的輸出內容 (結尾為 -gke)。

  2. 使用下列指令連線至 GKE 叢集:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    更改下列內容:

    • CLUSTER_ID:環境的叢集 ID。
    • PROJECT_ID專案 ID
    • LOCATION:環境所在的地區。

  3. 建立 Kubernetes 密鑰:

    以下指令示範兩種建立 Kubernetes 機密值的方法。--from-literal 方法會使用鍵/值組合。--from-file 方法會使用檔案內容。

    • 如要透過提供鍵/值組合建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為 airflow-secrets 的 Secret,其中包含值為 test_valuesql_alchemy_conn 欄位。

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    • 如要透過提供檔案內容建立 Kubernetes 密鑰,請執行下列指令。這個範例會建立名為 service-account 的 Secret,其中的 service-account.json 欄位會從本機 ./key.json 檔案的內容擷取值。

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

在 DAG 中使用 Kubernetes Secret

本範例說明使用 Kubernetes Secret 的兩種方式:做為環境變數,以及做為 Pod 掛載的磁碟區。

第一個密鑰 airflow-secrets 會設為名為 SQL_CONN 的 Kubernetes 環境變數 (而非 Airflow 或 Cloud Composer 環境變數)。

第二個密鑰 service-account 會將 service-account.json 掛載至 /var/secrets/googleservice-account.json 是含有服務帳戶權杖的檔案。

密鑰物件如下所示:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

第一個 Kubernetes Secret 的名稱會在 secret_env 變數中定義。這個 Secret 的名稱為 airflow-secretsdeploy_type 參數會指定該參數必須以環境變數形式公開。環境變數的名稱為 SQL_CONN,如 deploy_target 參數所指定。最後,將 SQL_CONN 環境變數的值設為 sql_alchemy_conn 鍵的值。

secret_volume 變數中定義第二個 Kubernetes 密鑰的名稱。這個 Secret 的名稱為 service-account。系統會將其公開為音量,如 deploy_type 參數所指定。要掛載的檔案路徑 deploy_target/var/secrets/google。最後,儲存在 deploy_target 中的 Secret 的 keyservice-account.json

運算子設定如下所示:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

CNCF Kubernetes 供應商相關資訊

KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes 供應器中實作。

如要查看 CNCF Kubernetes 供應商的詳細版本資訊,請前往 CNCF Kubernetes 供應商網站

6.0.0 版

在 CNCF Kubernetes Provider 套件的 6.0.0 版中,KubernetesPodOperator 預設會使用 kubernetes_default 連線。

如果您在 5.0.0 版中指定自訂連線,運算子仍會使用這項自訂連線。如要改回使用 kubernetes_default 連線,建議您調整 DAG。

5.0.0 版

與 4.4.0 版本相比,這個版本引入了一些回溯不相容的變更。其中最重要的是與 kubernetes_default 連線相關,但 5.0.0 版並未使用這項連線。

  • kubernetes_default 連線需要修改。Kubernetes 設定路徑必須設為 /home/airflow/composer_kube_config (如下圖所示)。或者,您必須將 config_file 新增至 KubernetesPodOperator 設定 (如以下程式碼範例所示)。
Airflow UI 中的 Kube 設定路徑欄位
圖 1. Airflow UI,修改 kubernetes_default 連線 (按一下可放大)
  • 請按照下列方式修改使用 KubernetesPodOperator 的任務程式碼:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

如要進一步瞭解 5.0.0 版,請參閱 CNCF Kubernetes Provider Release Notes

疑難排解

本節提供常見 KubernetesPodOperator 問題的疑難排解建議:

查看記錄

排解問題時,您可以按照以下順序查看記錄:

  1. Airflow 工作記錄:

    1. 前往 Google Cloud 控制台的「Environments」頁面。

      前往「環境」

    2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

    3. 前往「DAG」分頁。

    4. 按一下 DAG 名稱,然後點選 DAG 執行作業,即可查看詳細資料和記錄。

  2. Airflow 排程器記錄:

    1. 前往「環境詳細資料」頁面。

    2. 前往「Logs」分頁。

    3. 檢查 Airflow 排程器記錄。

  3. Google Cloud 控制台的 GKE 工作負載下方有 Pod 記錄檔。這些記錄包括 Pod 定義 YAML 檔案、Pod 事件和 Pod 詳細資料。

非零值的傳回代碼

使用 KubernetesPodOperator (和 GKEStartPodOperator) 時,容器進入點的傳回代碼會決定工作是否視為成功。非零值的傳回代碼表示失敗。

常見的模式是執行殼層指令碼做為容器進入點,以便將容器內的多項作業分組。

如果您要編寫這類指令碼,建議您在指令碼頂端加入 set -e 指令,以便指令碼中的失敗指令終止指令碼,並將失敗情形傳播至 Airflow 工作例項。

Pod 逾時

KubernetesPodOperator 的預設逾時時間為 120 秒,可能會導致在下載較大圖片之前發生逾時。建立 KubernetesPodOperator 時,您可以變更 startup_timeout_seconds 參數來增加逾時時間。

Pod 逾時時,Airflow UI 會顯示專屬於該工作記錄。例如:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

如果 Cloud Composer 服務帳戶缺少執行當前任務所需的 IAM 權限,也會發生 Pod 逾時。如要確認這項資訊,請使用 GKE 資訊主頁查看 Pod 層級錯誤,以便查看特定工作負載的記錄檔,或使用 Cloud Logging。

無法建立新連線

GKE 叢集預設會啟用自動升級功能。如果節點集區位於升級中的叢集中,您可能會看到以下錯誤:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

如要確認叢集是否正在升級,請在 Google Cloud 控制台中前往「Kubernetes clusters」(Kubernetes 叢集) 頁面,然後查看環境叢集名稱旁邊的載入圖示。

後續步驟