Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar o KubernetesPodOperator para implantar pods do Kubernetes do Cloud Composer no cluster do Google Kubernetes Engine que faz parte do seu ambiente do Cloud Composer.
O KubernetesPodOperator inicia pods do Kubernetes no cluster do seu ambiente. Os operadores do Google Kubernetes Engine executam pods do Kubernetes em um cluster especificado, que pode ser um cluster separado não relacionado ao seu ambiente. Também é possível criar e excluir clusters usando os operadores do Google Kubernetes Engine.
O KubernetesPodOperator é uma boa opção se você precisar de:
- Dependências personalizadas do Python que não estão disponíveis no repositório PyPI público.
- Dependências binárias que não estão disponíveis na imagem do worker do Cloud Composer.
Antes de começar
Se a versão 5.0.0 do provedor do Kubernetes da CNCF for usada, siga as instruções documentadas na seção do provedor do Kubernetes da CNCF.
A configuração de afinidade de pod não está disponível no Cloud Composer 2. Se você quiser usar a afinidade de pods, use os operadores do GKE para iniciar pods em um cluster diferente.
Sobre o KubernetesPodOperator no Cloud Composer 2
Nesta seção, descrevemos como o KubernetesPodOperator funciona no Cloud Composer 2.
Uso de recursos
No Cloud Composer 2, o cluster do ambiente é escalonado automaticamente. As cargas de trabalho extras que você executa usando o KubernetesPodOperator são escalonadas independentemente do ambiente.
Seu ambiente não é afetado pelo aumento da demanda de recursos, mas o cluster do ambiente aumenta ou diminui dependendo da demanda de recursos.
Os preços das cargas de trabalho extras executadas no cluster do ambiente seguem o modelo de preços do Cloud Composer 2 e usam as SKUs do Cloud Composer Compute.
O Cloud Composer 2 usa clusters do Autopilot, que introduzem a noção de classes de computação:
O Cloud Composer é compatível apenas com a classe de computação
general-purpose
.Por padrão, se nenhuma classe for selecionada, a classe
general-purpose
será assumida quando você criar pods usando o KubernetesPodOperator.Cada classe está associada a propriedades e limites de recursos específicos. Leia sobre eles na documentação do Autopilot. Por exemplo, os pods executados na classe
general-purpose
podem usar até 110 GiB de memória.
Acesso aos recursos do projeto
O Cloud Composer 2 usa clusters do GKE com a
federação de identidade da carga de trabalho para o GKE. Os pods executados no namespace composer-user-workloads
podem acessar recursos Google Cloud no projeto sem
configuração adicional. A conta de serviço do seu ambiente
é usada para acessar esses recursos.
Se você quiser usar um namespace personalizado, as contas de serviço do Kubernetes associadas a ele precisam ser mapeadas para contas de serviço Google Cloud , para ativar a autorização de identidade de serviço para solicitações às APIs do Google e outros serviços. Se você executar pods em um namespace personalizado no cluster do ambiente, as vinculações do IAM entre o Kubernetes e as contas de serviçoGoogle Cloud não serão criadas, e esses pods não poderão acessar os recursos do projeto Google Cloud .
Se você usar um namespace personalizado e quiser que seus pods tenham acesso aos recursos doGoogle Cloud , siga as orientações da federação de identidade da carga de trabalho para o GKE e configure as vinculações para um namespace personalizado:
- Crie um namespace separado no cluster do ambiente.
- Crie uma vinculação entre a conta de serviço do Kubernetes do namespace personalizado e a conta de serviço do seu ambiente.
- Adicione a anotação da conta de serviço do seu ambiente à conta de serviço do Kubernetes.
- Ao usar o KubernetesPodOperator, especifique o namespace e a
conta de serviço do Kubernetes nos parâmetros
namespace
eservice_account_name
.
Configuração mínima
Para criar um KubernetesPodOperator, somente os parâmetros name
, image
e
task_id
do pod são obrigatórios. O /home/airflow/composer_kube_config
contém credenciais para autenticação no GKE.
Configurações avançadas
Este exemplo mostra outros parâmetros que podem ser configurados no KubernetesPodOperator.
Consulte os seguintes recursos para mais informações:
Para informações sobre como usar Secrets e ConfigMaps do Kubernetes, consulte Usar Secrets e ConfigMaps do Kubernetes.
Para informações sobre como usar modelos Jinja com o KubernetesPodOperator, consulte Usar modelos Jinja.
Para informações sobre os parâmetros do KubernetesPodOperator, consulte a referência do operador na documentação do Airflow.
Usar modelos Jinja
O Airflow é compatível com modelos Jinja em DAGs.
Você precisa declarar os parâmetros obrigatórios do Airflow (task_id
, name
e
image
) com o operador. Como mostra o exemplo a seguir,
é possível criar modelos de todos os outros parâmetros com o Jinja, incluindo cmds
,
arguments
, env_vars
e config_file
.
O parâmetro env_vars
no exemplo é definido com base em uma
variável do Airflow chamada my_value
. O DAG de exemplo
recebe o valor da variável de modelo vars
no Airflow. O Airflow tem mais variáveis que fornecem acesso a diferentes tipos de informações. Por exemplo, use a variável de modelo conf
para acessar valores de opções de configuração do Airflow. Para mais informações e a lista de variáveis disponíveis no Airflow, consulte a Referência de modelos na documentação do Airflow.
Sem mudar o DAG ou criar a variável env_vars
, a tarefa
ex-kube-templates
no exemplo falha porque a variável não
existe. Crie essa variável na interface do Airflow ou com a Google Cloud CLI:
IU do Airflow
Acesse a IU do Airflow.
Na barra de ferramentas, selecione Administrador > Variáveis.
Na página Listar variáveis, clique em Adicionar um novo registro.
Na página Adicionar variável, digite as seguintes informações:
- Chave:
my_value
- Val:
example_value
- Chave:
Clique em Save.
gcloud
Digite este comando:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
Substitua:
ENVIRONMENT
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
O exemplo a seguir demonstra como usar modelos Jinja com KubernetesPodOperator:
Usar secrets e ConfigMaps do Kubernetes
Um secret do Kubernetes é um objeto que contém dados sensíveis. Um ConfigMap do Kubernetes é um objeto que contém dados não confidenciais em pares de chave-valor.
No Cloud Composer 2, é possível criar Secrets e ConfigMaps usando Google Cloud CLI, a API ou o Terraform e, em seguida, acessar esses recursos do KubernetesPodOperator.
Sobre arquivos de configuração YAML
Ao criar um secret do Kubernetes ou um ConfigMap usando a Google Cloud CLI e a API, você fornece um arquivo no formato YAML. Esse arquivo precisa seguir o mesmo formato usado por Secrets e ConfigMaps do Kubernetes. A documentação do Kubernetes fornece muitos exemplos de código de ConfigMaps e Secrets. Para começar, consulte as páginas Distribuir credenciais com segurança usando secrets e ConfigMaps.
Assim como nos secrets do Kubernetes, use a representação base64 ao definir valores em secrets.
Para codificar um valor, use o seguinte comando (esta é uma das muitas maneiras de receber um valor codificado em base64):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
Saída:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Os dois exemplos de arquivos YAML a seguir são usados em exemplos mais adiante neste guia. Exemplo de arquivo de configuração YAML para um secret do Kubernetes:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
Outro exemplo que demonstra como incluir arquivos. Assim como no exemplo
anterior, primeiro codifique o conteúdo de um arquivo (cat ./key.json | base64
) e
forneça esse valor no arquivo YAML:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
Um exemplo de arquivo de configuração YAML para um ConfigMap. Não é necessário usar a representação base64 em ConfigMaps:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Gerenciar secrets do Kubernetes
No Cloud Composer 2, você cria secrets usando a Google Cloud CLI e kubectl
:
Receber informações sobre o cluster do ambiente:
Execute este comando:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
Substitua:
ENVIRONMENT
pelo nome do ambiente.LOCATION
é a região em que o ambiente do Cloud Composer está localizado.
A saída desse comando usa o seguinte formato:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
.Para receber o ID do cluster do GKE, copie a saída depois de
/clusters/
(termina em-gke
).
Conecte-se ao cluster do GKE com o seguinte comando:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
Substitua:
CLUSTER_ID
: o ID do cluster do ambiente.PROJECT_ID
: o ID do projeto.LOCATION
: a região em que o ambiente está localizado.
Crie secrets do Kubernetes:
Os comandos a seguir demonstram duas abordagens diferentes para criar secrets do Kubernetes. A abordagem
--from-literal
usa pares de chave-valor. A abordagem--from-file
usa o conteúdo do arquivo.Para criar um secret do Kubernetes fornecendo pares de chave-valor, execute o comando a seguir. Este exemplo cria um secret chamado
airflow-secrets
que tem um camposql_alchemy_conn
com o valortest_value
.kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
Para criar um secret do Kubernetes fornecendo o conteúdo do arquivo, execute o comando a seguir. Este exemplo cria um Secret chamado
service-account
que tem o camposervice-account.json
com o valor extraído do conteúdo de um arquivo./key.json
local.kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Usar secrets do Kubernetes nos seus DAGs
Neste exemplo, mostramos duas maneiras de usar os Kubernetes Secrets: como uma variável de ambiente e como um volume ativado pelo pod.
O primeiro secret, airflow-secrets
, é definido
como uma variável de ambiente do Kubernetes chamada SQL_CONN
(em vez de uma variável de ambiente do Airflow
ou do Cloud Composer).
O segundo secret, service-account
, instala service-account.json
, um arquivo
com um token de conta de serviço, em /var/secrets/google
.
Veja como são os objetos Secret:
O nome do primeiro secret do Kubernetes é definido na variável secret_env
.
Esse secret é chamado de airflow-secrets
. O parâmetro deploy_type
especifica que ele precisa ser exposto como uma variável de ambiente. O nome da variável de ambiente é SQL_CONN
, conforme especificado no parâmetro deploy_target
. Por fim, o valor da variável de ambiente SQL_CONN
é definido como o valor da chave sql_alchemy_conn
.
O nome do segundo secret do Kubernetes é definido na variável secret_volume
. Esse secret é chamado de service-account
. Ele é exposto como um
volume, conforme especificado no parâmetro deploy_type
. O caminho do arquivo a ser
ativado, deploy_target
, é /var/secrets/google
. Por fim, o key
do
secret armazenado no deploy_target
é service-account.json
.
Veja como é a configuração do operador:
Informações sobre o provedor do Kubernetes da CNCF
O KubernetesPodOperator é implementado no provedor
apache-airflow-providers-cncf-kubernetes
.
Para notas de lançamento detalhadas do provedor do Kubernetes da CNCF, consulte o site do provedor do Kubernetes da CNCF.
Versão 6.0.0
Na versão 6.0.0 do pacote CNCF Kubernetes Provider,
a conexão kubernetes_default
é usada por padrão no KubernetesPodOperator.
Se você especificou uma conexão personalizada na versão 5.0.0, ela ainda será usada pelo operador. Para voltar a usar a conexão kubernetes_default
, ajuste seus DAGs de acordo.
Versão 5.0.0
Esta versão apresenta algumas mudanças incompatíveis com versões anteriores
em comparação com a versão 4.4.0. Os mais importantes estão relacionados à conexão kubernetes_default
, que não é usada na versão 5.0.0.
- A conexão
kubernetes_default
precisa ser modificada. O caminho de configuração do Kubernetes precisa ser definido como/home/airflow/composer_kube_config
, conforme mostrado na figura a seguir. Como alternativa,config_file
precisa ser adicionado à configuração do KubernetesPodOperator, conforme mostrado no exemplo de código a seguir.

