Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Este tutorial mostra as etapas para depurar um DAG do Airflow com falha no Cloud Composer e diagnosticar problemas relacionados a recursos de worker, como falta de memória ou espaço de armazenamento do worker, com a ajuda de registros e monitoramento do ambiente.
Introdução
Este tutorial se concentra em problemas relacionados a recursos para demonstrar maneiras de depurar um DAG.
A falta de recursos de worker alocados causa falhas no DAG. Se uma tarefa do Airflow ficar sem memória ou armazenamento, talvez você veja uma exceção do Airflow, como:
WARNING airflow.exceptions.AirflowException: Task received SIGTERM signal
INFO - Marking task as FAILED.
ou
Task exited with return code Negsignal.SIGKILL
Nesses casos, a recomendação geral é aumentar os recursos do worker do Airflow ou reduzir o número de tarefas por worker. No entanto, como as exceções do Airflow podem ser genéricas, talvez seja difícil identificar o recurso específico que está causando o problema.
Este tutorial explica como diagnosticar o motivo de uma falha de DAG e identificar o tipo de recurso que causa problemas depurando dois exemplos de DAGs que falham devido à falta de memória e armazenamento do worker.
Objetivos
Execute exemplos de DAGs que falham pelos seguintes motivos:
- Falta de memória do worker
- Falta de armazenamento do worker
Diagnosticar os motivos da falha
Aumentar os recursos de worker alocados
Teste os DAGs com novos limites de recursos
Custos
Neste tutorial, usamos os seguintes componentes faturáveis do Google Cloud:
- Cloud Composer (consulte custos extras)
- Cloud Monitoring
Ao concluir este tutorial, exclua os recursos criados para evitar o faturamento contínuo. Para mais detalhes, consulte Limpeza.
Antes de começar
Nesta seção, descrevemos as ações necessárias antes de iniciar o tutorial.
Criar e configurar um projeto
Para este tutorial, você precisa de um projeto Google Cloud. Configure o projeto da seguinte maneira:
No console do Google Cloud , selecione ou crie um projeto:
Verifique se o faturamento foi ativado para o projeto. Saiba como verificar se o faturamento está ativado em um projeto.
Verifique se o usuário do projeto Google Cloud tem as seguintes funções para criar os recursos necessários:
- Administrador de objetos do armazenamento e do ambiente
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador do Compute (
roles/compute.admin
) - Editor do Monitoring (
roles/monitoring.editor
)
- Administrador de objetos do armazenamento e do ambiente
(
Ativar as APIs do projeto
Enable the Cloud Composer API.
Criar o ambiente do Cloud Composer
Crie um ambiente do Cloud Composer 2.
Ao criar o ambiente, você concede o papel Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) à conta do agente de serviço do Composer. O Cloud Composer usa essa conta para realizar operações no seu projeto Google Cloud .
Verificar limites de recursos de workers
Verifique os limites de recursos do worker do Airflow no seu ambiente:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Acesse Recursos > Configuração de cargas de trabalho > Worker.
Verifique se os valores são 0,5 vCPUs, 1,875 GB de memória e 1 GB de armazenamento. Estes são os limites de recursos do worker do Airflow com que você vai trabalhar nas próximas etapas deste tutorial.
Exemplo: diagnosticar problemas de falta de memória
Faça upload do DAG de exemplo a seguir para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado de
create_list_with_many_strings
.
Esse DAG contém uma tarefa que executa as seguintes etapas:
- Cria uma lista vazia
s
. - Executa um ciclo para anexar a string
More
à lista. - Imprime quanta memória a lista consome e aguarda 1 segundo em cada iteração de 1 minuto.
import time
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import sys
from datetime import timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_list_with_many_strings',
default_args=default_args,
schedule_interval=None)
def consume():
s = []
for i in range(120):
for j in range(1000000):
s.append("More")
print(f"i={i}; size={sys.getsizeof(s) / (1000**3)}GB")
time.sleep(1)
t1 = PythonOperator(
task_id='task0',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0
)
Acionar o exemplo de DAG
Acione o exemplo de DAG, create_list_with_many_strings
:
No console Google Cloud , acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.
Clique em Gatilho.
Na página DAGs, clique na tarefa que você acionou e analise os registros de saída para garantir que o DAG começou a ser executado.
Enquanto a tarefa está em execução, os registros de saída mostram o tamanho da memória em GB que o DAG está usando.
Depois de alguns minutos, a tarefa vai falhar porque excede o limite de memória do worker do Airflow de 1,875 GB.
Diagnosticar o DAG com falha
Se você estava executando várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão de recursos durante esse período para identificar quais tarefas causam pressão de recursos e quais recursos precisam ser aumentados.
Analisar os registros de tarefas do Airflow
Observe que a tarefa do DAG create_list_with_many_strings
tem um estado Failed
.
Analise os registros da tarefa. Você vai encontrar a seguinte entrada de registro:
```none
{local_task_job.py:102} INFO - Task exited with return code
Negsignal.SIGKILL
```
`Netsignal.SIGKILL` might be an indication of your task using more memory
than the Airflow worker is allocated. The system sends
the `Negsignal.SIGKILL` signal to avoid further memory consumption.
Analisar cargas de trabalho
Revise as cargas de trabalho para verificar se a carga da tarefa não faz com que o nó em que o pod é executado exceda o limite de consumo de memória:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Em Recursos > Cluster do GKE > Cargas de trabalho, clique em ver cargas de trabalho do cluster.
Verifique se alguns dos pods de carga de trabalho têm status semelhantes aos seguintes:
Error with exit code 137 and 1 more issue. ContainerStatusUnknown with exit code 137 and 1 more issue
Exit code 137
significa que um contêiner ou pod está tentando usar mais memória do que o permitido. O processo é encerrado para evitar o uso da memória.
Revisar o monitoramento da integridade do ambiente e do consumo de recursos
Revise o monitoramento da integridade do ambiente e do consumo de recursos:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Monitoring e selecione Visão geral.
No painel Visão geral do ambiente, localize o gráfico Integridade do ambiente (DAG de monitoramento do Airflow). Ela contém uma área vermelha, que corresponde ao momento em que os registros começaram a imprimir erros.
Selecione Workers e encontre o gráfico Uso total de memória de workers. Observe que a linha Uso de memória tem um pico no momento em que a tarefa estava em execução.

