使用 KubernetesPodOperator

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁面說明如何使用 KubernetesPodOperator,將 Cloud Composer 中的 Kubernetes Pods 部署至 Google Kubernetes Engine 叢集。

KubernetesPodOperator 會在環境叢集中啟動 Kubernetes Pods。相比之下,Google Kubernetes Engine 操作員會在指定叢集中執行 Kubernetes Pod,該叢集可以是與環境無關的獨立叢集。您也可以使用 Google Kubernetes Engine 運算子建立及刪除叢集。

如果您需要下列功能,KubernetesPodOperator 會是理想選擇:

  • 無法透過公開 PyPI 存放區取得的自訂 Python 依附元件。
  • 在原始 Cloud Composer 工作站映像檔中無法使用的二進位依附元件。

事前準備

請查看以下清單,瞭解 Cloud Composer 3 和 Cloud Composer 2 中 KubernetesPodOperator 的差異,並確保 DAG 相容:

  • 您無法在 Cloud Composer 3 中建立自訂命名空間。即使指定其他命名空間,Pod 仍會一律在 composer-user-workloads 命名空間中執行。這個命名空間中的 Pod 不需要額外設定,即可存取專案的資源和 VPC 網路 (如果已啟用)。

  • 在 Cloud Composer 3 中,無法執行多個額外的附加容器。如果 sidecar 容器名稱為 airflow-xcom-sidecar,您可以執行單一 sidecar 容器。

  • 無法使用 Kubernetes API 建立 Kubernetes Secret 和 ConfigMap。相反地,Cloud Composer 提供 Google Cloud CLI 指令、Terraform 資源和 Cloud Composer API,用於管理 Kubernetes 機密資料和 ConfigMap。詳情請參閱「使用 Kubernetes Secret 和 ConfigMap」。

  • 您無法在 Cloud Composer 3 中部署自訂工作負載。您只能修改 Kubernetes Secret 和 ConfigMap,但無法變更其他設定。

  • 您必須使用支援的值指定資源需求 (CPU、記憶體和儲存空間)。

  • 與 Cloud Composer 2 相同,您無法設定 Pod 親和性。如果您想使用 Pod 相依性,請改用 GKE 運算子,在其他叢集中啟動 Pod。

關於 Cloud Composer 3 中的 KubernetesPodOperator

本節說明 KubernetesPodOperator 在 Cloud Composer 3 中的運作方式。

資源使用情況

在 Cloud Composer 3 中,環境的叢集會自動調整資源配置。您使用 KubernetesPodOperator 執行的額外工作負載,會獨立於環境進行擴充。環境不會受到資源需求增加的影響,但環境的叢集會視資源需求而調整。

在環境叢集中執行的額外工作負載,其定價會採用 Cloud Composer 3 定價模式,並使用 Cloud Composer 3 SKU。

Cloud Composer 3 使用 Autopilot 叢集,其中引入了運算類別的概念:

  • Cloud Composer 僅支援 general-purpose 運算類別。

  • 根據預設,如果您未選取任何類別,則在使用 KubernetesPodOperator 建立 Pod 時,系統會假設為 general-purpose 類別。

  • 每個類別都與特定屬性和資源限制相關聯,您可以參閱 Autopilot 說明文件瞭解相關資訊。舉例來說,在 general-purpose 類別中執行的 Pod 最多可使用 110 GiB 記憶體。

存取專案資源

在 Cloud Composer 3 中,環境的叢集位於用戶群專案中,無法進行設定。Pod 會在環境叢集中的隔離命名空間中執行。

在 Cloud Composer 3 中,即使指定不同的命名空間,Pod 一律會在 composer-user-workloads 命名空間中執行。這個命名空間中的 Pod 可以存取 Google Cloud專案中的資源和虛擬私有雲網路 (如果已啟用),無須額外設定。環境的服務帳戶會用於存取這些資源。無法指定其他服務帳戶。

希望將設定需求減至最低

如要建立 KubernetesPodOperator,只需要 Pod 的 name、要使用的 image,以及 task_id 參數。/home/airflow/composer_kube_config 包含用於向 GKE 進行驗證的憑證。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

額外設定

本例顯示可在 KubernetesPodOperator 中設定的其他參數。

詳情請參閱下列資源:

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

使用 Jinja 範本

Airflow 支援 DAG 中的 Jinja 範本

您必須使用運算子宣告必要的 Airflow 參數 (task_idnameimage)。如以下範例所示,您可以使用 Jinja 建立所有其他參數的範本,包括 cmdsargumentsenv_varsconfig_file