- Modifique o código de uma tarefa usando KubernetesPodOperator da seguinte maneira:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
Para mais informações sobre a versão 5.0.0, consulte as Notas da versão do provedor do Kubernetes da CNCF (em inglês).
Solução de problemas
Esta seção oferece dicas para solucionar problemas comuns do KubernetesPodOperator:
Ver registros
Ao resolver problemas, verifique os registros na seguinte ordem:
Registros de tarefas do Airflow:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia DAGs.
Clique no nome do DAG e depois na execução para conferir os detalhes e os registros.
Registros do programador do Airflow:
Acesse a página Detalhes do ambiente.
Acesse a guia Registros.
Inspecione os registros do agendador do Airflow.
Registros do pod no console Google Cloud , em cargas de trabalho do GKE. Esses registros incluem o arquivo YAML de definição, os eventos e os detalhes do pod.
Códigos de retorno diferentes de zero
Ao usar o KubernetesPodOperator (e o GKEStartPodOperator), o código de retorno do ponto de entrada do contêiner determina se a tarefa é considerada bem-sucedida ou não. Os códigos de retorno diferentes de zero indicam a falha.
Um padrão comum é executar um script de shell como o ponto de entrada do contêiner para agrupar várias operações dentro dele.
Se você estiver escrevendo esse script, recomendamos que inclua o comando
set -e
na parte de cima para que comandos com falha encerrem o script
e propaguem a falha para a instância de tarefa do Airflow.
Tempos limite do pod
O tempo limite padrão para KubernetesPodOperator é de 120 segundos, o que pode resultar na ocorrência de tempos limite antes do download de imagens maiores. É possível aumentar o tempo limite alterando o parâmetro startup_timeout_seconds
ao criar o KubernetesPodOperator.
Quando um pod expira, o registro específico da tarefa fica disponível na IU do Airflow. Exemplo:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Os tempos limite do pod também podem ocorrer quando a conta de serviço do Cloud Composer não tiver as permissões necessárias do IAM para executar a tarefa em questão. Para verificar isso, veja os erros no nível do pod usando os painéis do GKE para analisar os registros para uma carga de trabalho específica, ou use o Cloud Logging.
Falha ao estabelecer uma nova conexão
O upgrade automático é ativado por padrão nos clusters do GKE. Se um pool de nós estiver em um cluster que está fazendo um upgrade, você poderá receber este erro:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
Para verificar se o upgrade do cluster está sendo feito, no console Google Cloud , acesse a página de clusters do Kubernetes e procure o ícone de carregamento ao lado do nome do cluster do ambiente.