Mesmo que a linha de uso de memória no gráfico não atinja o limite, ao diagnosticar os motivos da falha, é necessário considerar o uso de memória por workers individuais. Cada worker usa uma parte da memória para executar outros contêineres que realizam ações necessárias para a operação do worker, como sincronizar os arquivos DAG com o bucket do ambiente. A quantidade real de memória disponível para um worker executar tarefas do Airflow é menor que o limite de memória. Se um worker atingir o limite da memória real disponível para ele, a tarefa executada poderá falhar devido à falta de memória do worker. Nesses casos, é possível observar falhas de tarefas mesmo que a linha no gráfico de uso total de memória dos workers não atinja o limite de memória.
Aumentar o limite de memória do worker
Alocar mais memória de worker para que o DAG de exemplo seja executado:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.
Na seção Worker, no campo Memória, especifique o novo limite de memória para os workers do Airflow. Neste tutorial, use 3 GB.
Salve as mudanças e aguarde alguns minutos para que os workers do Airflow sejam reiniciados.
Teste seu DAG com o novo limite de memória
Acione o DAG create_list_with_many_strings
novamente e aguarde até que ele
termine de ser executado.
Nos registros de saída da execução do DAG, você vai encontrar
Marking task as SUCCESS
, e o estado da tarefa vai indicar Sucesso.Revise a seção Visão geral do ambiente na guia Monitoring e verifique se não há áreas vermelhas.
Clique na seção Workers e encontre o gráfico Uso total de memória de workers. A linha Limite de memória reflete a mudança no limite de memória, e a linha Uso de memória está muito abaixo do limite de memória alocável real.
Exemplo: diagnosticar problemas de falta de armazenamento
Nesta etapa, você vai fazer upload de dois DAGs que criam arquivos grandes. O primeiro DAG cria um arquivo grande. O segundo DAG cria um arquivo grande e imita uma operação de longa duração.
O tamanho do arquivo nos dois DAGs excede o limite padrão de armazenamento do worker do Airflow de 1 GB, mas o segundo DAG tem uma tarefa de espera adicional para estender a duração artificialmente.
Você vai investigar as diferenças no comportamento dos dois DAGs nas próximas etapas.
Fazer upload de um DAG que cria um arquivo grande
Faça upload do DAG de exemplo a seguir para o ambiente
criado nas etapas anteriores. Neste tutorial, o DAG é chamado de
create_large_txt_file_print_logs
.
Esse DAG contém uma tarefa que executa as seguintes etapas:
- Grava um arquivo
localfile.txt
de 1,5 GB no armazenamento do worker do Airflow. - Imprime o tamanho do arquivo criado usando o módulo
os
do Python. - Imprime a duração da execução do DAG a cada minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Fazer upload de um DAG que cria um arquivo grande em uma operação de longa duração
Para imitar um DAG de longa duração e investigar o impacto da duração da tarefa
no estado final, faça o upload do segundo DAG de exemplo para seu
ambiente. Neste tutorial, o DAG é chamado de
long_running_create_large_txt_file_print_logs
.
Esse DAG contém uma tarefa que executa as seguintes etapas:
- Grava um arquivo
localfile.txt
de 1,5 GB no armazenamento do worker do Airflow. - Imprime o tamanho do arquivo criado usando o módulo
os
do Python. - Espera 1 hora e 15 minutos para imitar algum tempo necessário para operações com o arquivo, por exemplo, leitura do arquivo.
- Imprime a duração da execução do DAG a cada minuto.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import os
from datetime import timedelta
import time
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=10)
}
dag = DAG(
'long_running_create_large_txt_file_print_logs',
default_args=default_args,
schedule_interval=None)
def consume():
size = 1000**2 # bytes in 1 MB
amount = 100
def create_file():
print(f"Start creating a huge file")
with open("localfile.txt", "ab") as f:
for j in range(15):
f.write(os.urandom(amount) * size)
print("localfile.txt size:", os.stat("localfile.txt").st_size / (1000**3), "GB")
create_file()
for k in range(75):
time.sleep(60)
print(f"{k+1} minute")
print("Success!")
t1 = PythonOperator(
task_id='create_huge_file',
python_callable=consume,
dag=dag,
depends_on_past=False,
retries=0)
Acionar exemplos de DAGs
Acione o primeiro DAG, create_large_txt_file_print_logs
:
No console Google Cloud , acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.
Clique em Gatilho.
Na página DAGs, clique na tarefa que você acionou e analise os registros de saída para garantir que o DAG começou a ser executado.
Aguarde até que a tarefa criada com o DAG
create_large_txt_file_print_logs
seja concluída. Isso pode levar alguns minutos.Na página DAGs, clique na execução do DAG. Você vai notar que a tarefa tem um estado
Success
, mesmo que o limite de armazenamento tenha sido excedido.
Analise os registros do Airflow da tarefa:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e selecione Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
Filtre os registros por tipo: mostre apenas mensagens de Erro.
Nos registros, você vai encontrar mensagens semelhantes a estas:
Worker: warm shutdown (Main Process)
ou
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
Esses registros indicam que o pod iniciou o processo de "desligamento gradual" porque o armazenamento usado excedeu o limite e foi removido em uma hora. No entanto, a execução do DAG não falhou porque foi concluída dentro do período de encerramento normal do Kubernetes, que é explicado mais adiante neste tutorial.
Para ilustrar o conceito de período de carência de encerramento, analise o resultado do segundo exemplo de DAG, long_running_create_large_txt_file_print_logs
.
Acione o segundo DAG, long_running_create_large_txt_file_print_logs
:
No console Google Cloud , acesse a página Ambientes.
Na coluna Servidor da Web do Airflow, siga o link Airflow do ambiente.
Na interface da Web do Airflow, na página DAGs, na coluna Links do DAG, clique no botão Acionar DAG.
Clique em Gatilho.
Na página DAGs, clique na tarefa que você acionou e analise os registros de saída para garantir que o DAG começou a ser executado.
Aguarde até que a execução do DAG
long_running_create_large_txt_file_print_logs
falhe. Isso vai levar cerca de uma hora.
Analise os resultados da execução do DAG:
Na página DAGs, clique na execução do DAG
long_running_create_large_txt_file_print_logs
. Você vai notar que a tarefa tem um estadoFailed
e que a duração da execução foi exatamente de 1 hora e 5 minutos, o que é menos do que o período de espera da tarefa de 1 hora e 15 minutos.Analise os registros da tarefa. Depois que o DAG cria o arquivo
localfile.txt
no contêiner do worker do Airflow, o registro mostra que o DAG começou a esperar, e a duração da execução é impressa nos registros de tarefas a cada 1 minuto. Neste exemplo, o DAG imprime o registrolocalfile.txt size:
e o tamanho do arquivolocalfile.txt
será de 1,5 GB.
Quando o arquivo gravado no contêiner do worker do Airflow excede o limite de armazenamento, a execução do DAG falha. No entanto, a tarefa não falha imediatamente e continua sendo executada até atingir 1 hora e 5 minutos. Isso acontece porque o Kubernetes não encerra a tarefa imediatamente e continua em execução para permitir uma hora de tempo de recuperação, conhecido como "período de encerramento normal". Quando um nó fica sem recursos, o Kubernetes não encerra o pod imediatamente para processar o encerramento de maneira normal, de modo que haja um impacto mínimo no usuário final.
O período de carência de encerramento ajuda os usuários a recuperar arquivos após falhas de tarefas, mas pode causar confusão ao diagnosticar DAGs. Quando o limite de armazenamento do worker do Airflow é excedido, o estado da tarefa final depende da duração da execução do DAG:
Se a execução do DAG exceder o limite de armazenamento do worker, mas for concluída em menos de uma hora, a tarefa será concluída com um status
Success
porque foi concluída dentro do período de carência de encerramento. No entanto, o Kubernetes encerra o pod e o arquivo gravado é excluído imediatamente do contêiner.Se o DAG exceder o limite de armazenamento do worker e for executado por mais de uma hora, ele continuará sendo executado por uma hora e poderá exceder o limite de armazenamento em milhares de porcentagens antes que o Kubernetes elimine o pod e o Airflow marque a tarefa como
Failed
.
Diagnosticar o DAG com falha
Se você estava executando várias tarefas no momento da falha, considere executar apenas uma tarefa e diagnosticar a pressão de recursos durante esse período para identificar quais tarefas causam pressão de recursos e quais recursos precisam ser aumentados.
Analise os registros de tarefas do segundo DAG,
long_running_create_large_txt_file_print_logs
:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Registros e selecione Todos os registros > Registros do Airflow > Workers > Ver na Análise de registros.
Filtre os registros por tipo: mostre apenas mensagens de Erro.
Nos registros, você vai encontrar mensagens semelhantes a estas:
Container storage usage of worker reached 155.7% of the limit.
This likely means that the total size of local files generated by your DAGs is
close to the storage limit of worker.
You may need to decrease the storage usage or increase the worker storage limit
in your Cloud Composer environment configuration.
ou
Pod storage usage of worker reached 140.2% of the limit.
A worker pod was evicted at 2023-12-01T12:30:05Z with message: Pod ephemeral
local storage usage exceeds the total limit of containers 1023Mi.
This eviction likely means that the total size of dags and plugins folders plus
local files generated by your DAGs exceeds the storage limit of worker.
Please decrease the storage usage or increase the worker storage limit in your
Cloud Composer environment configuration.
Essas mensagens indicam que, à medida que a tarefa progredia, os registros do Airflow começaram a imprimir erros quando o tamanho dos arquivos gerados pelo DAG excedeu o limite de armazenamento do worker e o período de espera de encerramento começou. Durante o período de carência de encerramento, o consumo de armazenamento não voltou ao limite, o que levou à remoção do pod após o término do período de carência.
Revise o monitoramento da integridade do ambiente e do consumo de recursos:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Monitoring e selecione Visão geral.
No painel Visão geral do ambiente, localize o gráfico Integridade do ambiente (DAG de monitoramento do Airflow). Ela contém uma área vermelha, que corresponde ao momento em que os registros começaram a imprimir erros.
Selecione Workers e encontre o gráfico Uso total do disco dos workers. Observe que a linha Uso de disco tem um pico e excede a linha Limite de disco no momento em que sua tarefa estava em execução.