範例中的 env_vars 參數是從名為 my_valueAirflow 變數設定。範例 DAG 會從 Airflow 中的 vars 範本變數取得值。Airflow 提供更多變數,可存取不同類型的資訊。舉例來說,您可以使用 conf 範本變數存取 Airflow 設定選項的值。如需詳細資訊和 Airflow 中可用的變數清單,請參閱 Airflow 說明文件中的「範本參考資料」。

如果不變更 DAG 或建立 env_vars 變數,範例中的 ex-kube-templates 工作會失敗,因為變數不存在。您可以在 Airflow 使用者介面或 Google Cloud CLI 中建立這個變數:

Airflow UI

  1. 前往 Airflow UI

  2. 在工具列中,依序選取「管理」>「變數」

  3. 在「清單變數」頁面中,按一下「新增記錄」

  4. 在「新增變數」頁面中輸入下列資訊:

    • 鍵:my_value
    • Val:example_value
  5. 按一下 [儲存]

gcloud

輸入下列指令:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

取代:

  • ENVIRONMENT 替換為環境的名稱。
  • LOCATION 改成環境所在的地區。

以下範例說明如何搭配 KubernetesPodOperator 使用 Jinja 範本:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

使用 Kubernetes Secret 和 ConfigMap

Kubernetes Secret 是包含機密資料的物件。Kubernetes ConfigMap 是物件,可在鍵/值組合中包含非機密資料。

在 Cloud Composer 3 中,您可以使用 Google Cloud CLI、API 或 Terraform 建立機密資料和 ConfigMap,然後透過 KubernetesPodOperator 存取:

  • 使用 Google Cloud CLI 和 API 時,您必須提供 YAML 設定檔。
  • 在 Terraform 中,您可以將 Secret 和 ConfigMap 定義為 Terraform 設定檔中的個別資源。

關於 YAML 設定檔

使用 Google Cloud CLI 和 API 建立 Kubernetes 密鑰或 ConfigMap 時,您必須提供 YAML 格式的檔案。這個檔案必須遵循 Kubernetes 密鑰和 ConfigMap 使用的格式。Kubernetes 說明文件提供許多 ConfigMap 和 Secret 的程式碼範例。如要開始使用,請參閱「使用 Secret 安全地發布憑證」頁面和 ConfigMaps

與 Kubernetes 密鑰相同,在密鑰中定義值時,請使用 Base64 表示法。

如要編碼值,您可以使用下列指令 (這是取得 Base64 編碼值的眾多方法之一):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

輸出:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下兩個 YAML 檔案範例會在本指南後續的範例中使用。Kubernetes 機密資料的 YAML 設定檔範例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下是另一個示例,說明如何納入檔案。與前一個範例相同,請先對檔案內容進行編碼 (cat ./key.json | base64),然後在 YAML 檔案中提供這個值:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap 的 YAML 設定檔範例。您不需要在 ConfigMap 中使用 base64 表示法:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

管理 Kubernetes 密鑰

gcloud

建立 Secret

如要建立 Kubernetes 密鑰,請執行下列指令:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • SECRET_FILE:包含 Secret 設定的本機 YAML 檔案路徑。

範例:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

更新密鑰

如要更新 Kubernetes 密鑰,請執行下列指令。系統會從指定的 YAML 檔案取得密鑰名稱,並取代密鑰內容。

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • SECRET_FILE:包含 Secret 設定的本機 YAML 檔案路徑。在這個檔案的 metadata > name 欄位中指定密鑰名稱。

列出 Secret

如要取得環境的 Secret 清單及其欄位,請執行下列指令。輸出內容中的鍵值會替換成星號。

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

取得 Secret 的詳細資料

如要取得 Secret 的詳細資訊,請執行下列指令。輸出內容中的鍵值會替換成星號。

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

更改下列內容:

  • SECRET_NAME:密鑰的名稱,如同 YAML 檔案中 metadata > name 欄位所定義的密鑰設定。
  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

刪除密鑰

如要刪除 Secret,請執行下列指令:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME:密鑰的名稱,如同 YAML 檔案中 metadata > name 欄位所定義的密鑰設定。
  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

API

建立 Secret

  1. 建立 environments.userWorkloadsSecrets.create API 要求。

  2. 在這個要求中:

    1. 在要求主體的 name 欄位中,指定新機密的 URI。
    2. 在要求主體的 data 欄位中,指定 Secret 的鍵和 base64 編碼值。

範例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

更新密鑰

  1. 建立 environments.userWorkloadsSecrets.update API 要求。

  2. 在這個要求中:

    1. 在要求主體的 name 欄位中,指定 Secret 的 URI。
    2. 在要求主體的 data 欄位中,指定 Secret 的鍵和 base64 編碼值。系統會替換這些值。

範例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

列出 Secret

