Como solucionar problemas do programador do Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Nesta página, você encontra informações sobre a solução de problemas comuns com programadores do Airflow e etapas de solução de problemas.

Identificar a origem do problema

Para começar a solução de problemas, identifique se o problema acontece no tempo de análise do DAG ou durante o processamento de tarefas no momento da execução. Para mais informações sobre o tempo de análise e o tempo de execução do DAG, leia Diferença entre o tempo de análise do DAG e o tempo de execução do DAG.

Como inspecionar problemas de processamento de DAG

  1. Inspecione os registros do processador DAG.

  2. Confira os tempos de análise do DAG.

Como monitorar tarefas em execução ou na fila

Para verificar se há tarefas travadas em uma fila, siga estas etapas.

  1. No Google Cloud console, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.

  3. Acesse a guia Monitoramento.

  4. Na guia Monitoring, consulte o gráfico Tarefas do Airflow na seção Execuções do DAG e identifique possíveis problemas. As tarefas do Airflow são tarefas que estão em um estado em fila no Airflow. Elas podem ir para a fila de agentes do Celery ou do Kubernetes. As tarefas em fila do Celery são instâncias de tarefas colocadas na fila de agentes do Celery.

Solução de problemas no momento da análise do DAG

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns no tempo de análise do DAG.

Análise e programação de DAGs no Cloud Composer 1 e no Airflow 1

A eficiência da análise de DAG foi significativamente melhorada no Airflow 2. Se você tiver problemas de desempenho relacionados à análise e programação do DAG, considere migrar para o Airflow 2.

No Cloud Composer 1, o programador é executado em nós do cluster com outros componentes do Cloud Composer. Por isso, a carga de nós individuais do cluster pode ser maior ou menor em comparação com outros nós. O desempenho do programador (análise e programação de DAG) pode variar dependendo do nó em que ele é executado. Além disso, um nó individual em que o programador é executado pode mudar como resultado de operações de upgrade ou manutenção. Essa limitação foi resolvida no Cloud Composer 2, em que é possível alocar recursos de CPU e memória para o programador, e o desempenho dele não depende da carga dos nós do cluster.

Número e distribuição de tempo das tarefas

O Airflow é conhecido por ter problemas com a programação de um grande número de tarefas pequenas. Nesses casos, opte por um número menor de tarefas mais consolidadas.

Programar um grande número de DAGs ou tarefas ao mesmo tempo também pode ser uma fonte de problemas. Para evitar esse problema, distribua suas tarefas de maneira mais uniforme com o tempo.

Solução de problemas com tarefas em execução e na fila

As seções a seguir descrevem sintomas e possíveis correções para alguns problemas comuns com tarefas em execução e na fila.

As execuções de DAG não são executadas

Sintoma:

Quando uma data de programação de um DAG é definida dinamicamente, isso pode levar a vários efeitos colaterais inesperados. Exemplo:

  • Uma execução de DAG é sempre no futuro, e o DAG nunca é executado.

  • As execuções anteriores de DAG são marcadas como executadas e bem-sucedidas, mesmo que não sejam executadas.

Mais informações estão disponíveis na documentação do Apache Airflow (em inglês).

Possíveis soluções:

  • Siga as recomendações na documentação do Apache Airflow.

  • Defina start_date estático para DAGs. Como opção, use catchup=False para desativar a execução da DAG para datas anteriores.

  • Evite usar datetime.now() ou days_ago(<number of days>), a menos que você esteja ciente dos efeitos colaterais dessa abordagem.

Filas de tarefas são muito longas

Em alguns casos, uma fila de tarefas pode ser longa demais para o programador. Para informações sobre como otimizar parâmetros de worker e acelerado, leia sobre como escalonar o ambiente do Cloud Composer junto com a empresa.

Como usar o recurso TimeTable do Airflow Scheduler

A partir do Airflow 2.2, é possível definir uma tabela de horários para um DAG usando um novo recurso chamado TimeTable.

É possível definir uma tabela de horários usando um dos seguintes métodos:

Recursos de cluster limitados

Esta seção se aplica apenas ao Cloud Composer 1.

É possível que ocorram problemas de desempenho se o cluster do GKE do ambiente for muito pequeno para processar todos os DAGs e tarefas. Nesse caso, tente uma destas soluções:

  • Crie um novo ambiente com um tipo de máquina que ofereça mais desempenho e migre seus DAGs para ele.
  • Criar mais ambientes do Cloud Composer e dividir os DAGs entre eles
  • Altere o tipo de máquina dos nós do GKE, conforme descrito em Como fazer upgrade do tipo de máquina para os nós do GKE. Como esse procedimento é propenso a erros, é a opção menos recomendada.
  • Faça upgrade do tipo de máquina da instância do Cloud SQL que executa o banco de dados do Airflow no ambiente, por exemplo, usando os comandos gcloud composer environments update. O baixo desempenho do banco de dados do Airflow pode ser o motivo da lentidão do programador.

Evite programar tarefas durante janelas de manutenção