Aumentar o limite de armazenamento do worker
Alocar mais armazenamento de worker do Airflow para que o DAG de exemplo seja concluído:
No console Google Cloud , acesse a página Ambientes.
Na lista de ambientes, clique no nome do seu ambiente. A página Detalhes do ambiente é aberta.
Acesse a guia Configuração do ambiente.
Encontre a configuração Recursos > Cargas de trabalho e clique em Editar.
Na seção Worker, no campo Armazenamento, especifique o novo limite de armazenamento para workers do Airflow. Neste tutorial, defina como 2 GB.
Salve as mudanças e aguarde alguns minutos para que os workers do Airflow sejam reiniciados.
Teste seu DAG com o novo limite de armazenamento
Acione o DAG long_running_create_large_txt_file_print_logs
novamente e aguarde 1 hora e 15 minutos até que ele termine de ser executado.
Nos registros de saída da execução do DAG, você verá
Marking task as SUCCESS
, e o estado da tarefa vai indicar Sucesso, com uma duração de 1 hora e 15 minutos, que é igual ao tempo de espera definido no código do DAG.Revise a seção Visão geral do ambiente na guia Monitoring e verifique se não há áreas vermelhas.
Clique na seção Workers e encontre o gráfico Uso total do disco dos workers. Você vai notar que a linha Limite de disco reflete a mudança no limite de armazenamento, e a linha Uso de disco está dentro do intervalo permitido.
Resumo
Neste tutorial, você diagnosticou o motivo de uma falha de DAG e identificou o tipo de recurso que causa pressão ao depurar dois exemplos de DAGs que falham devido à falta de memória e armazenamento do worker. Em seguida, você executou os DAGs com sucesso depois de alocar mais memória e armazenamento para os workers. No entanto, é recomendável otimizar seus DAGs (fluxos de trabalho) para reduzir o consumo de recursos do worker, porque não é possível aumentar os recursos além de um determinado limite.
Limpar
Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste tutorial, exclua o projeto que os contém ou mantenha o projeto e exclua os recursos individuais.
Excluir o projeto
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Excluir recursos individuais
Se você planeja ver vários tutoriais e guias de início rápido, a reutilização de projetos pode evitar que você exceda os limites da cota do projeto.
Exclua o ambiente do Cloud Composer. Você também exclui o bucket do ambiente durante esse procedimento.