Usar KubernetesPodOperator

En esta página, se describe cómo usar KubernetesPodOperator para implementar Pods de Kubernetes de Cloud Composer a Google Kubernetes Engine clúster que es parte de tu entorno de Cloud Composer y garantizar de que tu entorno cuente con los recursos adecuados.

KubernetesPodOperator inicios Pods de Kubernetes en el clúster de tu entorno. En comparación, Los operadores de Google Kubernetes Engine ejecutan Pods de Kubernetes en un que puede ser un clúster independiente que no esté relacionado con tu en un entorno de nube. También puedes crear y borrar clústeres con operadores de Google Kubernetes Engine.

KubernetesPodOperator es una buena opción si necesitas lo siguiente:

  • Dependencias de Python personalizadas que no están disponibles a través del repositorio público de PyPI.
  • Dependencias binarias que no están disponibles en stock Imagen del trabajador de Cloud Composer.

En esta página, se explica un ejemplo de DAG de Airflow que incluye lo siguiente Parámetros de configuración de KubernetesPodOperator:

Antes de comenzar

  • En Cloud Composer 3, el clúster de tu entorno escala automáticamente. Cargas de trabajo adicionales que ejecutas con KubernetesPodOperator escalan de forma independiente de tu entorno. Tu entorno no se ve afectado por el aumento en la demanda de recursos, pero el clúster de tu entorno escala verticalmente según el recurso demanda. El precio de las cargas de trabajo adicionales que ejecutas en tu clúster sigue el Modelo de precios de Cloud Composer 3 y sus usos SKU de Cloud Composer 3.

  • En Cloud Composer 3, el clúster de tu entorno se encuentra en el proyecto de usuario. Sin embargo, KubernetesPodOperator funciona de la misma manera, sin tener que realizar cambios en el código en comparación con Cloud Composer 2. Los pods se ejecutan en el clúster del entorno, en un espacio de nombres aislado, pero con acceso a tu red de VPC (si está habilitada).

  • Cloud Composer 3 usa clústeres de GKE con Federación de identidades para cargas de trabajo para GKE. De forma predeterminada, los Pods que se ejecutan en un espacio de nombres recién creado o en composer-user-workloads no pueden acceder a los recursos de Google Cloud. Cuando se usa Workload Identity Federation para GKE, las cuentas de servicio de Kubernetes asociadas con espacios de nombres deben asignarse a cuentas de servicio de Google Cloud para habilitar la autorización de identidad de servicio para las solicitudes a las APIs de Google y otros servicios.

    Debido a esto, si ejecutas Pods en el espacio de nombres composer-user-workloads o un espacio de nombres recién creado en el clúster de tu entorno, se debe Vinculaciones de IAM entre Kubernetes y Google Cloud no se crean cuentas de servicio y estos Pods no pueden acceder a los recursos de tu proyecto de Google Cloud.

    Si deseas que los Pods tengan acceso a los recursos de Google Cloud, Luego, usa el espacio de nombres composer-user-workloads o crea uno propio el espacio de nombres, tal como se describe más adelante.

    Para proporcionar acceso a los recursos de tu proyecto, sigue las instrucciones de la Federación de identidades para cargas de trabajo para GKE y configura las vinculaciones:

    1. Crea un espacio de nombres independiente en el clúster de tu entorno.
    2. Crea una vinculación entre la cuenta de servicio de Kubernetes composer-user-workloads/<namespace_name> y la cuenta de servicio de tu entorno.
    3. Agrega la anotación de la cuenta de servicio de tu entorno a la biblioteca de Kubernetes cuenta de servicio.
    4. Cuando uses KubernetesPodOperator, especifica el espacio de nombres cuenta de servicio de Kubernetes en namespace y Parámetros service_account_name.
  • Cloud Composer 3 usa clústeres de GKE con Workload Identity. El servidor de metadatos de GKE toma unos segundos en comenzar a aceptar solicitudes en un Pod recién creado. Por lo tanto, los intentos de se autentican con Workload Identity en los primeros segundos de La vida del Pod podría fallar. Para obtener más información sobre esta limitación, consulta Restricciones de Workload Identity.

  • Cloud Composer 3 usa clústeres de Autopilot que presentan la noción de clases de procesamiento:

    • De forma predeterminada, si no se selecciona ninguna clase, la clase general-purpose se que se da por sentado cuando creas Pods con KubernetesPodOperator.

    • Cada clase se asocia con propiedades y límites de recursos específicos. Puedes leer sobre ellas en Documentación de Autopilot. Por ejemplo, los Pods que se ejecutan dentro de la clase general-purpose pueden usar hasta 110 GiB de memoria.