É possível definir janelas de manutenção específicas para seu ambiente. Durante esses períodos, ocorrem eventos de manutenção para o Cloud SQL e o GKE.

Uso de "wait_for_downstream" nos DAGs

Se você definir o parâmetro wait_for_downstream como True nos DAGs, para que uma tarefa seja bem-sucedida, todas as tarefas que estiverem imediatamente downstream também serão bem-sucedidas. de dados. Isso significa que a execução de tarefas pertencentes a uma determinada execução do DAG pode ser reduzida pela execução de tarefas da execução anterior do DAG. Leia mais sobre isso na documentação do Airflow.

As tarefas em fila por muito tempo serão canceladas e reprogramadas.

Se uma tarefa do Airflow for mantida na fila por muito tempo, o programador a reagendará para execução. Nas versões do Airflow anteriores à 2.3.1, a tarefa também é marcada como com falha e refeita se for qualificada para uma nova tentativa.

Uma maneira de observar os sintomas dessa situação é analisar o gráfico com o número de tarefas na fila (guia "Monitoramento" na interface do Cloud Composer). Se os picos no gráfico não diminuírem em cerca de duas horas, as tarefas provavelmente serão reprogramadas (sem registros), seguidas por "As tarefas adotadas ainda estão pendentes..." nas entradas de registro do agendador. Nesses casos, você pode ver a mensagem "O arquivo de registro não foi encontrado..." nos registros de tarefas do Airflow porque a tarefa não foi executada.

Em geral, esse comportamento é esperado, e a próxima instância da tarefa programada deve ser executada de acordo com a programação. Se você observar muitos casos desse tipo nos seus ambientes do Cloud Composer, isso pode significar que não há workers do Airflow suficientes no ambiente para processar todas as tarefas programadas.

Resolução: para resolver esse problema, verifique se há capacidade suficiente nos workers do Airflow para executar tarefas na fila. Por exemplo, é possível aumentar o número de workers ou worker_concurrency. Você também pode ajustar o paralelismo ou os pools para evitar que as tarefas da fila excedam a capacidade.

Às vezes, tarefas desatualizadas podem bloquear a execução de um DAG específico.

Em casos normais, o programador do Airflow pode lidar com situações em que há tarefas desatualizadas na fila e, por algum motivo, não é possível executá-las corretamente (por exemplo, um DAG a que as tarefas desatualizadas pertencem foi excluído).

Se essas tarefas desatualizadas não forem excluídas pelo programador, talvez seja necessário excluí-las manualmente. Isso pode ser feito, por exemplo, na interface do Airflow. Acesse (Menu > Browser > Task Instances), encontre as tarefas em fila que pertencem a um DAG desatualizado e exclua-as.

Para resolver esse problema, faça upgrade do ambiente para a versão 2.1.12 ou mais recente do Cloud Composer.

Abordagem do Cloud Composer para o parâmetro [scheduler]min_file_process_interval

O Cloud Composer muda a forma como o [scheduler]min_file_process_interval é usado pelo programador do Airflow.

Airflow 1

No caso do Cloud Composer usando o Airflow 1, os usuários podem definir o valor de [scheduler]min_file_process_interval entre 0 e 600 segundos. Valores maiores que 600 segundos trazem os mesmos resultados que se [scheduler]min_file_process_interval for definido como 600 segundos.

Airflow 2

No Airflow 2, o [scheduler]min_file_process_interval só pode ser usado com as versões 1.19.9 e 2.0.26 ou mais recentes.

  • Versões do Cloud Composer anteriores à 1.19.9 e 2.0.26

    Nessas versões, [scheduler]min_file_process_interval é ignorado.

  • Cloud Composer 1.19.9 ou 2.0.26 ou versões mais recentes

    O programador do Airflow é reiniciado depois de um determinado número de vezes que todos os DAGs são programados, e o parâmetro [scheduler]num_runs controla quantas vezes isso é feito pelo programador. Quando o programador chega a loops de programação [scheduler]num_runs, ele é reiniciado. O programador é um componente sem estado, e essa reinicialização é um mecanismo de recuperação automática para qualquer problema que o programador possa ter. Quando não especificado, o valor padrão de [scheduler]num_runs é aplicado, que é 5.000.

    O [scheduler]min_file_process_interval pode ser usado para configurar a frequência de análise de DAG, mas esse parâmetro não pode ser mais longo do que o tempo necessário para que um programador execute loops [scheduler]num_runs ao programar seus DAGs.

Como escalonar a configuração do Airflow

