Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Esta página fornece etapas de solução de problemas e informações para fluxos de trabalho comuns problemas.
Muitos problemas de execução de DAGs são causados por desempenho de ambiente não ideal. Para otimizar seu ambiente do Cloud Composer 2, siga as orientações do guia sobre desempenho e custos do ambiente.
Alguns problemas de execução do DAG podem ser causados pelo programador do Airflow não está funcionando de maneira correta ou otimizada. Siga Instruções de solução de problemas do programador para resolver esses problemas.
Como resolver problemas do fluxo de trabalho
Para começar a solução de problemas, siga estes passos:
Verifique os registros do Airflow.
É possível aumentar o nível de geração de registros do Airflow substituindo a seguinte opção de configuração do Airflow.
Airflow 2
Seção Chave Valor logging
logging_level
O valor padrão é INFO
. Defina comoDEBUG
para exibir mais detalhes nas mensagens de registro.Airflow 1
Seção Chave Valor core
logging_level
O valor padrão é INFO
. Defina comoDEBUG
para exibir mais detalhes nas mensagens de registro.Verifique o painel de monitoramento.
Revise o Cloud Monitoring.
No console do Google Cloud, verifique se há erros nas páginas dos os componentes do seu ambiente.
Na interface da Web do Airflow, verificar na visualização de gráfico do DAG instâncias de tarefa com falha.
Seção Chave Valor webserver
dag_orientation
LR
,TB
,RL
ouBT
Como depurar falhas do operador
Para depurar uma falha do operador, siga estes passos:
- Verifique se há erros específicos da tarefa.
- Verifique os registros do Airflow.
- Revise o Cloud Monitoring.
- Verifique os registros específicos do operador.
- Corrija os erros.
- Faça upload do DAG para a pasta
dags/
. - Na interface da Web do Airflow, limpe os estados anteriores do DAG.
- Execute o DAG ou retome esse processo.
Solução de problemas na execução de tarefas
O Airflow é um sistema distribuído com muitas entidades, como programador, executor, os workers que se comunicam entre si por meio de uma fila de tarefas e do Airflow banco de dados e enviar sinais (como SIGTERM). O diagrama a seguir mostra uma Visão geral das interconexões entre os componentes do Airflow.
Em um sistema distribuído, como o Airflow, pode haver alguns problemas de conectividade de rede, ou a infraestrutura subjacente pode apresentar problemas intermitentes. Isso pode levar a situações em que as tarefas podem falhar e ser reprogramadas para execução ou não serem concluídas (por exemplo, tarefas zumbis ou tarefas que ficaram presas na execução). O Airflow tem mecanismos para lidar em tais situações e retomar o funcionamento normal automaticamente. Seguindo as seções explicam problemas comuns que ocorrem durante a execução de tarefas pelo Airflow: Tarefas de zumbis, pílulas de veneno e sinais SIGTERM.
Solução de problemas de tarefas zumbis
O Airflow detecta dois tipos de incompatibilidade entre uma tarefa e um processo executado a tarefa:
As tarefas zumbi são tarefas que deveriam estar em execução, mas não estão em execução. Isso pode acontecer se o processo da tarefa tiver sido encerrado ou não for responder se o worker do Airflow não informar o status da tarefa a tempo por estar sobrecarregada ou se a VM em que a tarefa é executada tiver sido encerrada. O Airflow encontra essas tarefas periodicamente e falha ou repete a tarefa. dependendo das configurações da tarefa.
Descobrir tarefas zumbi
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
Tarefas não inativas são tarefas que não podem estar em execução. Descobertas do Airflow essas tarefas periodicamente e as encerra.
Os motivos e as soluções mais comuns para tarefas zumbis estão listados abaixo.
O worker do Airflow ficou sem memória
Cada worker do Airflow pode executar até [celery]worker_concurrency
instâncias de tarefa
ao mesmo tempo. Se o consumo de memória cumulativo dessas instâncias de tarefa
ultrapassar o limite de memória de um worker do Airflow, um processo aleatório será
encerrado para liberar recursos.
Descobrir eventos de falta de memória do worker do Airflow
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
Soluções:
Otimizar tarefas para usar menos memória, por exemplo, ao evitar o código de nível superior;
Nas versões do Cloud Composer 2 anteriores à 2.6.0, atualize
[celery]worker_concurrency
usando a fórmula atual se o valor for menor.No Cloud Composer 2, use substituições de configuração do Airflow para manter
[celery]worker_concurrency
e aumentar a memória para workers do Airflow;No Cloud Composer 1, faça upgrade para um tipo de máquina maior.
Diminuir
[celery]worker_concurrency
.
O worker do Airflow foi removido
As remoções de pods são parte normal da execução de cargas de trabalho no Kubernetes. O GKE elimina pods se eles ficarem sem armazenamento ou forem liberados recursos para cargas de trabalho com prioridade mais alta.
Descubra as remoções de workers do Airflow
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
Soluções:
- Se a remoção for causada por falta de armazenamento, é possível reduzir o espaço
ou remover arquivos temporários assim que eles deixarem de ser necessários.
Também é possível
aumentar o armazenamento disponível ou executar
cargas de trabalho em um pod dedicado usando
KubernetesPodOperator
.
O worker do Airflow foi encerrado
Os workers do Airflow podem ser removidos externamente. Se as tarefas em execução no momento não terminar durante um período de rescisão normal, eles serão interrompidos e poderão acabam sendo detectados como zumbis.
descobrir encerramentos de pods de workers do Airflow
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
Possíveis cenários e soluções:
Os workers do Airflow são reiniciados durante modificações no ambiente, como atualizações ou instalação de pacotes:
Descobrir modificações no ambiente do Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
É possível realizar essas operações quando nenhuma tarefa importante estiver em execução ou ativar novas tentativas da tarefa.
Vários componentes podem ficar temporariamente indisponíveis durante a manutenção operações:
Descubra as operações de manutenção do GKE
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
É possível especificar janelas de manutenção para minimizar se sobrepõe à execução das tarefas críticas.
No Cloud Composer 2 versões anteriores à 2.4.5, um encerramento do Airflow o worker pode ignorar o sinal SIGTERM e continuar a executar tarefas:
Descubra como reduzir a escala vertical pelo escalonamento automático do Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker-set") textPayload:"Workers deleted"
Você pode fazer upgrade para uma versão mais recente do Cloud Composer em que esse problema foi corrigido.
O worker do Airflow estava sobrecarregado
A quantidade de recursos de CPU e memória disponíveis para um worker do Airflow é limitada pela configuração do ambiente. Se uma utilização se aproximar dos limites, causaria uma contenção de recursos e atrasos desnecessários durante a tarefa execução. Em situações extremas, quando faltam recursos durante períodos mais longos tempo, isso poderia causar tarefas zumbi.
Soluções:
- Monitorar o uso de CPU e memória dos workers e ajustá-lo para evitar ultrapassando 80%.
O banco de dados do Airflow estava sobrecarregado
Vários componentes do Airflow usam um banco de dados para se comunicar entre si e, em particular, para armazenar batimentos cardíacos de instâncias de tarefas. A falta de recursos no banco de dados vai levar a tempos de consulta mais longos e pode afetar a execução de tarefas.
Soluções:
O banco de dados do Airflow estava temporariamente indisponível
Um worker do Airflow pode levar algum tempo para detectar e lidar com erros como problemas temporários de conectividade. Ele pode exceder o valor limite de detecção de zumbis.
Descobrir tempos limite do sinal de funcionamento do Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluções:
Aumente o tempo limite para tarefas zumbi e substitua o valor de
[scheduler]scheduler_zombie_task_threshold
Opção de configuração do Airflow:Seção Chave Valor Observações scheduler
scheduler_zombie_task_threshold
Novo tempo limite (em segundos) O padrão o valor é 300
Solução de problemas do Veneno Pill
O Poison Pill é um mecanismo usado pelo Airflow para encerrar tarefas do Airflow.
O Airflow usa o Poison Pill nestas situações:
- Quando um programador encerra uma tarefa que não foi concluída no prazo.
- Quando uma tarefa expira ou é executada por muito tempo.
Quando o Airflow usa o Poison Pill, as seguintes entradas de registro aparecem nos registros de um worker do Airflow que executou a tarefa:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Taking the poison pill.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Soluções possíveis:
- Verifique se há erros no código da tarefa que podem fazer com que ela seja executada por muito tempo.
- (Cloud Composer 2) Aumente a CPU e a memória para o Airflow workers, o que acelera a execução das tarefas.
Aumente o valor da opção de configuração
[celery_broker_transport_options]visibility-timeout
do Airflow.Como resultado, o programador espera mais tempo para uma tarefa ser concluída, antes de considerar que é uma tarefa zumbi. Essa opção é especialmente útil para tarefas demoradas que duram muitas horas. Se o valor for muito baixo (por exemplo, 3 horas), o programador considerará tarefas que são executadas por 5 ou 6 horas como "interrompidas" (tarefas zumbis).
Aumente o valor do Airflow
[core]killed_task_cleanup_time
de configuração do Terraform.Um valor maior dá mais tempo para que os workers do Airflow concluam as tarefas graciosamente. Se o valor for muito baixo, as tarefas do Airflow poderão ser interrompidas abruptamente, sem tempo suficiente para terminar seu trabalho graciosamente.
Solução de problemas de sinais SIGTERM
Sinais SIGTERM são usados pelo Linux, o Kubernetes, o programador do Airflow e o Celery encerrarem processos responsáveis pela que executam workers ou tarefas do Airflow.
Há vários motivos para que os sinais SIGTERM sejam enviados em um ambiente:
Uma tarefa se tornou uma tarefa zumbi e precisa ser interrompida.
O programador descobriu uma cópia de uma tarefa e envia a Poison Pill e SIGTERM sinaliza à tarefa para interrompê-la.
No Escalonamento automático horizontal de pods, o serviço do GKE O plano de controle envia sinais SIGTERM para remover pods que não são mais necessários.
O programador pode enviar sinais SIGTERM para o processo DagFileProcessorManager. Esses sinais SIGTERM são usados pelo Programador para gerenciar Ciclo de vida do processo DagFileProcessorManager e pode ser ignorado com segurança.
Exemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Disputa entre o retorno de chamada do sinal de funcionamento e os callbacks de saída no local_task_job, que monitora a execução da tarefa. Se o heartbeat detectar que uma tarefa foi marcada como concluída, ele não poderá distinguir se a tarefa foi concluída ou se o Airflow foi instruído a considerar a tarefa como concluída. No entanto, ele encerrará um executor de tarefas, sem esperar para que ele seja fechado.
Esses sinais SIGTERM podem ser ignorados com segurança. A tarefa já está no um estado bem-sucedido e a execução do DAG como um todo não serão afetadas.
A entrada de registro
Received SIGTERM.
é a única diferença entre as tags saída e o encerramento da tarefa no estado bem-sucedido.Um componente do Airflow usa mais recursos (CPU, memória) do que o permitido pelo nó do cluster.
o serviço do GKE executa operações de manutenção e envia sinais SIGTERM para pods executados em um nó que está prestes a ser atualizado. Quando uma instância de tarefa é encerrada com SIGTERM, é possível ver o seguinte registro entradas nos registros de um worker do Airflow que executou a tarefa:
{local_task_job.py:211} WARNING - State of this instance has been externally
set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received
SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed
with exception
Possíveis soluções:
Esse problema acontece quando uma VM que executa a tarefa está sem memória. Isso não é relacionadas às configurações do Airflow, mas à quantidade de memória disponível para o VM.
O aumento da memória depende da versão do Cloud Composer que você usa. Exemplo:
No Cloud Composer 2, é possível atribuir mais recursos de CPU e memória ao Airflow trabalhadores
No caso do Cloud Composer 1, é possível recriar o ambiente usando uma tipo de máquina com mais desempenho.
Nas duas versões do Cloud Composer, é possível diminuir o valor da opção de configuração de simultaneidade
[celery]worker_concurrency
do Airflow. Essa opção determina quantas tarefas são executadas simultaneamente por um determinado worker do Airflow.
Para mais informações sobre como otimizar seu ambiente do Cloud Composer 2, consulte Otimizar o desempenho e os custos do ambiente
Consultas do Cloud Logging para descobrir os motivos das reinicializações ou expulsões de pods
Os ambientes do Cloud Composer usam clusters do GKE como infraestrutura de computação camada Nesta seção, você poderá encontrar consultas úteis que podem ajudar a encontrar motivos para reinicializações ou remoções do worker ou do programador do Airflow.
As consultas apresentadas abaixo podem ser ajustadas da seguinte maneira:
você pode especificar uma linha do tempo interessante para você no Cloud Logging; por exemplo, as últimas seis horas ou três dias. Também é possível definir um período personalizado
você precisa especificar o Cloud Composer CLUSTER_NAME
é possível limitar a pesquisa a um pod específico adicionando o POD_NAME
Descobrir contêineres reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descobrir contêineres que foram encerrados como resultado de um evento de falta de memória.
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descobrir contêineres que pararam de ser executados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Impacto das operações de atualização ou upgrade nas execuções de tarefas do Airflow
As operações de atualização ou upgrade interrompem a execução de tarefas do Airflow. a menos que uma tarefa seja executada no modo adiável.
Recomendamos a realização dessas operações quando você espera um impacto mínimo nas execuções de tarefas do Airflow e configurar mecanismos de repetição adequados nas DAGs e tarefas.
Como solucionar problemas de tarefas KubernetesExecutor
O CeleryKubernetesExecutor é um tipo de executor no Cloud Composer 3. que pode usar o CeleryExecutor e o KubernetesExecutor ao mesmo tempo de resposta.
Consulte a página Usar CeleryKubernetesExecutor para ver mais informações sobre solução de problemas de tarefas executadas com KubernetesExecutor.
Problemas comuns
Nas seções a seguir, você encontra descrições dos sintomas e possíveis correções de alguns problemas comuns do DAG.
A tarefa do Airflow foi interrompida por Negsignal.SIGKILL
Às vezes, sua tarefa pode estar usando mais memória do que o worker do Airflow está alocado.
Nessa situação, ele pode ser interrompido por Negsignal.SIGKILL
. O sistema
envia esse sinal para evitar mais consumo de memória, o que pode afetar
a execução de outras tarefas do Airflow. No registro do worker do Airflow, pode aparecer
a seguinte entrada de registro:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
também pode aparecer como código -9
.
Soluções possíveis:
Menor
worker_concurrency
de workers do Airflow.No caso do Cloud Composer 2, aumente a memória dos workers do Airflow.
No caso do Cloud Composer 1, faça upgrade para um tipo de máquina maior usado no cluster do Cloud Composer.
Otimize suas tarefas para usar menos memória.
Gerenciar tarefas que consomem muitos recursos no Cloud Composer usando KubernetesPodOperator ou GKEStartPodOperator, isolamento de tarefas e alocação personalizada de recursos.
A tarefa falha sem emitir registros devido a erros de análise do DAG
Às vezes, pode haver erros sutis de DAG que levam a uma situação em que
um programador do Airflow e um processador de DAG conseguem programar tarefas para execução
e analisar um arquivo DAG (respectivamente), mas o worker do Airflow falha ao executar tarefas
de um DAG porque há erros de programação no arquivo DAG do Python. Isso pode
levar a uma situação em que uma tarefa do Airflow é marcada como Failed
e não há registro da execução dela.
Soluções:
Verifique nos registros de workers do Airflow se não há erros gerados pelo Worker do Airflow relacionado a erros de análise de DAG ou DAG ausentes.
Aumente os parâmetros relacionados à análise do DAG:
Aumentar dagbag-import-timeout para pelo menos 120 segundos (ou mais, se necessário).
Aumente dag-file-processor-timeout para pelo menos 180 segundos (ou mais, se necessário). Esse valor precisa ser maior que
dagbag-import-timeout
.
Consulte também Como inspecionar registros do processador de DAG.
A tarefa falha sem emitir registros devido à pressão de recursos
Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow é responsável para a execução de tarefas do Airflow é interrompida abruptamente. O erro visível no registro do worker do Airflow pode ser semelhante a este:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solução:
- No Cloud Composer 1, crie um novo ambiente com o
um tipo de máquina maior que a máquina atual
não é válido. Considere adicionar mais nós ao ambiente e menos
[celery]worker_concurrency
para os workers. - No Cloud Composer 2, aumente os limites de memória. para workers do Airflow.
- Caso seu ambiente também gere tarefas zumbi, consulte Solução de problemas com tarefas zumbi.
- Para acessar um tutorial sobre como depurar problemas de falta de memória, consulte Depurar problemas de falta de memória e de DAG sem armazenamento
A tarefa falha sem emitir registros devido à remoção do pod
Os pods do Google Kubernetes Engine estão sujeitos à Ciclo de vida do pod do Kubernetes e remoção de pods. Tarefa picos e co-programação de workers são as duas causas mais comuns de remoção de pods no Cloud Composer.
A remoção de pods pode ocorrer quando um determinado pod usa em excesso os recursos de um nó, em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com muita memória são executadas em um pod, e a carga combinada faz com que o nó em que o pod é executado exceda o limite de consumo de memória.
Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e, posteriormente, marcadas como com falha pelo Airflow.
Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Quando uma tarefa falha sem emitir registros, isso indica que os workers do Airflow serão reiniciados devido à falta de memória (OOM, na sigla em inglês). Alguns registros podem estar presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos.
Para ver os registros:
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, clique no nome do ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros.
Visualize os registros de workers individuais em Todos os registros -> Registros do Airflow -> Workers -> (funcionário individual).
A execução do DAG é limitada pela memória. Todas as tarefas são iniciadas com dois processos do Airflow: execução de tarefa e monitoramento. Cada nó aceita até seis tarefas simultâneas, com aproximadamente 12 processos carregados com módulos do Airflow. É possível que mais memória seja consumida dependendo da natureza do DAG.
Sintoma:
No console do Google Cloud, acesse a página Cargas de trabalho.
Se houver pods
airflow-worker
que mostremEvicted
, clique em cada pod removido e procure a mensagemThe node was low on resource: memory
na parte superior da janela.
Corrigir:
- No Cloud Composer 1, crie um novo ambiente do Cloud Composer com um tipo de máquina maior que a máquina atual não é válido.
- No Cloud Composer 2, aumente os limites de memória. para workers do Airflow.
- Verifique os registros de pods
airflow-worker
para encontrar possíveis causas de remoção. Para mais informações sobre a busca de registros de pods individuais, consulte Como solucionar problemas com cargas de trabalho implantadas. - Garanta que as tarefas no DAG sejam idempotentes e que possam ser repetidas.
Evite fazer o download de arquivos desnecessários para o sistema de arquivos local dos workers do Airflow.
Os workers do Airflow têm capacidade limitada de sistema de arquivos local. Por exemplo, em Cloud Composer 2, um worker pode ter de 1 GB a 10 GB de armazenamento. Quando o se o espaço de armazenamento acabar, o pod de worker do Airflow será removido Plano de controle do GKE. Isso falha em todas as tarefas removidas worker estava em execução.
Exemplos de operações problemáticas:
- fazer o download de arquivos ou objetos e armazená-los localmente em um worker. Em vez disso, armazene esses objetos diretamente em um serviço adequado, como um bucket do Cloud Storage.
- Acessar objetos grandes na pasta
/data
por um worker do Airflow. O worker do Airflow faz o download do objeto para o sistema de arquivos local. Em vez disso, implemente os DAGs para que arquivos grandes sejam processados fora do pod de worker do Airflow.
A importação da carga do DAG atingiu o tempo limite
Sintoma:
- Na interface da Web do Airflow, na parte de cima da página da lista de DAGs, uma caixa de alerta vermelha mostra
Broken DAG: [/path/to/dagfile] Timeout
. No Cloud Monitoring: os registros
airflow-scheduler
contêm entradas semelhante a:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
Corrigir:
Modifique a opção de configuração do Airflow dag_file_processor_timeout
e conceda mais tempo para a análise do DAG:
Seção | Chave | Valor |
---|---|---|
core |
dag_file_processor_timeout |
Novo valor de tempo limite |
A execução do DAG não termina dentro do tempo esperado
Sintoma:
Às vezes, uma execução do DAG não termina porque as tarefas do Airflow ficam travadas e o DAG é executado durar mais do que o esperado. Em condições normais, as tarefas do Airflow não permanecem indefinidamente no estado na fila ou em execução, porque o Airflow tem tempo limite e procedimentos de limpeza que ajudam a evitar essa situação.
Corrigir:
Use o
dagrun_timeout
para os DAGs. Por exemplo,dagrun_timeout=timedelta(minutes=120)
. Assim, cada execução do DAG precisa ser concluídas dentro do tempo limite de execução do DAG e as tarefas não concluídas sejam marcadas comoFailed
ouUpstream Failed
. Para mais informações sobre os estados de tarefas do Airflow, consulte a documentação do Apache Airflow.Use o parâmetro task execution timeout para definir um tempo limite padrão para tarefas executadas com base nos operadores do Apache Airflow.
Execuções do DAG não executadas
Sintoma:
Quando uma data de programação para um DAG é definida dinamicamente, isso pode gerar várias efeitos colaterais inesperados. Exemplo:
Uma execução de DAG é sempre no futuro, e o DAG nunca é executado.
As execuções de DAG anteriores são marcadas como executadas e bem-sucedidas apesar de não terem sido executada.
Mais informações estão disponíveis na documentação do Apache Airflow.
Corrigir:
Siga as recomendações na documentação do Apache Airflow.
Defina
start_date
estático para DAGs. Como opção, você pode usarcatchup=False
para desativar a execução do DAG de datas passadas.Evite usar
datetime.now()
oudays_ago(<number of days>)
, a menos que você cientes dos efeitos colaterais dessa abordagem.
Aumento do tráfego de rede de entrada e saída do banco de dados do Airflow
A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados. Os fatores a seguir podem influenciar o uso da rede:
Consultas no banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grandes quantidades de tráfego. Exemplos: verificação do status de tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom, despejar conteúdo do banco de dados do Airflow.
Um grande número de tarefas. Quanto mais tarefas houver para programar, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de programação. Quando o programador do Airflow programa as execuções de DAG, ele faz consultas no banco de dados do Airflow e gera tráfego.
A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados. O uso intenso de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.
O DAG falha no servidor da Web do Airflow ou faz com que ele retorne um erro 502 gateway timeout
As falhas do servidor da Web podem ocorrer por diversos motivos. Marca de seleção
airflow-webserver faz login.
o Cloud Logging para determinar a causa do
Erro 502 gateway timeout
.
Computação pesada
Esta seção se aplica apenas ao Cloud Composer 1.
Evite executar computação pesada durante a análise do DAG.
Ao contrário dos nós de worker e de programador, os tipos de máquina que podem ser personalizados para têm mais capacidade de CPU e memória, o servidor da Web usa um tipo de máquina fixo, o que pode causar falhas na análise do DAG se a computação do tempo de análise for muito pesado.
O servidor da Web tem duas vCPUs e 2 GB de memória.
O valor padrão para core-dagbag_import_timeout
é de 30 segundos. Esse valor de tempo limite define o limite máximo de quanto tempo o Airflow gasta carregando um módulo do Python na pasta dags/
.
Permissões incorretas
Esta seção se aplica apenas ao Cloud Composer 1.
O servidor da Web não é executado na mesma conta de serviço que os workers e o programador. Assim, eles podem acessar recursos gerenciados pelo usuário a que o servidor não tem acesso.
Recomendamos que você evite acessar recursos que não sejam públicos durante a análise do DAG. Às vezes, isso é inevitável, e você precisará conceder permissões à conta de serviço do servidor da Web. O nome da conta de serviço é derivado do domínio do servidor da Web. Por exemplo, se o domínio
for example-tp.appspot.com
, a conta de serviço
example-tp@appspot.gserviceaccount.com
.
Erros do DAG
Esta seção se aplica apenas ao Cloud Composer 1.
O servidor da Web é executado no App Engine e fica separado do cluster do GKE do ambiente. O servidor da Web analisa os arquivos de definição do DAG, e um 502 gateway timeout
pode ocorrer se houver erros no DAG. O Airflow funciona normalmente sem um servidor da Web funcional
um DAG problemático não está interrompendo nenhum processo em execução no GKE.
Nesse caso, use gcloud composer environments run
para recuperar
do seu ambiente e como solução alternativa caso o servidor da Web fique
indisponível.
Em outros casos, é possível executar a análise do DAG no GKE, além de pesquisar os DAGs que causam exceções fatais do Python ou esse tempo limite (padrão de 30 segundos). Para resolver os problemas, conecte-se a um shell remoto em um contêiner de worker do Airflow e teste os erros de sintaxe. Para mais informações, consulte Como testar DAGs.
Processar um grande número de DAGs e plug-ins em pastas de DAGs e plug-ins
O conteúdo das pastas /dags
e /plugins
é sincronizado a partir do
bucket do seu ambiente para sistemas de arquivos locais de workers do Airflow e
programadores.
Quanto mais dados forem armazenados nessas pastas, mais tempo levará para realizar a e sincronização. Para lidar com essas situações:
Limite o número de arquivos nas pastas
/dags
e/plugins
. Armazene somente o mínimo de arquivos necessários.Se possível, aumente o espaço em disco disponível para os programadores do Airflow e trabalhadores
Se possível, aumente a CPU e a memória dos workers e programadores do Airflow. Assim, que a operação de sincronização seja realizada mais rapidamente.
No caso de um número muito grande de DAGs, divida-os em lotes, compacte-os em arquivos ZIP e implantá-los na pasta
/dags
. Essa abordagem acelera o processo de sincronização de DAGs. Componentes do Airflow descompactar arquivos ZIP antes de processar os DAGs.A geração de DAGs em uma programática também pode ser um método para limitar o número de arquivos DAG armazenados na pasta
/dags
. Consulte a seção sobre DAGs programáticos para evitar problemas com a programação e a execução de DAGs gerados de maneira programática.
Não programe DAGs gerados programaticamente ao mesmo tempo
Gerar objetos DAG de maneira programática com base em um arquivo DAG é um método eficiente criar muitos DAGs semelhantes com pequenas diferenças.
É importante não programar todos esses DAGs para execução imediatamente. é uma grande chance de que os workers do Airflow não tenham CPU e memória suficientes recursos para executar todas as tarefas agendadas ao mesmo tempo.
Para evitar problemas com a programação de DAGs programáticos, faça o seguinte:
- Aumente a simultaneidade de workers e escalonar verticalmente seu ambiente verticalmente para que ele possa executar mais tarefas simultaneamente.
- Gerar DAGs de modo a distribuir os cronogramas de maneira uniforme ao longo do tempo para evitar o agendamento de centenas de tarefas ao mesmo tempo, para que os workers do Airflow tempo para executar todas as tarefas agendadas.
Erro 504 ao acessar o servidor da Web do Airflow
Consulte Erro 504 ao acessar a interface do Airflow.
A exceção Lost connection to Postgres server during query
é gerada durante a execução da tarefa ou logo depois dela
Lost connection to Postgres server during query
exceção
geralmente acontecem quando as seguintes condições são atendidas:
- O DAG usa
PythonOperator
ou um operador personalizado. - O DAG faz consultas no banco de dados do Airflow.
Se várias consultas forem feitas a partir de uma função chamável, os tracebacks poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True)
no código do Airflow. é a primeira consulta do banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.
O escopo das sessões do SQLAlchemy é uma linha de execução e é criado em uma sessão de função chamável que pode ser continuada dentro do código do Airflow. Se houver entre as consultas em uma sessão, talvez a conexão já esteja fechado pelo servidor do Postgres. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.
Corrigir:
- Use o decorador
airflow.utils.db.provide_session
. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetrosession
e fecha corretamente a sessão no final da função. - Não use uma única função de longa duração. Em vez disso, mova todas as consultas
do banco de dados para funções separadas, de modo que haja várias funções com
o decorador
airflow.utils.db.provide_session
. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.
Como controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG
Se você quiser controlar por quanto tempo uma única execução de DAG para um DAG específico
dura, use o parâmetro dagrun_timeout
para fazer isso.
Por exemplo, se você espera que uma única execução de DAG (independentemente de
a execução terminar com sucesso ou falha) não dure mais de uma hora,
defina esse parâmetro como 3600 segundos.
Também é possível controlar o tempo que uma única tarefa do Airflow pode durar. Afazeres
Portanto, use execution_timeout
.
Se você quiser controlar quantas execuções de DAGs ativas quer ter para um
um DAG específico, use o [core]max-active-runs-per-dag
Opção de configuração do Airflow para fazer isso.
Se você quiser ter apenas uma instância de execução de DAG em um determinado momento, defina o parâmetro max-active-runs-per-dag
como 1
.
Problemas que afetam DAGs e plug-ins sincronizados com programadores, workers e servidores da Web
O Cloud Composer sincroniza o conteúdo das pastas /dags
e /plugins
para programadores e workers. Certos objetos em pastas /dags
e /plugins
pode impedir que a sincronização funcione corretamente ou pelo menos desacelerá-la.
A pasta
/dags
é sincronizada com programadores e workers. Esta pasta não está sincronizada para servidores da Web no Cloud Composer 2 ou se você ativarDAG Serialization
no Cloud Composer 1.A pasta
/plugins
está sincronizada com programadores, workers e servidores da Web.
Você pode encontrar os seguintes problemas:
Você fez upload de arquivos compactados por gzip que usam transcodificação por compactação como
/dags
e/plugins
pastas. Isso geralmente acontece quando você usa a flag--gzip-local-all
em um o comandogcloud storage cp
para fazer o upload de dados no bucket.Solução: exclua o objeto que usou a transcodificação de compactação e faça o upload novamente para o bucket.
Um dos objetos é chamado de ".". Esse objeto não é sincronizado com programadores e workers, e pode parar de sincronizar.
Solução: renomeie o objeto problemático.
Uma pasta e um arquivo Python do DAG têm os mesmos nomes, por exemplo,
a.py
. Nesse caso, o arquivo DAG não é sincronizado corretamente com os componentes do Airflow.Solução: remova a pasta que tem o mesmo nome de um arquivo Python do DAG.
Um dos objetos nas pastas
/dags
ou/plugins
contém um símbolo/
. no final do nome do objeto. Esses objetos podem confundir o processo de sincronização porque o símbolo/
significa que um objeto é uma pasta, não um arquivo.Solução: remova o símbolo
/
do nome do objeto problemático.Não armazene arquivos desnecessários nas pastas
/dags
e/plugins
.Às vezes, os DAGs e plug-ins que você implementa são acompanhados de arquivos adicionais, como arquivos que armazenam testes para esses componentes. Esses os arquivos são sincronizados com os workers e programadores e afetam o tempo necessário para copie esses arquivos para programadores, workers e servidores da Web.
Solução: não armazene arquivos adicionais e desnecessários em
/dags
e/plugins
pastas.
O erro Done [Errno 21] Is a directory: '/home/airflow/gcs/dags/...'
é gerado por programadores e workers
Esse problema acontece porque os objetos podem ter sobre o namespace no Cloud Storage e, ao mesmo tempo, programadores e workers usam sistemas de arquivos tradicionais. Por exemplo, é possível para adicionar uma pasta e um objeto com o mesmo nome a uma pasta do Google Cloud. Quando o bucket é sincronizado com os programadores e workers do ambiente, esse erro é gerado, o que pode levar a falhas nas tarefas.
Para corrigir esse problema, certifique-se de que não haja namespaces sobrepostos no
bucket do seu ambiente de execução. Por exemplo, se /dags/misc
(um arquivo) e
/dags/misc/example_file.txt
(outro arquivo) estiverem em um bucket, um erro será
gerado pelo programador.
Interrupções temporárias ao se conectar ao Airflow Metadata DB
O Cloud Composer é executado na infraestrutura de nuvem distribuída. Significa que, de tempos em tempos, podem surgir alguns problemas transitórios interromper a execução das tarefas do Airflow.
Nessas situações, as mensagens de erro a seguir podem aparecer na página de workers do Airflow registros:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
ou
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Esses problemas intermitentes também podem ser causados por operações de manutenção realizadas nos seus ambientes do Cloud Composer.
Normalmente, esses erros são intermitentes e, se as tarefas do Airflow forem idempotentes e você configurou novas tentativas, deve estar imune a elas. Você também pode considere definir janelas de manutenção.
Outro motivo para esses erros pode ser a falta de recursos em seu no cluster do ambiente de execução. Nesses casos, é possível escalonar verticalmente ou otimizar ambiente, conforme descrito nas Ambientes de escalonamento ou Instruções para como otimizar o ambiente.
Uma execução do DAG é marcada como bem-sucedida, mas não tem tarefas executadas
Se um DAG executar execution_date
for anterior ao start_date
do DAG,
é possível que você veja execuções do DAG que não têm execuções de tarefas, mas ainda estão marcadas como bem-sucedidas.
Causa
Essa situação pode acontecer em um dos seguintes casos:
Uma incompatibilidade é causada pela diferença de fuso horário entre os
execution_date
estart_date
. Isso pode acontecer, por exemplo, quando usandopendulum.parse(...)
para definirstart_date
.O
start_date
do DAG está definido como um valor dinâmico, por exemploairflow.utils.dates.days_ago(1)
Solução
Verifique se
execution_date
estart_date
estão usando o mesmo fuso horário.Especifique um
start_date
estático e combine-o comcatchup=False
para evitar executar DAGs com datas de início passadas.
Um DAG não fica visível na interface do Airflow ou do DAG, e o programador não o agenda.
O processador DAG analisa cada DAG antes que ele possa ser programado pelo programador e antes que um DAG fique visível a interface do Airflow ou a interface do DAG.
As seguintes opções de configuração do Airflow definem os tempos limite para a análise de DAGs:
[core]dagrun_import_timeout
define quanto tempo o processador de DAG tem para analisar um único DAG.[core]dag_file_processor_timeout
define o tempo total que o processador de DAG pode gastar analisando todos os DAGs.
Se um DAG não estiver visível na interface do Airflow ou do DAG:
- Verifique se o processador de DAG consegue processar corretamente seu DAG nos registros do processador de DAG. Em caso de problemas, você pode ver as seguintes entradas de registro nos registros do processador de DAG ou do programador:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
/usr/local/airflow/dags/example_dag.py with PID 21903 started at
2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
- Verifique os registros do programador para ver se ele funciona corretamente. No caso de problemas, talvez você veja as seguintes entradas de registro nos registros do programador:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
Process timed out, PID: 68496
Soluções:
Corrigir todos os erros de análise do DAG. O processador DAG analisa vários DAGs casos raros de análise de erros de um DAG pode impactar negativamente a análise de de outros DAGs.
Se a análise do seu DAG demorar mais do que a quantidade de segundos definida
[core]dagrun_import_timeout
, e aumente esse tempo limite.Se a análise de todos os DAGs levar mais do que a quantidade de segundos definida em
[core]dag_file_processor_timeout
, aumente esse tempo limite.Se o DAG levar muito tempo para analisar, isso também pode significar que ele não está implementados da melhor forma. Por exemplo, se ele lê muitos variáveis de ambiente ou chamadas para serviços externos ou para o Airflow no seu banco de dados. Na medida do possível, evite realizar essas operações seções globais dos DAGs.
Aumentar os recursos de CPU e memória do Programador para que ele funcione mais rapidamente.
Aumentar o número de processos de processador de DAG para que a análise possa ser feita mais rápido. Para fazer isso, aumente o valor de
[scheduler]parsing_process
.
Sintomas de sobrecarga do banco de dados do Airflow
Para mais informações, consulte Sintomas de como o banco de dados do Airflow está sob pressão de carga (em inglês).
A seguir
- Como solucionar problemas de instalação do pacote PyPI
- Como resolver problemas de atualizações e upgrades do ambiente