Configuración de KubernetesPodOperator

Para continuar con este ejemplo, coloca todo el archivo en la carpeta dags/ de tu entorno o agrega el código relevante KubernetesPodOperator a un DAG.

Las siguientes secciones explican cada configuración de KubernetesPodOperator en el ejemplo. Para obtener información sobre cada variable de configuración, consulta la referencia de Airflow.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    # Name of the Kubernetes Secret
    # Key of a secret stored in this Secret object
secret_volume = Secret(
    # Path where we mount the secret as volume
    # Name of Kubernetes Secret
    # Key in the form of service account file name
# If you are running Airflow in more than one time zone
# see
# for best practices
YESTERDAY = - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        # Name of task you want to run, used to generate Pod ID.
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        # Docker image specified. Defaults to, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # images if the Composer Environment is under the same
        # project-id as the images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        # Specifies path to kubernetes config. The config_file is templated.
        # Identifier of connection that should be used
    kubernetes_template_ex = KubernetesPodOperator(
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        # Specifies path to kubernetes config. The config_file is templated.
        # Identifier of connection that should be used
    kubernetes_full_pod = KubernetesPodOperator(
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        # Specifies path to kubernetes config. The config_file is templated.
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        # List of Volume objects to pass to the Pod.
        # List of VolumeMount objects to pass to the Pod.
        # Identifier of connection that should be used
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator

Configuración mínima

Para crear un KubernetesPodOperator, solo se requieren el name del Pod, el namespace donde ejecutarlo, el image que se usará y task_id.

Cuando colocas el siguiente fragmento de código en un DAG, la configuración utiliza los valores predeterminados en /home/airflow/composer_kube_config. No es necesario modificar el código para que la tarea pod-ex-minimum se realice correctamente.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    # Name of task you want to run, used to generate Pod ID.
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    # Docker image specified. Defaults to, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # images if the Composer Environment is under the same
    # project-id as the images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    # Specifies path to kubernetes config. The config_file is templated.
    # Identifier of connection that should be used

Configuración de la plantilla

Airflow admite el uso de plantillas de Jinja. Debes declarar las variables obligatorias (task_id, name, namespace y image) con el operador. Como se muestra en el siguiente ejemplo, puedes crear plantillas de todos los demás parámetros con Jinja, incluidos cmds, arguments, env_vars y config_file.

kubernetes_template_ex = KubernetesPodOperator(
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used

Sin cambiar el DAG ni tu entorno, la tarea ex-kube-templates falla debido a dos errores. Los registros muestran que esta tarea está fallando porque la no existe la variable correspondiente (my_value). El segundo error, que después de corregir el primer error, muestra que la tarea falla porque core/kube_config no se encuentra en config.

Para corregir ambos errores, sigue los pasos que se describen a continuación.

Para configurar my_value con gcloud o la IU de Airflow:

IU de Airflow

En la IU de Airflow 2, haz lo siguiente:

  1. Ve a la IU de Airflow.

  2. En la barra de herramientas, selecciona Administrador > Variables.

  3. En la página Variable de lista, haz clic en Agregar un registro nuevo.

  4. En la página Agregar variable, ingresa la siguiente información:

    • Key: my_value
    • Val: example_value
  5. Haz clic en Guardar.


Para Airflow 2, ingresa el siguiente comando:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Reemplaza lo siguiente:

  • ENVIRONMENT por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno

Para hacer referencia a un config_file personalizado (un archivo de configuración de Kubernetes), anula la opción de configuración de Airflow kube_config a una configuración de Kubernetes válida:

Sección Clave Valor
core kube_config /home/airflow/composer_kube_config

Espera unos minutos a que se actualice tu entorno. Luego, vuelve a ejecutar la tarea ex-kube-templates y verifica que la tarea ex-kube-templates se complete con éxito.

Configuración completa

En este ejemplo, se muestran todas las variables que puedes configurar en KubernetesPodOperator. No es necesario modificar el código para que la tarea ex-all-configs se realice correctamente.

Para obtener detalles sobre cada variable, consulta la referencia KubernetesPodOperator de Airflow.

kubernetes_full_pod = KubernetesPodOperator(
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    # Specifies path to kubernetes config. The config_file is templated.
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    # List of Volume objects to pass to the Pod.
    # List of VolumeMount objects to pass to the Pod.
    # Identifier of connection that should be used
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator

Información sobre el proveedor de Kubernetes para CNCF

GKEStartPodOperator y KubernetesPodOperator se implementan dentro del proveedor apache-airflow-providers-cncf-kubernetes.

Para conocer las notas de la versión fallidas del proveedor de Kubernetes para CNCF, consulta el sitio web del proveedor de Kubernetes para CNCF.

Versión 6.0.0

En la versión 6.0.0 del paquete del proveedor de Kubernetes de CNCF, la conexión kubernetes_default se usa de forma predeterminada en KubernetesPodOperator.

Si especificaste una conexión personalizada en la versión 5.0.0, el operador seguirá usándola. Para volver a usar kubernetes_default en la conexión, es posible que desees ajustar tus DAG según corresponda.

Versión 5.0.0

Esta versión incorpora algunos cambios incompatibles con versiones anteriores en comparación con la versión 4.4.0. Las más importantes se relacionan con la conexión kubernetes_default, que no se usa en la versión 5.0.0.

  • Se debe modificar la conexión kubernetes_default. Ruta de acceso de la configuración de Kube Debe establecerse en /home/airflow/composer_kube_config (como se muestra en la Figura 1). Como alternativa, se debe agregar config_file a la configuración de KubernetesPodOperator (como se muestra en el siguiente código) ejemplo).
Campo de ruta de configuración de Kube en la IU de Airflow
Figura 1. IU de Airflow, modificando la conexión kubernetes_default (haz clic para ampliar)
  • Modifica el código de una tarea con KubernetesPodOperator de la siguiente manera:
  # config_file parameter - can be skipped if connection contains this setting
  # definition of connection to be used by the operator

Para obtener más información sobre la versión 5.0.0, consulta las Notas de la versión del proveedor de Kubernetes de CNCF.

Soluciona problemas

Sugerencias para solucionar problemas de error de pod

Además de verificar los registros de tareas en la IU de Airflow, revisa también los siguientes registros:

  • El resultado del programador y los trabajadores de Airflow:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. Sigue el vínculo de los DAG de tu entorno.

    3. En el bucket de tu entorno, sube un nivel.

    4. Revisa los registros en la carpeta logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>.

  • Registros detallados de Pods en la consola de Google Cloud en las cargas de trabajo de GKE. Estos registros incluyen el archivo YAML de definición de pod, los eventos de los pods y sus detalles.

Códigos de retorno distintos de cero cuando también se usa el GKEStartPodOperator

Cuando se usa KubernetesPodOperator y GKEStartPodOperator, el código de retorno del punto de entrada del contenedor determina si la tarea se considera exitosa o no. Los códigos de retorno distintos de cero indican un error.

Un patrón común cuando se utiliza KubernetesPodOperator y GKEStartPodOperator es ejecutar una secuencia de comandos de shell como punto de entrada de contenedor para agrupar varias operaciones dentro de este.

Si escribes una secuencia de comandos de este tipo, recomendamos que incluyas el comando set -e en la parte superior de la secuencia de comandos para que sus comandos con error finalicen la secuencia y propaguen el error a la instancia de tarea de Airflow.

Tiempos de espera de los pods

El tiempo de espera predeterminado de KubernetesPodOperator es de 120 segundos, lo que puede provocar que el tiempo de espera se agote antes de que se descarguen las imágenes más grandes. Para aumentar el tiempo de espera, puedes modificar el parámetro startup_timeout_seconds cuando creas el KubernetesPodOperator.

Cuando se agota el tiempo de espera de un Pod, el registro específico de la tarea está disponible en la IU de Airflow. Por ejemplo:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/']
Event: pod-name-9a8e9d06 had an event of type Pending
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/", line 392, in run
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Los tiempos de espera de Pods también pueden ocurrir Cuenta de servicio de Cloud Composer carece de los permisos de IAM necesarios para realizar la tarea en mano. Para verificar esto, observa los errores a nivel del Pod con el Paneles de GKE para ver los registros de tu carga de trabajo específica o usa Cloud Logging.

No se pudo establecer una conexión nueva

La actualización automática está habilitada de forma predeterminada en los clústeres de GKE. Si un grupo de nodos está en un clúster que se está actualizando, es posible que veas el siguiente error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

Para verificar si tu clúster se está actualizando, en la consola de Google Cloud, ve a Clústeres de Kubernetes y busca el ícono de carga junto nombre del clúster del entorno.

