This page describes how to use the KubernetesPodOperator to deploy Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.
KubernetesPodOperator launches Kubernetes pods in your environment's cluster. In comparison, Google Kubernetes Engine operators run Kubernetes pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters using Google Kubernetes Engine operators.
The KubernetesPodOperator is a good option if you require:
- Custom Python dependencies that are not available through the public PyPI repository.
- Binary dependencies that are not available in the stock Cloud Composer worker image.
This page walks you through an example Airflow DAG that includes the following KubernetesPodOperator configurations:
- Minimal configuration: Sets only the required parameters.
- Template configuration: Uses parameters that you can template with Jinja.
- Secret variables configuration: Passes a Kubernetes Secret object to the pod.
- Full configuration: Includes all configurations.
In Cloud Composer 2, Pod affinity configuration is not available. Instead, use the GKE operators to launch pods in a different cluster.
Before you begin
In Cloud Composer 2, your environment's cluster scales automatically. Extra workloads that you run using KubernetesPodOperator scale independently from your environment. Your environment is not affected by the increased resource demand, but your environment's cluster scales up and down depending on that demand. The pricing for the extra workloads that you run in your environment's cluster follows the Cloud Composer 2 pricing model and uses Cloud Composer Compute SKUs.

Cloud Composer 2 uses GKE clusters with Workload Identity Federation for GKE. By default, Pods that run in newly created namespaces or in the composer-user-workloads namespace cannot access Google Cloud resources. When using Workload Identity Federation for GKE, Kubernetes service accounts associated with namespaces must be mapped to Google Cloud service accounts to enable service identity authorization for requests to Google APIs and other services.

Because of this, if you run Pods in the composer-user-workloads namespace or in a newly created namespace in your environment's cluster, proper IAM bindings between Kubernetes and Google Cloud service accounts are not created, and these Pods cannot access resources of your Google Cloud project.

If you want your Pods to have access to Google Cloud resources, then use the composer-user-workloads namespace or create your own namespace as described further. To provide access to your project's resources, follow the guidance in Workload Identity Federation for GKE and set up the bindings:
- Create a separate namespace in your environment's cluster.
- Create a binding between the composer-user-workloads/<namespace_name> Kubernetes Service Account and your environment's service account.
- Add your environment's service account annotation to the Kubernetes service account.
- When you use KubernetesPodOperator, specify the namespace and the Kubernetes service account in the namespace and service_account_name parameters, as shown in the sketch after this list.
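A hedged sketch of such a task follows. The namespace, Kubernetes service account, and image names are hypothetical placeholders (not values from this page); replace them with the namespace and service account that you created and bound.

# In older provider versions the import path is
# airflow.providers.cncf.kubernetes.operators.kubernetes_pod instead.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Defined inside an existing DAG.
pod_with_workload_identity = KubernetesPodOperator(
    task_id="pod-with-workload-identity",
    name="pod-with-workload-identity",
    # Namespace and Kubernetes service account that you created and bound
    # to your environment's service account (placeholder names).
    namespace="example-namespace",
    service_account_name="example-kubernetes-sa",
    # Public image used only for illustration.
    image="google/cloud-sdk:slim",
    # Prints the identity that the Pod authenticates as.
    cmds=["gcloud", "auth", "list"],
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)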
Cloud Composer 2 uses GKE clusters with Workload Identity. The GKE metadata server takes a few seconds to start accepting requests on a newly created Pod. Therefore, attempts to authenticate using Workload Identity within the first few seconds of a Pod's life might fail. For more information about this limitation, see Restrictions of Workload Identity.
Cloud Composer 2 uses Autopilot clusters, which introduce the notion of compute classes:
- By default, if no class is selected, the general-purpose class is assumed when you create Pods using KubernetesPodOperator.
- Each class is associated with specific properties and resource limits. You can read about them in the Autopilot documentation. For example, Pods that run within the general-purpose class can use up to 110 GiB of memory.
If you use version 5.0.0 of the CNCF Kubernetes Provider, then follow the instructions documented in the CNCF Kubernetes Provider section.
KubernetesPodOperator configuration
To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder or add the relevant KubernetesPodOperator code to a DAG.
The following sections explain each KubernetesPodOperator
configuration
in the example. For information about each configuration variable,
see the Airflow reference.
Minimal configuration
To create a KubernetesPodOperator, only the Pod's name, the namespace where to run the Pod, the image to use, and the task_id are required.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. You don't need to modify the code for the pod-ex-minimum task to succeed.
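The sample code is not reproduced on this page. A minimal sketch of such a task might look like the following; the ubuntu image, the DAG ID, and the import path of recent CNCF Kubernetes provider versions are assumptions, not values from the original sample.

import datetime

from airflow import models
# In older provider versions the import path is
# airflow.providers.cncf.kubernetes.operators.kubernetes_pod instead.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID of the Airflow task and the name used to generate the Pod ID.
        task_id="pod-ex-minimum",
        name="pod-ex-minimum",
        # Entrypoint of the container; if omitted, the image's default is used.
        cmds=["echo"],
        # The Kubernetes namespace to run the Pod in.
        namespace="composer-user-workloads",
        # Any pullable container image works here; "ubuntu" is only an example.
        image="ubuntu",
        # Use the kubeconfig defaults mentioned above.
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
    )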
Template configuration
Airflow supports using Jinja Templating. You must declare the required variables (task_id, name, namespace, and image) with the operator. As shown in the following example, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.
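The example code itself is not included on this page. A hedged sketch of a task that exercises these templated fields might look like the following; the bash image is an assumption, while the my_value Airflow variable and the core/kube_config configuration option are the ones discussed below.

# Defined inside a DAG, with KubernetesPodOperator imported as in the earlier sketch.
kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # The parameters below are templated with Jinja.
    # {{ ds }} renders the logical date as YYYY-MM-DD.
    cmds=["echo"],
    arguments=["{{ ds }}"],
    # Reads the Airflow variable named my_value.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Reads the kube_config option from the [core] section of the
    # Airflow configuration.
    config_file="{{ conf.get('core', 'kube_config') }}",
)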
Without changing the DAG or your environment, the ex-kube-templates task fails because of two errors. The logs show that this task fails because the appropriate variable (my_value) does not exist. The second error, which you can get after fixing the first error, shows that the task fails because core/kube_config is not found in config.

To fix both errors, follow the steps outlined further.
To set my_value with gcloud or the Airflow UI:

Airflow UI

In the Airflow 2 UI:
- Go to the Airflow UI.
- In the toolbar, select Admin > Variables.
- On the List Variable page, click Add a new record.
- On the Add Variable page, enter the following information:
  - Key: my_value
  - Val: example_value
- Click Save.
gcloud
For Airflow 2, enter the following command:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Replace:
- ENVIRONMENT with the name of the environment.
- LOCATION with the region where the environment is located.
To refer to a custom config_file (a Kubernetes configuration file), set the kube_config Airflow configuration option to a valid Kubernetes configuration:
Section | Key | Value
---|---|---
core | kube_config | /home/airflow/composer_kube_config
Wait a few minutes for your environment to update. Then run the ex-kube-templates task again and verify that the task succeeds.
Secret variables configuration
A Kubernetes secret is an object that contains sensitive data. You can pass secrets to the Kubernetes pods by using the KubernetesPodOperator.
Secrets must be defined in Kubernetes, or the pod fails to launch.
This example shows two ways of using Kubernetes Secrets: as an environment variable, and as a volume mounted by the pod.
The first secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).
The second secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.
Here's what the secrets look like:
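The sample code is not shown on this page. Based on the description that follows, the two Secret objects would be defined roughly like this; a sketch, assuming the Secret class from the CNCF Kubernetes provider.

from airflow.providers.cncf.kubernetes.secret import Secret

secret_env = Secret(
    # Expose the secret as an environment variable in the Pod.
    deploy_type="env",
    # Name of the environment variable to set.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret object.
    secret="airflow-secrets",
    # Key of the value stored in that Secret.
    key="sql_alchemy_conn",
)

secret_volume = Secret(
    # Mount the secret as a volume in the Pod.
    deploy_type="volume",
    # Path where the volume is mounted.
    deploy_target="/var/secrets/google",
    secret="service-account",
    # Key (file name) of the value stored in that Secret.
    key="service-account.json",
)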
The name of the first Kubernetes secret is defined in the secret variable. This particular secret is named airflow-secrets. It is exposed as an environment variable, as dictated by the deploy_type. The environment variable that it sets, deploy_target, is SQL_CONN. Finally, the key of the secret that is stored in the deploy_target is sql_alchemy_conn.
The name of the second Kubernetes secret is defined in the secret variable. This particular secret is named service-account. It is exposed as a volume, as dictated by the deploy_type. The path of the file to mount, deploy_target, is /var/secrets/google. Finally, the key of the secret that is stored in the deploy_target is service-account.json.
Here's what the operator configuration looks like:
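As with the other samples, the code itself is not reproduced here. A sketch of a task that consumes both Secret objects might look like this; the ubuntu image and the GOOGLE_APPLICATION_CREDENTIALS variable are illustrative assumptions.

# Defined inside a DAG, reusing secret_env and secret_volume from the sketch above.
kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="ubuntu",
    cmds=["echo"],
    # Pass both secrets to the Pod.
    secrets=[secret_env, secret_volume],
    # Point client libraries in the container at the mounted key file.
    env_vars={
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)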
Without making any changes to the DAG or your environment, the ex-kube-secrets task fails. If you look at the logs, the task fails because of a Pod took too long to start error. This error occurs because Airflow cannot find the secret specified in the configuration, secret_env.
gcloud
To set the secret using gcloud:
Get information about your Cloud Composer environment cluster.

Run the following command:

gcloud composer environments describe ENVIRONMENT \
    --location LOCATION \
    --format="value(config.gkeCluster)"

Replace:
- ENVIRONMENT with the name of your environment.
- LOCATION with the region where the Cloud Composer environment is located.

The output of this command uses the following format: projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>.

To get the GKE cluster ID, copy the output after /clusters/ (it ends in -gke).
Connect to your GKE cluster by running the following command:

gcloud container clusters get-credentials CLUSTER_ID \
    --project PROJECT \
    --region LOCATION

Replace:
- CLUSTER_ID with your GKE cluster ID.
- PROJECT with the ID of your Google Cloud project.
- LOCATION with the region where the Cloud Composer environment is located.
Create Kubernetes secrets.

Create a Kubernetes secret that sets the value of sql_alchemy_conn to test_value by running the following command:

kubectl create secret generic airflow-secrets \
    --from-literal sql_alchemy_conn=test_value -n composer-user-workloads

Create a Kubernetes secret that sets the value of service-account.json to a local path of a service account key file called key.json by running the following command:

kubectl create secret generic service-account \
    --from-file service-account.json=./key.json -n composer-user-workloads
After you set the secrets, run the ex-kube-secrets task again in the Airflow UI. Verify that the ex-kube-secrets task succeeds.
Full configuration
This example shows all the variables that you can configure in the KubernetesPodOperator. You don't need to modify the code for the ex-all-configs task to succeed.
For details on each variable, see the
Airflow KubernetesPodOperator
reference.
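The full sample is not reproduced on this page. As a rough, hedged sketch, a more fully specified task might combine parameters like the following; the image, labels, annotations, and resource values are illustrative assumptions, and container_resources requires a recent version of the CNCF Kubernetes provider.

from kubernetes.client import models as k8s

# Defined inside a DAG, with KubernetesPodOperator imported as in the earlier sketches.
kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="ex-all-configs",
    namespace="composer-user-workloads",
    image="ubuntu",
    # Entrypoint and arguments passed to the container.
    cmds=["echo"],
    arguments=["hello", "world"],
    # Arbitrary labels and annotations applied to the Pod.
    labels={"example-label": "example-value"},
    annotations={"example-annotation": "example-value"},
    # Environment variables available inside the container.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # Resource requests and limits for the container.
    container_resources=k8s.V1ResourceRequirements(
        requests={"cpu": "500m", "memory": "512Mi"},
        limits={"cpu": "1", "memory": "1Gi"},
    ),
    # Stream the container's stdout into the Airflow task log.
    get_logs=True,
    # Pull the image even if it is already present on the node.
    image_pull_policy="Always",
    # Give slow image pulls more time before the task fails.
    startup_timeout_seconds=300,
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)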
Information about CNCF Kubernetes Provider
GKEStartPodOperator and KubernetesPodOperator are implemented within the apache-airflow-providers-cncf-kubernetes provider.

For detailed release notes for the CNCF Kubernetes provider, refer to the CNCF Kubernetes Provider website.
Version 6.0.0
In version 6.0.0 of the CNCF Kubernetes Provider package, the kubernetes_default connection is used by default in the KubernetesPodOperator.
If you specified a custom connection in version 5.0.0, this custom connection
is still used by the operator. To switch back to using the kubernetes_default
connection, you might want to adjust your DAGs accordingly.
Version 5.0.0
This version introduces a few backward incompatible changes
compared to version 4.4.0. The most important ones are related to
the kubernetes_default
connection which is not used in version 5.0.0.
- The
kubernetes_default
connection needs to be modified. Kube config path must be set to/home/airflow/composer_kube_config
(as shown in Figure 1) As an alternative,config_file
must to be added to theKubernetesPodOperator
configuration (as shown in the following code example).
- Modify the code of a task using KubernetesPodOperator in the following way:
KubernetesPodOperator(
    # config_file parameter - can be skipped if connection contains this setting
    config_file="/home/airflow/composer_kube_config",
    # definition of connection to be used by the operator
    kubernetes_conn_id='kubernetes_default',
    ...
)
For more information about version 5.0.0, refer to the CNCF Kubernetes Provider Release Notes.
Troubleshooting
Tips for troubleshooting Pod failures
In addition to checking the task logs in the Airflow UI, also check the following logs:
- Output of the Airflow scheduler and workers:
  - In the Google Cloud console, go to the Environments page.
  - Follow the DAGs link for your environment.
  - In the bucket of your environment, go up one level.
  - Review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> folder.
- Detailed pod logs in the Google Cloud console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.
Non-zero return codes when also using the GKEStartPodOperator
When using the KubernetesPodOperator and the GKEStartPodOperator, the return code of the container's entrypoint determines whether the task is considered successful or not. Non-zero return codes indicate failure.
A common pattern when using the KubernetesPodOperator and the GKEStartPodOperator is to execute a shell script as the container entrypoint to group together multiple operations within the container. If you are writing such a script, we recommend that you include the set -e command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
Pod timeouts
The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts occurring before larger images finish downloading. You can increase the timeout by altering the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
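For example, a task that pulls a large image might raise the limit like the following sketch; the 600-second value and the ubuntu image are illustrative assumptions.

# Defined inside a DAG, with KubernetesPodOperator imported as in the earlier sketches.
long_startup_pod = KubernetesPodOperator(
    task_id="ex-larger-image",
    name="ex-larger-image",
    namespace="composer-user-workloads",
    image="ubuntu",
    cmds=["echo"],
    # Allow up to 10 minutes for the image pull and Pod scheduling
    # instead of the 120-second default.
    startup_timeout_seconds=600,
    config_file="/home/airflow/composer_kube_config",
)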
When a pod times out, the task-specific log is available in the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at pod-level errors by using the GKE Dashboards to review the logs for your particular workload, or use Cloud Logging.
Failed to establish a new connection
Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you might see the following error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
To check if your cluster is upgrading, in Google Cloud console, go to the Kubernetes clusters page and look for the loading icon next to your environment's cluster name.