建立 environments.userWorkloadsSecrets.list API 要求。輸出內容中的鍵值會替換成星號。您可以使用分頁功能搭配此要求,請參閱要求的參考資料瞭解詳情。

範例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

取得 Secret 的詳細資料

建立 environments.userWorkloadsSecrets.get API 要求。輸出內容中的鍵值會替換成星號。

範例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

刪除密鑰

建立 environments.userWorkloadsSecrets.delete API 要求。

範例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

google_composer_user_workloads_secret 資源會定義 Kubernetes 密鑰,其中鍵和值會在 data 區塊中定義。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME:環境資源的名稱,其中包含 Terraform 中的環境定義。這個資源也會指定實際環境的名稱。
  • LOCATION:環境所在的地區。
  • SECRET_NAME:Secret 的名稱。
  • KEY_NAME:此 Secret 的一或多個鍵。
  • KEY_VALUE:金鑰的 base64 編碼值。您可以使用 base64encode 函式對值進行編碼 (請參閱範例)。

以下兩個 Kubernetes 密鑰範例會在本指南後續的範例中使用。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

以下是另一個示例,說明如何納入檔案。您可以使用 file 函式以字串形式讀取檔案內容,然後以 Base64 編碼:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

在 DAG 中使用 Kubernetes Secret

本範例說明使用 Kubernetes Secret 的兩種方式:做為環境變數,以及做為 Pod 掛載的磁碟區。

第一個密鑰 airflow-secrets 會設為名為 SQL_CONN 的 Kubernetes 環境變數 (而非 Airflow 或 Cloud Composer 環境變數)。

第二個密鑰 service-account 會將 service-account.json 掛載至 /var/secrets/googleservice-account.json 是含有服務帳戶權杖的檔案。

密鑰物件如下所示:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

第一個 Kubernetes Secret 的名稱會在 secret_env 變數中定義。這個 Secret 的名稱為 airflow-secretsdeploy_type 參數會指定該參數必須以環境變數形式公開。環境變數的名稱為 SQL_CONN,如 deploy_target 參數所指定。最後,將 SQL_CONN 環境變數的值設為 sql_alchemy_conn 鍵的值。

secret_volume 變數中定義第二個 Kubernetes 密鑰的名稱。這個 Secret 的名稱為 service-account。系統會將其公開為音量,如 deploy_type 參數所指定。要掛載的檔案路徑 deploy_target/var/secrets/google。最後,儲存在 deploy_target 中的 Secret 的 keyservice-account.json

運算子設定如下所示:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

管理 Kubernetes ConfigMap

gcloud

建立 ConfigMap

如要建立 ConfigMap,請執行下列指令:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • CONFIG_MAP_FILE:包含 ConfigMap 設定的本機 YAML 檔案路徑。

範例:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

更新 ConfigMap

如要更新 ConfigMap,請執行下列指令。ConfigMap 的名稱會從指定的 YAML 檔案中取得,並取代 ConfigMap 的內容。

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。
  • CONFIG_MAP_FILE:包含 ConfigMap 設定的本機 YAML 檔案路徑。在這個檔案的 metadata > name 欄位中指定 ConfigMap 的名稱。

列出 ConfigMap

如要取得環境的 ConfigMap 清單及其欄位,請執行下列指令。輸出內容中的鍵值會原封不動地顯示。

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

更改下列內容:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

取得 ConfigMap 的詳細資料

如要取得 ConfigMap 的詳細資訊,請執行下列指令。輸出內容中的鍵值會原封不動地顯示。

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

更改下列內容:

  • CONFIG_MAP_NAME:ConfigMap 的名稱,如同 YAML 檔案中 metadata > name 欄位所定義的 ConfigMap 設定。
  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

刪除 ConfigMap

如要刪除 ConfigMap,請執行下列指令:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME:ConfigMap 的名稱,如同 YAML 檔案中 metadata > name 欄位所定義的 ConfigMap 設定。
  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的地區。

API

建立 ConfigMap

  1. 建立 environments.userWorkloadsConfigMaps.create API 要求。

  2. 在這個要求中:

    1. 在要求主體的 name 欄位中,指定新 ConfigMap 的 URI。
    2. 在要求主體的 data 欄位中,指定 ConfigMap 的鍵和值。

範例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

更新 ConfigMap

  1. 建立 environments.userWorkloadsConfigMaps.update API 要求。

  2. 在這個要求中:

    1. 在要求主體的 name 欄位中,指定 ConfigMap 的 URI。
    2. 在要求主體的 data 欄位中,指定 ConfigMap 的鍵和值。系統會替換這些值。

範例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

列出 ConfigMap

