Use KubernetesPodOperator


This page describes how to use KubernetesPodOperator to deploy Kubernetes Pods from Cloud Composer into a Google Kubernetes Engine cluster.

KubernetesPodOperator launches Kubernetes Pods in your environment's cluster. In comparison, Google Kubernetes Engine operators run Kubernetes Pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters with Google Kubernetes Engine operators.

KubernetesPodOperator is a good option if you require:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that are not available in the stock Cloud Composer worker image.

Before you begin

  • We recommend that you use the latest version of Cloud Composer. At a minimum, this version must be supported as part of the deprecation and support policy.
  • Make sure that your environment has sufficient resources. Launching Pods into a resource-starved environment can cause Airflow worker and Airflow scheduler errors.

Set up Cloud Composer environment resources

When you create a Cloud Composer environment, you specify its performance parameters, including the performance parameters of the environment's cluster. Launching Kubernetes Pods into the environment cluster can cause competition for cluster resources, such as CPU or memory. Because the Airflow scheduler and workers are in the same GKE cluster, the scheduler and workers won't work properly if the competition results in resource starvation.

To prevent resource starvation, take one or more of the following actions:

Create a node pool

The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes Pods to execute using only resources from that pool.

Console

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Click the name of your environment.

  3. On the Environment details page, go to the Environment configuration tab.

  4. In the Resources > GKE cluster section, click the view cluster details link.

  5. Create a node pool as described in Adding a node pool.

gcloud

  1. Determine the name of your environment's cluster:

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    Replace:

    • ENVIRONMENT_NAME with the name of the environment.
    • LOCATION with the region where the environment is located.
  2. The output contains the name of the environment's cluster. For example: europe-west3-example-enviro-af810e25-gke.

  3. Create a node pool as described in Adding a node pool.

Increase the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workloads. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase node count, update your environment.

Specify the appropriate machine type

During Cloud Composer environment creation, you can specify a machine type. To ensure available resources, specify a machine type for the type of computing that occurs in your Cloud Composer environment.

Minimal configuration

To create a KubernetesPodOperator, only the Pod's name, the image to use, and the task_id parameters are required. The /home/airflow/composer_kube_config file contains credentials to authenticate to GKE.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
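
The samples above assume that KubernetesPodOperator is already imported and declared inside a DAG file. A minimal sketch of that wiring for Airflow 2, with a placeholder DAG ID and schedule; depending on your apache-airflow-providers-cncf-kubernetes version, the operator may be imported from airflow.providers.cncf.kubernetes.operators.pod instead:

from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="composer_kubernetes_pod_example",  # placeholder DAG ID
    schedule_interval=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        task_id="pod-ex-minimum",
        name="pod-ex-minimum",
        cmds=["echo"],
        namespace="default",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        # Kubeconfig with credentials for the environment's GKE cluster,
        # as mentioned above.
        config_file="/home/airflow/composer_kube_config",
    )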

Pod affinity configuration

When you configure the affinity parameter in KubernetesPodOperator, you control which nodes to schedule Pods on, such as nodes only in a particular node pool. In this example, the operator runs only on node pools named pool-0 and pool-1. Your Cloud Composer 1 environment nodes are in the default-pool, so your Pods do not run on the nodes in your environment.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

As the example is configured, the task fails. If you inspect the logs, the task fails because the node pools pool-0 and pool-1 do not exist.

To make sure that the node pools in values exist, make any of the following configuration changes:

  • If you created a node pool earlier, replace pool-0 and pool-1 with the names of your node pools and upload your DAG again.

  • Create a node pool named pool-0 or pool-1. You can create both, but the task needs only one of them to succeed.

  • Replace pool-0 and pool-1 with default-pool, the default pool that Airflow uses. Then upload your DAG again.

After you make the changes, wait a few minutes for your environment to update. Then run the ex-pod-affinity task again and verify that it succeeds.

Additional configuration

This example shows additional parameters that you can configure in KubernetesPodOperator.

For more information about these parameters, see the Airflow and Kubernetes reference documentation.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)
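
The container_resources argument above uses the Kubernetes Python client models, which the sample assumes are imported as k8s_models. As a sketch, the same object can also declare resource requests next to limits; the request values below are illustrative only:

from kubernetes.client import models as k8s_models

# V1ResourceRequirements accepts both requests (scheduling hints) and
# limits (hard caps) for the container.
container_resources = k8s_models.V1ResourceRequirements(
    requests={"memory": "128M", "cpu": "50m"},  # illustrative values
    limits={"memory": "250M", "cpu": "100m"},
)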

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Use Jinja templates

Airflow supports Jinja templates in DAGs.

You must declare the required Airflow parameters (task_id, name, and image) with the operator. As the following example demonstrates, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

The env_vars parameter in the example is set from an Airflow variable named my_value. The example DAG takes its value from the vars template variable in Airflow. Airflow has more variables that provide access to different types of information. For example, you can use the conf template variable to access values of Airflow configuration options. For more information and the list of variables available in Airflow, see Templates reference in the Airflow documentation.

Without changing the DAG or creating the env_vars variable, the ex-kube-templates task in the example fails because the variable does not exist. Create this variable in the Airflow UI or with Google Cloud CLI:

Airflow UI

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the List Variable page, click Add a new record.

  4. On the Add Variable page, enter the following information:

    • Key: my_value
    • Val: example_value
  5. Click Save.

If your environment uses Airflow 1, do the following instead:

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the Variables page, click the Create tab.

  4. On the Variable page, enter the following information:

    • Key: my_value
    • Val: example_value
  5. Click Save.

gcloud

Enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

If your environment uses Airflow 1, run the following command instead:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

Replace:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.
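
As another option, a hedged sketch of setting the same variable programmatically through the Airflow models API, for example from a task or script that runs inside your environment:

from airflow.models import Variable

# Stores the Airflow variable used by the ex-kube-templates sample.
# Variable.set writes to the Airflow metadata database, so this call must
# run inside the environment, for example from a task in a maintenance DAG.
Variable.set("my_value", "example_value")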

The following example demonstrates how to use Jinja templates with KubernetesPodOperator:

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials, the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials, the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Use Kubernetes Secrets and ConfigMaps

A Kubernetes Secret is an object that contains sensitive data. A Kubernetes ConfigMap is an object that contains non-confidential data in key-value pairs.

In Cloud Composer 2, you can create Secrets and ConfigMaps by using Google Cloud CLI, API, or Terraform, and then access them from KubernetesPodOperator.

About YAML configuration files

When you create a Kubernetes Secret or a ConfigMap by using Google Cloud CLI and API, you provide a file in YAML format. This file must follow the same format as used by Kubernetes Secrets and ConfigMaps. The Kubernetes documentation provides many code samples of ConfigMaps and Secrets. To get started, see the Distribute Credentials Securely Using Secrets page and ConfigMaps.

Same as in Kubernetes Secrets, use the base64 representation when you define values in a Secret.

To encode a value, you can use the following command (this is one of many ways to get a base64-encoded value):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

Output:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
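
If you prefer Python to the shell, the standard library's base64 module produces the same kind of value; a small sketch that encodes the example connection string (without the stray -n text, so the output differs slightly from the one above):

import base64

value = "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db"

# Encode the UTF-8 bytes of the value; the resulting string goes into the
# Secret's data field.
print(base64.b64encode(value.encode("utf-8")).decode("utf-8"))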

The following two YAML file examples are used in samples later in this guide. An example YAML configuration file for a Kubernetes Secret:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

Another example that demonstrates how to include files. Same as in the previous example, first encode the contents of a file (cat ./key.json | base64), and then provide this value in the YAML file:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

An example YAML configuration file for a ConfigMap. You don't need to use the base64 representation in ConfigMaps:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value
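
The samples later in this guide focus on Secrets. As a hedged sketch of the ConfigMap side, recent versions of the CNCF Kubernetes provider expose an env_from parameter on KubernetesPodOperator that can surface the keys of a ConfigMap, such as example-configmap above, as container environment variables (imports as in the earlier sketch); check that this parameter is available in your provider version:

from kubernetes.client import models as k8s

kubernetes_configmap_ex = KubernetesPodOperator(
    task_id="ex-kube-configmap",
    name="ex-kube-configmap",
    namespace="default",
    image="ubuntu",
    cmds=["bash", "-c", "echo $example_key"],
    # Injects every key of the example-configmap ConfigMap as an
    # environment variable in the container.
    env_from=[
        k8s.V1EnvFromSource(
            config_map_ref=k8s.V1ConfigMapEnvSource(name="example-configmap")
        )
    ],
    config_file="/home/airflow/composer_kube_config",
)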

Manage Kubernetes Secrets

In Cloud Composer 2, create Kubernetes Secrets by using Google Cloud CLI and kubectl:

  1. Get information about your environment's cluster:

    1. Run the following command:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      Replace:

      • ENVIRONMENT with the name of your environment.
      • LOCATION with the region where the Cloud Composer environment is located.

      The output of this command uses the following format: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. To get the GKE cluster ID, copy the output after /clusters/ (ends in -gke).

    3. To get the zone, copy the output after /zones/.

  2. Connect to your GKE cluster with the following command:

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    Replace:

    • CLUSTER_ID: your environment's cluster ID.
    • PROJECT: your Project ID.
    • ZONE with the zone where your environment's cluster is located.
  3. Create a Kubernetes Secret:

    The following commands demonstrate two different approaches to creating Kubernetes Secrets. The --from-literal approach uses key-value pairs. The --from-file approach uses file contents.

    • To create a Kubernetes Secret by providing key-value pairs, run the following command. This example creates a Secret named airflow-secrets that has a sql_alchemy_conn field with the value of test_value.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • To create a Kubernetes Secret by providing file contents, run the following command. This example creates a Secret named service-account that has a service-account.json field with its value taken from the contents of a local ./key.json file.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

Use Kubernetes Secrets in your DAGs

This example shows two ways of using Kubernetes Secrets: as an environment variable, and as a volume mounted by the Pod.

The first Secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).

The second Secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.

Here is what the Secret objects look like:

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

The name of the first Kubernetes Secret is defined in the secret_env variable. This Secret is named airflow-secrets. The deploy_type parameter specifies that it must be exposed as an environment variable. The environment variable's name is SQL_CONN, as specified in the deploy_target parameter. Finally, the value of the SQL_CONN environment variable is set to the value of the sql_alchemy_conn key.

The name of the second Kubernetes Secret is defined in the secret_volume variable. This Secret is named service-account. It is exposed as a volume, as specified in the deploy_type parameter. The path of the file to mount, deploy_target, is /var/secrets/google. Finally, the key of the Secret stored in the deploy_target is service-account.json.

Here is what the operator configuration looks like:

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)
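
Because GOOGLE_APPLICATION_CREDENTIALS points at the mounted service-account.json file, Google Cloud client libraries that run inside the container pick up these credentials automatically. A sketch of container-side code, assuming the google-cloud-storage package is installed in the image:

from google.cloud import storage

# The client library reads GOOGLE_APPLICATION_CREDENTIALS, which the
# operator sets to the mounted service account key file.
client = storage.Client()
for bucket in client.list_buckets():
    print(bucket.name)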

About the CNCF Kubernetes provider

KubernetesPodOperator is implemented in the apache-airflow-providers-cncf-kubernetes provider.

For detailed release notes for the CNCF Kubernetes provider, see the CNCF Kubernetes provider website.

Version 6.0.0

In version 6.0.0 of the CNCF Kubernetes provider package, the kubernetes_default connection is used by default in KubernetesPodOperator.

If you specified a custom connection in version 5.0.0, this custom connection is still used by the operator. To switch back to using the kubernetes_default connection, you might want to adjust your DAGs accordingly.

Version 5.0.0

This version introduces several backward-incompatible changes compared to version 4.4.0. The most important ones are related to the kubernetes_default connection, which is not used in version 5.0.0.

  • The kubernetes_default connection needs to be modified. The Kubernetes config path must be set to /home/airflow/composer_kube_config (as shown in Figure 1). As an alternative, config_file must be added to the KubernetesPodOperator configuration (as shown in the following code example).
Figure 1. The Kube config path field in the Airflow UI, where the kubernetes_default connection is modified
  • Modify the code of tasks that use KubernetesPodOperator in the following way:
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

To learn more about version 5.0.0, see the CNCF Kubernetes Provider Release Notes.

Troubleshooting

This section provides advice for troubleshooting common KubernetesPodOperator issues:

Check logs

When troubleshooting issues, check logs in the following order:

  1. Airflow task logs:

    1. In the Google Cloud console, go to the Environments page.

      Go to Environments

    2. In the list of environments, click the name of your environment. The Environment details page opens.

    3. Go to the DAGs tab.

    4. Click the name of the DAG, then click the DAG run to view the details and logs.

  2. Airflow scheduler logs:

    1. Go to the Environment details page.

    2. Go to the Logs tab.

    3. Inspect the Airflow scheduler logs.

  3. Pod logs in the Google Cloud console, under GKE workloads. These logs include the Pod definition YAML file, Pod events, and Pod details.

Non-zero return codes

When using KubernetesPodOperator (and GKEStartPodOperator), the return code of the container's entrypoint determines whether the task is considered successful or not. Non-zero return codes indicate failure.

A common pattern is to execute a shell script as the container entrypoint to group together multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script, so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
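
A hedged sketch of this pattern, with a placeholder image and an inline script passed to bash -c; set -e makes the script stop at the first failing command so that its exit code propagates to the Airflow task:

kubernetes_script_pod = KubernetesPodOperator(
    task_id="ex-entrypoint-script",
    name="ex-entrypoint-script",
    namespace="default",
    image="ubuntu",  # placeholder image
    cmds=["bash", "-c"],
    arguments=[
        # Without set -e, a failure in an intermediate command would be
        # ignored and the task could be reported as successful.
        "set -e\n"
        "echo 'step one'\n"
        "echo 'step two'\n"
    ],
    config_file="/home/airflow/composer_kube_config",
)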

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts occurring before larger container images finish downloading. You can increase the timeout by changing the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
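
For example, a sketch that raises the startup timeout for a task that pulls a large image; the 600-second value is illustrative:

kubernetes_large_image_pod = KubernetesPodOperator(
    task_id="ex-large-image",
    name="ex-large-image",
    namespace="default",
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    cmds=["echo"],
    # Allows up to 10 minutes for image pull and Pod startup instead of
    # the default 120 seconds.
    startup_timeout_seconds=600,
    config_file="/home/airflow/composer_kube_config",
)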

When a Pod times out, the task-specific log is available in the Airflow UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod timeouts can also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at Pod-level errors by using the GKE Dashboards to inspect the logs for your particular workload, or use Cloud Logging.

Failed to establish a new connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you might see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

To check if your cluster is upgrading, go to the Kubernetes clusters page in the Google Cloud console and look for the loading icon next to your environment's cluster name.

What's next