Usar o CeleryKubernetesExecutor

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Esta página explica como ativar o CeleryKubernetesExecutor no Cloud Composer e como usar o KubernetesExecutor nas suas DAGs.

Sobre o CeleryKubernetesExecutor

O CeleryKubernetesExecutor é um tipo de executor que pode usar CeleryExecutor e KubernetesExecutor ao mesmo tempo de resposta. O Airflow seleciona o executor com base na fila definida para o tarefa. Em um DAG, é possível executar algumas tarefas com o CeleryExecutor e outras com KubernetesExecutor:

  • O CeleryExecutor é otimizado para a execução rápida e escalonável de tarefas.
  • KubernetesExecutor foi projetado para a execução de tarefas que consomem muitos recursos e executar tarefas isoladamente.

CeleryKubernetesExecutor no Cloud Composer

Com o CeleryKubernetesExecutor, no Cloud Composer, é possível usar KubernetesExecutor nas tarefas. Não é possível usar KubernetesExecutor no Cloud Composer separado do CeleryKubernetesExecutor.

O Cloud Composer executa tarefas que você executa com KubernetesExecutor no cluster do ambiente e no mesmo namespace que os workers do Airflow. Essas tarefas têm as mesmas vinculações que os workers do Airflow e podem acessar recursos no seu projeto.

As tarefas que você executa com KubernetesExecutor usam o Modelo de preços do Cloud Composer, já que os pods com essas as tarefas são executadas no cluster do ambiente. SKUs de computação do Cloud Composer (para CPU, memória e armazenamento) aplicam-se a esses pods.

Recomendamos executar tarefas com o CeleryExecutor quando:

  • O tempo de inicialização da tarefa é importante.
  • As tarefas não exigem isolamento de execução e não consomem muitos recursos.

Recomendamos executar tarefas com o KubernetesExecutor quando:

  • As tarefas exigem isolamento do ambiente de execução. Por exemplo, para que as tarefas não concorram por memória e CPU, já que são executadas nos próprios pods.
  • As tarefas exigem bibliotecas de sistema (ou pacotes PyPI) adicionais.
  • As tarefas exigem muitos recursos, e você quer controlar os recursos disponíveis recursos de CPU e memória.

KubernetesExecutor em comparação com KubernetesPodOperator

A execução de tarefas com KubernetesExecutor é semelhante à executar tarefas usando o KubernetesPodOperator. As tarefas são executadas em os pods, fornecendo isolamento de tarefas no nível do pod e melhor gerenciamento de recursos.

No entanto, existem algumas diferenças importantes:

  • KubernetesExecutor executa tarefas apenas no Cloud Composer com controle de versão namespace do seu ambiente. Não é possível alterar esse namespace no Cloud Composer. É possível especificar um namespace em que o KubernetesPodOperator executa tarefas de pod.
  • O KubernetesExecutor pode usar qualquer operador integrado do Airflow. KubernetesPodOperator executa apenas um script fornecido definido pelo ponto de entrada do contêiner.
  • KubernetesExecutor usa a imagem Docker padrão do Cloud Composer com as mesmas substituições de opções de configuração do Python, do Airflow, variáveis e pacotes PyPI definidos no ambiente do Cloud Composer.

Sobre as imagens do Docker

Por padrão, KubernetesExecutor inicia tarefas usando a mesma imagem Docker que O Cloud Composer usa para workers do Celery. Esta é a imagem do Cloud Composer para seu ambiente, com todas as mudanças especificadas para seu ambiente, como PyPI personalizado pacotes ou variáveis de ambiente.

Antes de começar

  • É possível usar o CeleryKubernetesExecutor no Cloud Composer 3.

  • Não é possível usar nenhum executor, exceto o CeleryKubernetesExecutor, no Cloud Composer 3. Isso significa que você pode executar tarefas usando CeleryExecutor, KubernetesExecutor ou ambos em um DAG, mas não é possível configurar o ambiente para usar apenas KubernetesExecutor ou CeleryExecutor.

Configurar o CeleryKubernetesExecutor

substitua a configuração atual do Airflow opções relacionadas a KubernetesExecutor:

  • [kubernetes]worker_pods_creation_batch_size

    Essa opção define o número de chamadas de criação de pods de worker do Kubernetes por ciclo do agendador. O valor padrão é 1, então apenas um pod é iniciado por batimento cardíaco do agendador. Se você usa o KubernetesExecutor com frequência, recomendamos aumentar esse valor.

  • [kubernetes]worker_pods_pending_timeout

    Essa opção define, em segundos, por quanto tempo um worker pode permanecer no estado Pending (o pod está sendo criado) antes de ser considerado com falha. O padrão é de 5 minutos.

Executar tarefas com KubernetesExecutor ou CeleryExecutor

É possível executar tarefas usando o CeleryExecutor, o KubernetesExecutor ou ambos em um DAG:

  • Para executar uma tarefa com KubernetesExecutor, especifique o valor kubernetes no queue de uma tarefa.
  • Para executar uma tarefa com o CeleryExecutor, omita o parâmetro queue.

O exemplo a seguir executa a tarefa task-kubernetes usando KubernetesExecutor e a tarefa task-celery usando o CeleryExecutor:

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

Executar comandos da CLI do Airflow relacionados ao KubernetesExecutor

É possível executar várias Comandos da CLI do Airflow relacionados ao KubernetesExecutor usando gcloud.

Personalizar a especificação do pod de worker

É possível personalizar a especificação do pod de worker ao transmiti-la no executor_config de uma tarefa. É possível usar isso para definir CPU e memória personalizados e cumprimento de requisitos regulatórios.

É possível substituir toda a especificação do pod de worker usada para executar uma tarefa. Para recuperar a especificação do pod de uma tarefa usada pelo KubernetesExecutor, é possível executar a CLI kubernetes generate-dag-yaml do Airflow kubectl.

Para mais informações sobre como personalizar a especificação do pod de worker, consulte Documentação do Airflow.

O exemplo a seguir demonstra uma tarefa que usa a especificação de pod de worker personalizado:

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

Ver registros de tarefas

Os registros das tarefas executadas pelo KubernetesExecutor estão disponíveis na guia Registros. com os registros de tarefas executadas pelo CeleryExecutor:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Registros.

  4. Acesse Todos os registros > Registros do Airflow > Workers.

  5. Execução de workers chamados airflow-k8s-worker KubernetesExecutor. Para procurar os registros de uma tarefa específica, usar um ID do DAG ou da tarefa como uma palavra-chave na pesquisa.

A seguir