建立 environments.userWorkloadsConfigMaps.list API 要求。輸出內容中的鍵值會原封不動地顯示。您可以使用分頁功能搭配此要求,請參閱要求的參考資料瞭解詳情。

範例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

取得 ConfigMap 詳細資料

建立 environments.userWorkloadsConfigMaps.get API 要求。輸出內容中的鍵值會原封不動地顯示。

範例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

刪除 ConfigMap

建立 environments.userWorkloadsConfigMaps.delete API 要求。

範例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

google_composer_user_workloads_config_map 資源定義 ConfigMap,其中鍵和值皆在 data 區塊中定義。

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME:環境資源的名稱,其中包含 Terraform 中的環境定義。這個資源也會指定實際環境的名稱。
  • LOCATION:環境所在的地區。
  • CONFIG_MAP_NAME:ConfigMap 的名稱。
  • KEY_NAME:這個 ConfigMap 的一或多個鍵。
  • KEY_VALUE:鍵的值。

範例:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

在 DAG 中使用 ConfigMap

本範例說明如何在 DAG 中使用 ConfigMap。

在以下範例中,系統會在 configmaps 參數中傳遞 ConfigMap。這個 ConfigMap 的所有鍵都可以做為環境變數使用:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

以下範例說明如何將 ConfigMap 掛載為磁碟區:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

CNCF Kubernetes 供應商相關資訊

KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes 供應器中實作。

如要查看 CNCF Kubernetes 供應商的詳細版本資訊,請前往 CNCF Kubernetes 供應商網站

資源需求

Cloud Composer 3 支援下列資源需求值。如需資源需求的使用範例,請參閱「其他設定」。

資源 下限 上限 步驟
CPU 0.25 32 間距值:0.25、0.5、1、2、4、6、8、10、...、32。系統會將要求的值無條件進位至最接近的支援步驟值 (例如 5 到 6)。
記憶體 2G (GB) 128G (GB) 步長值:2、3、4、5、...、128。系統會將要求的值無條件進位至最接近的支援步數值 (例如 3.5G 到 4G)。
儲存空間 - 100G (GB) 任何值。如果要求的容量超過 100 GB,系統只會提供 100 GB。

如要進一步瞭解 Kubernetes 中的資源單位,請參閱「Kubernetes 中的資源單位」。

疑難排解

本節提供常見 KubernetesPodOperator 問題的疑難排解建議:

查看記錄

排解問題時,您可以按照以下順序查看記錄:

  1. Airflow 工作記錄:

    1. 前往 Google Cloud 控制台的「Environments」頁面。

      前往「環境」

    2. 在環境清單中,按一下環境名稱。「環境詳細資料」頁面隨即開啟。

    3. 前往「DAG」分頁。

    4. 按一下 DAG 名稱,然後點選 DAG 執行作業,即可查看詳細資料和記錄。

  2. Airflow 排程器記錄:

    1. 前往「環境詳細資料」頁面。

    2. 前往「Logs」分頁。

    3. 檢查 Airflow 排程器記錄。

  3. 使用者工作負載記錄:

    1. 前往「環境詳細資料」頁面。

    2. 前往「監控」分頁。

    3. 選取「使用者工作負載」

    4. 檢查已執行的工作負載清單。您可以查看每項工作負載的記錄和資源使用率資訊。

非零值的傳回代碼

使用 KubernetesPodOperator (和 GKEStartPodOperator) 時,容器進入點的傳回代碼會決定工作是否視為成功。非零值的傳回代碼表示失敗。

常見的模式是執行殼層指令碼做為容器進入點,以便將容器內的多項作業分組。

如果您要編寫這類指令碼,建議您在指令碼頂端加入 set -e 指令,以便指令碼中的失敗指令終止指令碼,並將失敗情形傳播至 Airflow 工作例項。

Pod 逾時

KubernetesPodOperator 的預設逾時時間為 120 秒,可能會導致在下載較大圖片之前發生逾時。建立 KubernetesPodOperator 時,您可以變更 startup_timeout_seconds 參數來增加逾時時間。

Pod 逾時時,Airflow UI 會顯示專屬於該工作記錄。例如:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

如果 Cloud Composer 服務帳戶缺少執行當前任務所需的 IAM 權限,也會發生 Pod 逾時。如要確認這項資訊,請使用 GKE 資訊主頁查看 Pod 層級錯誤,以便查看特定工作負載的記錄檔,或使用 Cloud Logging。

執行大量工作時,KubernetesPodOperator 工作會失敗

當環境同時執行大量 KubernetesPodOperator 或 KubernetesExecutor 工作時,Cloud Composer 3 會等到部分現有工作完成後,才接受新工作。

如要進一步瞭解如何排解這個問題,請參閱「排解 KubernetesExecutor 工作問題」。

後續步驟