O Airflow oferece opções de configuração que controlam quantas tarefas e DAGs o Airflow pode executar ao mesmo tempo. Para definir essas opções de configuração, modifique os valores para o ambiente.

  • Simultaneidade do worker

    O parâmetro [celery]worker_concurrency controla o número máximo de tarefas que um worker do Airflow pode executar ao mesmo tempo. Se você multiplicar o valor desse parâmetro pelo número de workers do Airflow no ambiente do Cloud Composer, você receberá o número máximo de tarefas que podem ser executadas em um determinado momento no ambiente. Esse número é limitado pela opção de configuração [core]parallelism do Airflow, que é descrita em mais detalhes.

    Nos ambientes do Cloud Composer 2, o valor padrão de [celery]worker_concurrency é calculado automaticamente.

    • Para as versões do Airflow 2.3.3 e mais recentes, [celery]worker_concurrency é definido como um valor mínimo de 32, 12 * worker_CPU e 8 * worker_memory.

    • Para versões do Airflow 2.2.5 ou anteriores, [celery]worker_concurrency é definido como 12 * número de CPUs dos workers.

  • Máximo de execuções de DAGs ativas

    A opção de configuração [core]max_active_runs_per_dag do Airflow controla o número máximo de execuções ativas de DAGs por DAG. O programador não criará mais execuções de DAGs se atingir esse limite.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que o programador restringe a execução do DAG, porque não é possível criar mais instâncias de execução do DAG em um determinado momento.

  • Máximo de tarefas ativas por DAG

    A opção de configuração [core]max_active_tasks_per_dag do Airflow controla o número máximo de instâncias de tarefa que podem ser executadas simultaneamente em cada DAG. É um parâmetro no nível do DAG.

    Se esse parâmetro for definido incorretamente, você poderá encontrar um problema em que a execução de uma única instância do DAG é lenta porque há apenas um número limitado de tarefas do DAG que podem ser executadas em um determinado momento

    Solução: aumente [core]max_active_tasks_per_dag.

  • Paralelismo e tamanho do pool

    A opção de configuração [core]parallelism do Airflow controla quantas tarefas o programador do Airflow pode enfileirar na fila do executor após todas as dependências dessas tarefas serem atendidas.

    Este é um parâmetro global para toda a configuração do Airflow.

    As tarefas são enfileiradas e executadas em um pool. Os ambientes do Cloud Composer usam apenas um pool. O tamanho desse pool controla quantas tarefas podem ser enfileiradas pelo programador para execução em um determinado momento. Se o tamanho do pool for muito pequeno, o programador não poderá enfileirar tarefas para execução, mesmo que os limites sejam definidos pela opção de configuração [core]parallelism e pelo [celery]worker_concurrency. opção de configuração multiplicada pelo número de workers do Airflow ainda não foi atendida.

    É possível configurar o tamanho do pool na IU do Airflow (Menu > Administrador > Pools). Ajuste o tamanho do pool com o nível de paralelismo esperado no ambiente.

    Normalmente, [core]parallelism é definido como um produto do número máximo de workers e [celery]worker_concurrency.

Marcação de tarefas como com falha após atingir dagrun_timeout

O programador marca as tarefas que não foram concluídas (em execução, programadas e na fila) como com falha se uma execução de DAG não for concluída dentro de dagrun_timeout (um parâmetro de DAG).

Solução:

Sintomas de pressão de carga no banco de dados do Airflow

Às vezes, nos registros do agendador do Airflow, você pode encontrar a seguinte entrada de registro de aviso:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

Também é possível observar sintomas semelhantes nos registros do worker do Airflow:

Para MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

Para PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

Esses erros ou avisos podem ser um sintoma de que o banco de dados do Airflow está sobrecarregado com o número de conexões abertas ou o número de consultas executadas ao mesmo tempo, por programadores ou outros componentes do Airflow, como workers, acionadores e servidores da Web.

Possíveis soluções:

O servidor da Web mostra o aviso "The scheduler does not appear to be running" (O agendador não parece estar em execução).

O programador informa o sinal de funcionamento regularmente no banco de dados do Airflow. Com base nessas informações, o servidor da Web do Airflow determina se o programador está ativo.

Às vezes, se o programador estiver com carga pesada, ele não poderá informar o heartbeat a cada [scheduler]scheduler-heartbeat-sec.

Nesse caso, o servidor da Web do Airflow pode mostrar o seguinte aviso:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

Possíveis soluções:

Soluções alternativas para problemas encontrados durante o preenchimento de DAGs

Às vezes, você pode querer executar novamente DAGs que já foram executados. Você pode fazer isso com a ferramenta de linha de comando do Airflow da seguinte maneira:

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

Para executar novamente apenas as tarefas com falha de um DAG específico, use também o argumento --rerun-failed-tasks.

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
  backfill -- -B \
  -s START_DATE \
  -e END_DATE \
  DAG_NAME

Para executar novamente apenas as tarefas com falha de um DAG específico, use também o argumento --rerun_failed_tasks.

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • START_DATE com um valor para o parâmetro DAG start_date, no formato YYYY-MM-DD.
  • END_DATE com um valor para o parâmetro DAG end_date, no formato YYYY-MM-DD.
  • DAG_NAME pelo nome do DAG.

Às vezes, a operação de preenchimento pode gerar uma situação de deadlock em que o preenchimento não é possível porque há um bloqueio em uma tarefa. Exemplo:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

Em alguns casos, é possível usar as seguintes soluções alternativas para superar os deadlocks:

A seguir