Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Nesta página, você encontra informações sobre a solução de problemas comuns do fluxo de trabalho e suas etapas.
Muitos problemas de execução de DAG são causados por uma performance não ideal do ambiente. Para otimizar seu ambiente, siga o guia Otimizar o desempenho e os custos do ambiente.
Alguns problemas de execução de DAG podem ser causados pelo programador do Airflow não funcionar corretamente ou de maneira ideal. Siga as instruções de solução de problemas do Scheduler para resolver essas questões.
Como resolver problemas do fluxo de trabalho
Para começar a solução de problemas, siga estes passos:
Verifique os registros do Airflow.
É possível aumentar o nível de geração de registros do Airflow substituindo a seguinte opção de configuração do Airflow.
Seção Chave Valor logging
logging_level
O valor padrão é INFO
. Defina comoDEBUG
para aumentar o nível de detalhes nas mensagens de registro.Confira o Painel de monitoramento.
Revise o Cloud Monitoring.
No console Google Cloud , verifique se há erros nas páginas dos componentes do seu ambiente.
Na interface da Web do Airflow, verifique na Visualização do gráfico (em inglês) do DAG se há instâncias de tarefa com falha.
Seção Chave Valor webserver
dag_orientation
LR
,TB
,RL
ouBT
Como depurar falhas do operador
Para depurar uma falha do operador, siga estes passos:
- Verifique se há erros específicos da tarefa.
- Verifique os registros do Airflow.
- Revise o Cloud Monitoring.
- Verifique os registros específicos do operador.
- Corrija os erros.
- Faça upload do DAG para a pasta
/dags
. - Na interface da Web do Airflow, limpe os estados anteriores do DAG.
- Execute o DAG ou retome esse processo.
Solução de problemas na execução de tarefas
O Airflow é um sistema distribuído com muitas entidades, como programador, executor, workers que se comunicam por uma fila de tarefas e o banco de dados do Airflow, e enviam sinais (como SIGTERM). O diagrama a seguir mostra uma visão geral das interconexões entre os componentes do Airflow.
Em um sistema distribuído como o Airflow, pode haver problemas de conectividade de rede ou a infraestrutura subjacente pode apresentar problemas intermitentes. Isso pode levar a situações em que as tarefas falham e são reagendadas para execução ou não são concluídas com êxito (por exemplo, tarefas zumbi ou que ficaram presas na execução). O Airflow tem mecanismos para lidar com essas situações e retomar automaticamente o funcionamento normal. As seções a seguir explicam problemas comuns que ocorrem durante a execução de tarefas pelo Airflow.
As tarefas falham sem emitir registros
A tarefa falha sem emitir registros devido a erros de análise do DAG
Às vezes, pode haver erros sutis no DAG que levam a uma situação em que
o programador do Airflow pode programar tarefas para execução, o processador de DAG pode
analisar o arquivo DAG, mas o worker do Airflow não consegue executar tarefas
do DAG porque há erros de programação no arquivo DAG. Isso pode
levar a uma situação em que uma tarefa do Airflow é marcada como Failed
e não há
registro da execução dela.
Soluções:
Verifique nos registros do worker do Airflow se não há erros gerados pelo worker do Airflow relacionados a um DAG ausente ou a erros de análise do DAG.
Aumente os parâmetros relacionados à análise do DAG:
Aumente [dagbag-import-timeout][ext-airflow-dagrun-import-timeout] para pelo menos 120 segundos (ou mais, se necessário).
Aumente dag-file-processor-timeout para pelo menos 180 segundos (ou mais, se necessário). Esse valor precisa ser maior que
dagbag-import-timeout
.
Consulte também Solução de problemas do processador de DAGs.
As tarefas são interrompidas abruptamente
Durante a execução da tarefa, os workers do Airflow podem ser encerrados abruptamente devido a problemas não relacionados especificamente à tarefa em si. Consulte Causas principais comuns para conferir uma lista desses cenários e possíveis soluções. As seções a seguir abordam alguns sintomas adicionais que podem ter origem nessas causas principais:
Tarefas zumbi
O Airflow detecta dois tipos de incompatibilidade entre uma tarefa e um processo que a executa:
Tarefas zumbi são aquelas que deveriam estar em execução, mas não estão. Isso pode acontecer se o processo da tarefa foi encerrado ou não está respondendo, se o worker do Airflow não informou um status de tarefa a tempo porque está sobrecarregado ou se a VM em que a tarefa é executada foi desligada. O Airflow encontra essas tarefas periodicamente e as falha ou tenta novamente, dependendo das configurações.
Descobrir tarefas zumbi
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-scheduler") textPayload:"Detected zombie job"
As tarefas zumbis são aquelas que não deveriam estar em execução. O Airflow encontra essas tarefas periodicamente e as encerra.
Consulte Causas principais comuns para mais informações sobre como resolver problemas de tarefas zumbi.
Indicadores SIGTERM
Os sinais SIGTERM são usados pelo Linux, pelo Kubernetes, pelo programador do Airflow e pelo Celery para encerrar processos responsáveis por executar workers ou tarefas do Airflow.
Há vários motivos para o envio de sinais SIGTERM em um ambiente:
Uma tarefa se tornou uma tarefa zumbi e precisa ser interrompida.
O programador descobriu uma tarefa duplicada e enviou sinais de instância de encerramento e SIGTERM para interrompê-la.
No escalonamento automático horizontal de pods, o plano de controle do GKE envia sinais SIGTERM para remover pods que não são mais necessários.
O programador pode enviar sinais SIGTERM para o processo DagFileProcessorManager. Esses sinais SIGTERM são usados pelo Scheduler para gerenciar o ciclo de vida do processo DagFileProcessorManager e podem ser ignorados com segurança.
Exemplo:
Launched DagFileProcessorManager with pid: 353002 Sending Signals.SIGTERM to group 353002. PIDs of all processes in the group: [] Sending the signal Signals.SIGTERM to group 353002 Sending the signal Signals.SIGTERM to process 353002 as process group is missing.
Condição de disputa entre o callback de pulsação e os callbacks de saída no local_task_job, que monitora a execução da tarefa. Se o heartbeat detectar que uma tarefa foi marcada como concluída, não será possível distinguir se a tarefa em si foi concluída ou se o Airflow foi instruído a considerar a tarefa como concluída. No entanto, ele vai encerrar um executor de tarefas sem esperar que ele seja encerrado.
Esses sinais SIGTERM podem ser ignorados com segurança. A tarefa já está no estado de sucesso, e a execução do DAG como um todo não será afetada.
A entrada de registro
Received SIGTERM.
é a única diferença entre a saída regular e a conclusão da tarefa no estado de êxito.Figura 2. Condição de disputa entre os callbacks de pulsação e saída (clique para ampliar) Um componente do Airflow usa mais recursos (CPU, memória) do que o permitido pelo nó do cluster.
O serviço do GKE realiza operações de manutenção e envia sinais SIGTERM para os pods que são executados em um nó prestes a ser atualizado.
Quando uma instância de tarefa é encerrada com SIGTERM, as seguintes entradas de registro aparecem nos registros de um worker do Airflow que executou a tarefa:
{local_task_job.py:211} WARNING - State of this instance has been externally set to queued. Terminating instance. {taskinstance.py:1411} ERROR - Received SIGTERM. Terminating subprocesses. {taskinstance.py:1703} ERROR - Task failed with exception
Possíveis soluções:
Esse problema ocorre quando uma VM que executa a tarefa fica sem memória. Isso não está relacionado às configurações do Airflow, mas à quantidade de memória disponível para a VM.
No Cloud Composer 2, é possível atribuir mais recursos de CPU e memória aos workers do Airflow.
É possível diminuir o valor da opção de configuração de simultaneidade
[celery]worker_concurrency
do Airflow. Essa opção determina quantas tarefas são executadas simultaneamente por um determinado worker do Airflow.
Para mais informações sobre como otimizar seu ambiente, consulte Otimizar o desempenho e os custos do ambiente.
A tarefa do Airflow foi interrompida por Negsignal.SIGKILL
Às vezes, sua tarefa pode estar usando mais memória do que o worker do Airflow aloca.
Nesse caso, ela pode ser interrompida por Negsignal.SIGKILL
. O sistema envia esse sinal para evitar mais consumo de memória, o que pode afetar a execução de outras tarefas do Airflow. No registro do worker do Airflow, você pode encontrar
a seguinte entrada de registro:
{local_task_job.py:102} INFO - Task exited with return code Negsignal.SIGKILL
Negsignal.SIGKILL
também pode aparecer como código -9
.
Possíveis soluções:
Diminuir o
worker_concurrency
de workers do Airflow.Aumente a quantidade de memória disponível para os workers do Airflow.
Gerencie tarefas que consomem muitos recursos no Cloud Composer usando o KubernetesPodOperator ou o GKEStartPodOperator para isolamento de tarefas e alocação de recursos personalizada.
Otimize suas tarefas para usar menos memória.
A tarefa falha devido à pressão de recursos
Sintoma: durante a execução de uma tarefa, o subprocesso do worker do Airflow responsável pela execução da tarefa do Airflow é interrompido abruptamente. O erro visível no registro do worker do Airflow pode ser semelhante ao abaixo:
...
File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task R = retval = fun(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__ return self.run(*args, **kwargs) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 88, in execute_command _execute_in_fork(command_to_exec) File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 99, in _execute_in_fork
raise AirflowException('Celery command failed on host: ' + get_hostname())airflow.exceptions.AirflowException: Celery command failed on host: airflow-worker-9qg9x
...
Solução:
No Cloud Composer 2, aumente os limites de memória para os workers do Airflow.
Se o ambiente também gerar tarefas zumbi, consulte Solução de problemas de tarefas zumbi.
Para um tutorial sobre como depurar problemas de falta de memória, consulte Depurar problemas de falta de memória e de armazenamento no DAG.
A tarefa falha devido à remoção de pods
Os pods do Google Kubernetes Engine estão sujeitos ao ciclo de vida de pods do Kubernetes e à remoção deles. Picos de tarefas são a causa mais comum de remoção de pods no Cloud Composer.
A remoção de pods pode ocorrer quando um determinado pod usa em excesso os recursos de um nó, em relação às expectativas de consumo de recursos configuradas para o nó. Por exemplo, a remoção pode acontecer quando várias tarefas com muita memória são executadas em um pod, e a carga combinada faz com que o nó em que o pod é executado exceda o limite de consumo da memória.
Se um pod de worker do Airflow for removido, todas as instâncias de tarefas em execução nele serão interrompidas e, posteriormente, marcadas como com falha pelo Airflow.
Os registros são armazenados em buffer. Se um pod de worker for removido antes da limpeza do buffer, os registros não serão emitidos. Quando uma tarefa falha sem emitir registros, isso indica que os workers do Airflow serão reiniciados devido à falta de memória (OOM, na sigla em inglês). Alguns registros podem estar presentes no Cloud Logging mesmo que os registros do Airflow não tenham sido emitidos.
Para ver os registros:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros.
Confira os registros de workers individuais do Airflow em Todos os registros > Registros do Airflow > Workers.
Sintoma:
No console Google Cloud , acesse a página Cargas de trabalho.
Se houver pods
airflow-worker
que mostremEvicted
, clique em cada pod removido e procure a mensagemThe node was low on resource: memory
na parte de cima da janela.
Solução:
Aumente os limites de memória para workers do Airflow.
Verifique os registros dos pods
airflow-worker
para ver possíveis causas de remoção. Para mais informações sobre como buscar registros de pods individuais, consulte Solução de problemas com cargas de trabalho implantadas.Garanta que as tarefas no DAG sejam idempotentes e que possam ser repetidas.
Evite baixar arquivos desnecessários para o sistema de arquivos local dos workers do Airflow.
Os workers do Airflow têm capacidade limitada no sistema de arquivos local. Um worker do Airflow pode ter de 1 GB a 10 GB de armazenamento. Quando o espaço de armazenamento acaba, o pod do worker do Airflow é removido pelo plano de controle do GKE. Isso causa falha em todas as tarefas que o worker removido estava executando.
Exemplos de operações problemáticas:
- Fazer o download de arquivos ou objetos e armazená-los localmente em um worker do Airflow. Em vez disso, armazene esses objetos diretamente em um serviço adequado, como um bucket do Cloud Storage.
- Acessar objetos grandes na pasta
/data
de um worker do Airflow. O worker do Airflow baixa o objeto para o sistema de arquivos local. Em vez disso, implemente seus DAGs para que arquivos grandes sejam processados fora do pod de worker do Airflow.
Causas comuns
O worker do Airflow ficou sem memória
Cada worker do Airflow pode executar até [celery]worker_concurrency
instâncias de tarefa simultaneamente. Se o consumo cumulativo de memória dessas instâncias de tarefa exceder o limite de memória de um worker do Airflow, um processo aleatório será encerrado para liberar recursos.
Descobrir eventos de falta de memória do worker do Airflow
resource.type="k8s_node" resource.labels.cluster_name="GKE_CLUSTER_NAME" log_id("events") jsonPayload.message:"Killed process" jsonPayload.message:("airflow task" OR "celeryd")
Às vezes, a falta de memória em um worker do Airflow pode levar ao envio de pacotes malformados durante uma sessão do SQL Alchemy para o banco de dados, um servidor DNS ou qualquer outro serviço chamado por um DAG. Nesse caso, a outra extremidade da conexão pode rejeitar ou descartar conexões do worker do Airflow. Exemplo:
"UNKNOWN:Error received from peer
{created_time:"2024-11-31T10:09:52.217738071+00:00", grpc_status:14,
grpc_message:"failed to connect to all addresses; last error: UNKNOWN:
ipv4:<ip address>:443: handshaker shutdown"}"
Soluções:
Otimize as tarefas para usar menos memória, por exemplo, evitando código de nível superior.
Diminuir
[celery]worker_concurrency
.Aumente a memória para os workers do Airflow para acomodar as mudanças de
[celery]worker_concurrency
.Em versões do Cloud Composer 2 anteriores à 2.6.0, atualize
[celery]worker_concurrency
usando a fórmula atual se esse valor for menor.
O worker do Airflow foi removido
As remoções de pods são parte normal da execução de cargas de trabalho no Kubernetes. O GKE remove pods se eles ficarem sem armazenamento ou para liberar recursos para cargas de trabalho com uma prioridade mais alta.
Descobrir remoções de workers do Airflow
resource.type="k8s_pod" resource.labels.cluster_name="GKE_CLUSTER_NAME" resource.labels.pod_name:"airflow-worker" log_id("events") jsonPayload.reason="Evicted"
Soluções:
- Se uma remoção for causada pela falta de armazenamento, reduza o uso ou remova os arquivos temporários assim que eles não forem mais necessários.
Como alternativa, você pode aumentar o armazenamento disponível ou executar cargas de trabalho em um pod dedicado com
KubernetesPodOperator
.
O worker do Airflow foi encerrado
Os workers do Airflow podem ser removidos externamente. Se as tarefas em execução não forem concluídas durante um período de encerramento normal, elas serão interrompidas e poderão ser detectadas como zumbis.
Descobrir encerramentos de pods de worker do Airflow
resource.type="k8s_cluster" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.methodName:"pods.delete" protoPayload.response.metadata.name:"airflow-worker"
Possíveis cenários e soluções:
Os workers do Airflow são reiniciados durante modificações no ambiente, como upgrades ou instalação de pacotes:
Descobrir modificações no ambiente do Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("cloudaudit.googleapis.com%2Factivity")
Você pode realizar essas operações quando nenhuma tarefa crítica estiver em execução ou ativar novas tentativas de tarefa.
Vários componentes podem ficar temporariamente indisponíveis durante as operações de manutenção.
Descobrir as operações de manutenção do GKE
resource.type="gke_nodepool" resource.labels.cluster_name="GKE_CLUSTER_NAME" protoPayload.metadata.operationType="UPGRADE_NODES"
É possível especificar janelas de manutenção para minimizar
se sobrepõe à execução de tarefas críticas.
Em versões do Cloud Composer 2 anteriores à 2.4.5, um worker do Airflow em encerramento pode ignorar o sinal SIGTERM e continuar executando tarefas:
Descobrir a redução de escala com o escalonamento automático do Composer
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker-set") textPayload:"Workers deleted"
Faça upgrade para uma versão mais recente do Cloud Composer em que esse problema foi corrigido.
O worker do Airflow estava sob carga pesada
A quantidade de recursos de CPU e memória disponíveis para um worker do Airflow é limitada pela configuração do ambiente. Se a utilização de recursos se aproximar dos limites, isso poderá causar uma disputa de recursos e atrasos desnecessários durante a execução da tarefa. Em situações extremas, quando os recursos ficam em falta por longos períodos, isso pode causar tarefas zumbi.
Soluções:
- Monitore o uso de CPU e memória dos workers e ajuste para evitar ultrapassar 80%.
Consultas do Cloud Logging para descobrir motivos de reinicializações ou remoções de pods
Os ambientes do Cloud Composer usam clusters do GKE como camada de infraestrutura de computação. Nesta seção, você encontra consultas úteis que podem ajudar a encontrar motivos para reinicializações ou remoções de workers ou programadores do Airflow.
As consultas apresentadas a seguir podem ser ajustadas da seguinte maneira:
É possível especificar a linha do tempo necessária no Cloud Logging. Por exemplo, as últimas 6 horas, 3 dias ou um período personalizado.
Especifique o nome do cluster do ambiente em CLUSTER_NAME.
É possível limitar a pesquisa a um pod específico adicionando o POD_NAME.
Descobrir contêineres reiniciados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"will be restarted" resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descobrir contêineres encerrados como resultado de um evento de falta de memória
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("events") (jsonPayload.reason:("OOMKilling" OR "SystemOOM") OR jsonPayload.message:("OOM encountered" OR "out of memory")) severity=WARNING resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
Descobrir contêineres que pararam de ser executados
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME"
Consulta alternativa para limitar os resultados a um pod específico:
resource.type="k8s_node" log_id("kubelet") jsonPayload.MESSAGE:"ContainerDied" severity=DEFAULT resource.labels.cluster_name="CLUSTER_NAME" "POD_NAME"
O banco de dados do Airflow estava sob carga pesada
Um banco de dados é usado por vários componentes do Airflow para se comunicar entre si e, em particular, para armazenar heartbeats de instâncias de tarefas. A falta de recursos no banco de dados aumenta o tempo de consulta e pode afetar a execução de tarefas.
Às vezes, os seguintes erros estão presentes nos registros de um worker do Airflow:
(psycopg2.OperationalError) connection to server at <IP address>,
port 3306 failed: server closed the connection unexpectedly
This probably means the server terminated abnormally before or while
processing the request.
Soluções:
- Evite usar muitas instruções
Variables.get
no código DAG de nível superior. Em vez disso, use modelos do Jinja para recuperar valores de variáveis do Airflow. - Otimize (reduza) o uso de instruções xcom_push e xcom_pull em modelos do Jinja no código DAG de nível superior.
- Considere fazer upgrade para um tamanho de ambiente maior (médio ou grande).
- Diminuir o número de programadores
- Diminua a frequência de análise de DAGs.
- Monitore o uso de CPU e memória do banco de dados.
O banco de dados do Airflow ficou temporariamente indisponível
Um worker do Airflow pode levar tempo para detectar e processar erros intermitentes, como problemas temporários de conectividade. Ele pode exceder o limite padrão de detecção de zumbis.
Descobrir tempos limite de pulsação do Airflow
resource.type="cloud_composer_environment" resource.labels.environment_name="ENVIRONMENT_NAME" log_id("airflow-worker") textPayload:"Heartbeat time limit exceeded"
Soluções:
Aumente o tempo limite para tarefas zumbi e substitua o valor da opção de configuração
[scheduler]scheduler_zombie_task_threshold
do Airflow:Seção Chave Valor Observações scheduler
scheduler_zombie_task_threshold
Novo tempo limite (em segundos) O valor padrão é 300
.
As tarefas falham porque ocorreu um erro durante a execução
Encerrando instância
O Airflow usa o mecanismo de instância de encerramento para encerrar tarefas do Airflow. Esse mecanismo é usado nas seguintes situações:
- Quando um programador encerra uma tarefa que não foi concluída a tempo.
- Quando uma tarefa atinge o tempo limite ou é executada por muito tempo.
Quando o Airflow encerra instâncias de tarefas, as seguintes entradas de registro aparecem nos registros de um worker do Airflow que executou a tarefa:
INFO - Subtask ... WARNING - State of this instance has been externally set
to success. Terminating instance.
INFO - Subtask ... INFO - Sending Signals.SIGTERM to GPID <X>
INFO - Subtask ... ERROR - Received SIGTERM. Terminating subprocesses.
Possíveis soluções:
Verifique se há erros no código da tarefa que possam fazer com que ela seja executada por muito tempo.
Aumente a CPU e a memória dos workers do Airflow para que as tarefas sejam executadas mais rapidamente.
Aumente o valor da opção de configuração do Airflow
[celery_broker_transport_options]visibility_timeout
.Como resultado, o programador espera mais tempo para que uma tarefa seja concluída antes de considerá-la uma tarefa zumbi. Essa opção é especialmente útil para tarefas demoradas que duram muitas horas. Se o valor for muito baixo (por exemplo, 3 horas), o programador vai considerar tarefas que são executadas por 5 ou 6 horas como "penduradas" (tarefas zumbi).
Aumente o valor da opção de configuração
[core]killed_task_cleanup_time
do Airflow.Um valor maior dá mais tempo para os workers do Airflow concluírem as tarefas de maneira adequada. Se o valor for muito baixo, as tarefas do Airflow poderão ser interrompidas abruptamente, sem tempo suficiente para concluir o trabalho normalmente.
A execução do DAG não termina dentro do tempo esperado
Sintoma:
Às vezes, uma execução de DAG não termina porque as tarefas do Airflow ficam travadas e a execução dura mais do que o esperado. Em condições normais, as tarefas do Airflow não ficam indefinidamente no estado enfileirado ou em execução, porque o Airflow tem procedimentos de tempo limite e limpeza que ajudam a evitar essa situação.
Corrigir:
Use o parâmetro
dagrun_timeout
para os DAGs. Por exemplo:dagrun_timeout=timedelta(minutes=120)
. Como resultado, cada execução de DAG precisa ser concluída dentro do tempo limite de execução do DAG. Para mais informações sobre estados de tarefas do Airflow, consulte a documentação do Apache Airflow.Use o parâmetro tempo limite de execução da tarefa para definir um tempo limite padrão para tarefas executadas com base em operadores do Apache Airflow.
A conexão perdida com o servidor Postgres durante a exceção de consulta é gerada durante a execução da tarefa ou logo depois dela
As exceções Lost connection to Postgres server during query
geralmente acontecem quando as seguintes condições são atendidas:
- O DAG usa
PythonOperator
ou um operador personalizado. - O DAG faz consultas no banco de dados do Airflow.
Se várias consultas forem feitas a partir de uma função chamável, os tracebacks poderão apontar incorretamente para a linha self.refresh_from_db(lock_for_update=True)
no código do Airflow. é a primeira consulta do banco de dados após a execução da tarefa. A causa real da exceção acontece antes disso, quando uma sessão do SQLAlchemy não é fechada corretamente.
O escopo das sessões do SQLAlchemy é uma linha de execução e é criado em uma sessão de função chamável que pode ser continuada dentro do código do Airflow. Se houver atrasos significativos entre as consultas em uma sessão, a conexão já poderá ser fechada pelo servidor Postgres. O tempo limite de conexão nos ambientes do Cloud Composer é definido como aproximadamente 10 minutos.
Solução:
- Use o decorador
airflow.utils.db.provide_session
. Esse decorador fornece uma sessão válida para o banco de dados do Airflow no parâmetrosession
e fecha corretamente a sessão no final da função. - Não use uma única função de longa duração. Em vez disso, mova todas as consultas
do banco de dados para funções separadas, de modo que haja várias funções com
o decorador
airflow.utils.db.provide_session
. Nesse caso, as sessões são fechadas automaticamente depois de recuperar os resultados da consulta.
Interrupções temporárias ao se conectar ao banco de dados de metadados do Airflow
O Cloud Composer é executado em uma infraestrutura distribuída. Isso significa que, de vez em quando, alguns problemas transitórios podem aparecer e interromper a execução das tarefas do Airflow.
Nessas situações, as seguintes mensagens de erro podem aparecer nos registros dos workers do Airflow:
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (111)"
ou
"Can't connect to Postgres server on 'airflow-sqlproxy-service.default.svc.cluster.local' (104)"
Esses problemas intermitentes também podem ser causados por operações de manutenção realizadas nos seus ambientes do Cloud Composer.
Normalmente, esses erros são intermitentes e, se as tarefas do Airflow forem idempotentes e você tiver novas tentativas configuradas, eles não vão afetar você. Você também pode definir janelas de manutenção.
Outro motivo para esses erros pode ser a falta de recursos no cluster do seu ambiente. Nesses casos, é possível escalonar verticalmente ou otimizar o ambiente conforme descrito nas instruções Como escalonar ambientes ou Como otimizar seu ambiente.
Uma execução de DAG é marcada como bem-sucedida, mas não tem tarefas executadas
Se uma execução de DAG execution_date
for anterior ao start_date
do DAG, talvez você veja execuções de DAG sem execuções de tarefas, mas ainda marcadas como bem-sucedidas.

Causa
Isso pode acontecer em um dos seguintes casos:
Uma incompatibilidade é causada pela diferença de fuso horário entre o
execution_date
e ostart_date
do DAG. Isso pode acontecer, por exemplo, ao usarpendulum.parse(...)
para definirstart_date
.O
start_date
do DAG é definido como um valor dinâmico, por exemplo,airflow.utils.dates.days_ago(1)
Solução
Verifique se
execution_date
estart_date
estão usando o mesmo fuso horário.Especifique um
start_date
estático e combine comcatchup=False
para evitar executar DAGs com datas de início passadas.
Práticas recomendadas
Impacto das operações de atualização ou upgrade nas execuções de tarefas do Airflow
As operações de atualização ou upgrade interrompem as tarefas do Airflow em execução, a menos que uma tarefa seja executada no modo adiável.
Recomendamos realizar essas operações quando você espera um impacto mínimo nas execuções de tarefas do Airflow e configurar mecanismos de repetição adequados nos seus DAGs e tarefas.
Não programe DAGs gerados de maneira programática ao mesmo tempo
Gerar objetos DAG de maneira programática de um arquivo DAG é um método eficiente para criar muitos DAGs semelhantes que têm apenas pequenas diferenças.
É importante não programar todos esses DAGs para execução imediata. Há uma grande chance de que os workers do Airflow não tenham recursos suficientes de CPU e memória para executar todas as tarefas programadas ao mesmo tempo.
Para evitar problemas com o agendamento de DAGs programáticos:
- Aumente a simultaneidade do worker e escalonar verticalmente do ambiente para que ele possa executar mais tarefas simultaneamente.
- Gere DAGs de forma a distribuir os programações de maneira uniforme ao longo do tempo para evitar programar centenas de tarefas ao mesmo tempo. Assim, os workers do Airflow têm tempo para executar todas as tarefas programadas.
Controlar o tempo de execução de DAGs, tarefas e execuções paralelas do mesmo DAG
Se você quiser controlar a duração de uma única execução de um DAG específico, use o parâmetro dagrun_timeout
do DAG. Por exemplo, se você espera que uma única execução de DAG (independente de a execução terminar com sucesso ou falha) não dure mais de uma hora, defina esse parâmetro como 3.600 segundos.
Também é possível controlar a duração de uma única tarefa do Airflow. Para isso, use execution_timeout
.
Se você quiser controlar quantas execuções ativas de DAGs quer ter para um
DAG específico, use a opção de configuração do Airflow [core]max-active-runs-per-dag
.
Se você quiser ter apenas uma instância de uma execução de DAG em um determinado momento, defina o parâmetro max-active-runs-per-dag
como 1
.
Evitar o aumento do tráfego de rede de entrada e saída do banco de dados do Airflow
A quantidade de rede de tráfego entre o cluster do GKE do ambiente e o banco de dados do Airflow depende do número de DAGs, do número de tarefas nos DAGs e da maneira como os DAGs acessam os dados no banco de dados. Os fatores a seguir podem influenciar o uso da rede:
Consultas no banco de dados do Airflow. Se os DAGs fazem muitas consultas, eles geram grandes quantidades de tráfego. Exemplos: verificação do status de tarefas antes de prosseguir com outras tarefas, consultar a tabela XCom, despejar conteúdo do banco de dados do Airflow.
Um grande número de tarefas. Quanto mais tarefas houver para programar, mais tráfego de rede será gerado. Essa consideração se aplica ao número total de tarefas nos DAGs e à frequência de programação. Quando o programador do Airflow programa as execuções de DAG, ele faz consultas no banco de dados do Airflow e gera tráfego.
A interface da Web do Airflow gera tráfego de rede porque faz consultas ao banco de dados. O uso intenso de páginas com gráficos, tarefas e diagramas pode gerar grandes volumes de tráfego de rede.
A seguir
- Resolver problemas de instalação do pacote PyPI
- Como resolver problemas de atualizações e upgrades do ambiente