Soluciona problemas del procesador de DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página, solo se abordan los problemas relacionados con el procesamiento de archivos de DAG. Si tienes problemas para programar tareas, consulta Cómo solucionar problemas del programador de Airflow.

Flujo de trabajo para la solución de problemas

Inspecciona los registros del procesador de DAG

Si tienes DAG complejos, es posible que el procesador de DAG no analice todos. Esto puede generar muchos problemas que tengan los siguientes síntomas.

Síntomas:

  • Si el procesador de DAG encuentra problemas cuando analiza tus DAG, es posible que se genere una combinación de los problemas que se enumeran aquí. Si los DAG se generan de forma dinámica, estos problemas pueden ser más graves en comparación con los DAG estáticos.

  • Los DAG no son visibles en la IU de Airflow ni en la IU de DAG.

  • Los DAG no están programados para su ejecución.

  • Hay errores en los registros del procesador de DAG, por ejemplo:

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    o

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • Los procesadores de DAG tienen problemas que generan reinicios.

  • Las tareas de Airflow programadas para su ejecución se cancelan, y las ejecuciones de DAG para los DAG que no se pudieron analizar pueden marcarse como failed. Por ejemplo:

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

Solution:

  • Aumenta los parámetros relacionados con el análisis de DAG:

  • Corrige o quita los DAG que causan problemas al procesador de DAG.

Inspecciona los tiempos de análisis del DAG

Para verificar si el problema ocurre durante el análisis del DAG, sigue estos pasos.

Console

En Google Cloud console, puedes usar la página Monitoring y la pestaña Logs para inspeccionar los tiempos de análisis de DAG.

Inspecciona los tiempos de análisis del DAG con la página de supervisión de Cloud Composer:

  1. En la Google Cloud consola, ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Supervisión.

  3. En la pestaña Monitoring, revisa el gráfico Tiempo total de análisis de todos los archivos DAG en la sección Ejecuciones de DAG y, luego, identifica posibles problemas.

    En la sección Ejecuciones de DAG, en la pestaña Monitoring de Composer, se muestran las métricas de estado de los DAG en tu entorno

Inspecciona los tiempos de análisis del DAG con la pestaña Registros de Cloud Composer:

  1. En la Google Cloud consola, ve a la página Entornos.

    [Ir a Entornos][console-list-env]

  2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Supervisión.

  3. Ve a la pestaña Registros y, en el árbol de navegación Todos los registros, selecciona la sección Administrador de procesadores DAG.

  4. Revisa los registros de dag-processor-manager y, luego, identifica los posibles problemas.

    Los registros del procesador de DAG mostrarán los tiempos de análisis de DAG.

gcloud

Usa el comando dags report para ver el tiempo de análisis de todos tus DAG.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.

El resultado del comando es similar al siguiente:

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

Busca el valor de duration para cada uno de los DAG que aparecen en la tabla. Un valor alto puede indicar que uno de tus DAG no se implementa de manera óptima. En la tabla de resultados, puedes identificar qué DAGs tienen un tiempo de análisis prolongado.

Soluciona problemas durante el análisis de DAG

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.

Cantidad limitada de subprocesos

Permitir que el administrador del procesador de DAG use solo una cantidad limitada de subprocesos podría afectar el tiempo de análisis del DAG.

Para resolver el problema, anula las siguientes opciones de configuración de Airflow:

  • Anula el parámetro parsing_processes:

    Sección Clave Valor Notas
    scheduler parsing_processes NUMBER_OF_CPUs_IN_DAG_PROCESSOR - 1 Reemplaza NUMBER_OF_CPUs_IN_DAG_PROCESSOR por la cantidad de CPUs
    en el procesador de DAG.

Haz que el procesador de DAG ignore archivos innecesarios

Puedes mejorar el rendimiento del procesador de DAG si omites los archivos innecesarios en la carpeta de DAG. El procesador de DAG ignora los archivos y carpetas especificados en el archivo .airflowignore.

Para que el procesador de DAG ignore archivos innecesarios, haz lo siguiente:

  1. Crea un archivo .airflowignore.
  2. En este archivo, se muestra una lista de los archivos y las carpetas que se deben ignorar.
  3. Sube este archivo a la carpeta /dags en el bucket de tu entorno.

Para obtener más información sobre el formato de archivo .airflowignore, consulta la documentación de Airflow.

Airflow procesa los DAG pausados

Los usuarios de Airflow pausan los DAG para evitar su ejecución. Esto ahorra ciclos de procesamiento de los trabajadores de Airflow.

Airflow seguirá analizando los DAG pausados. Si en realidad deseas mejorar el rendimiento del procesador de DAG, usa .airflowignore o borra los DAG pausados de la carpeta de DAG.

Problemas comunes

En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes de análisis.

Tiempo de espera de importación de carga de DAG

Síntoma:

  • En la interfaz web de Airflow, en la parte superior de la página de la lista de DAGs, aparece un cuadro de alerta rojo que muestra Broken DAG: [/path/to/dagfile] Timeout.
  • En Cloud Monitoring: Los registros de airflow-scheduler contienen entradas similares a las siguientes:

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

Solución:

Anula la opción de configuración de Airflow dag_file_processor_timeout y permite más tiempo para el análisis del DAG:

Sección Clave Valor
core dag_file_processor_timeout Nuevo valor del tiempo de espera

Un DAG no es visible en la IU de Airflow ni en la IU de DAG, y el programador no lo programa.

El encargado del tratamiento de datos analiza cada DAG antes de que el programador pueda programarlo y antes de que un DAG se vuelva visible en la IU de Airflow o la IU de DAG.

Las siguientes opciones de configuración de Airflow definen los tiempos de espera para el análisis de DAG:

Si un DAG no se ve en la IU de Airflow o en la IU de DAG, haz lo siguiente:

  • Verifica los registros del procesador de DAG para ver si este puede procesar correctamente tu DAG. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del procesador o programador de DAG:

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • Verifica los registros del programador para ver si funciona correctamente. En caso de problemas, es posible que veas las siguientes entradas de registro en los registros del programador:

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

Soluciones:

  • Corrige todos los errores de análisis de DAG. El procesador de DAG analiza varios DAG y, en casos excepcionales, los errores de análisis de un DAG pueden afectar negativamente el análisis de otros DAG.

  • Si el análisis de tu DAG tarda más que la cantidad de segundos definida en [core]dagbag_import_timeout, aumenta este tiempo de espera.

  • Si el análisis de todos tus DAG demora más que la cantidad de segundos definida en [core]dag_file_processor_timeout, aumenta este tiempo de espera.

  • Si tu DAG tarda mucho tiempo en analizarse, también puede significar que no se implementa de manera óptima. Por ejemplo, si lee muchas variables de entorno o realiza llamadas a servicios externos o a la base de datos de Airflow. En la medida de lo posible, evita realizar esas operaciones en las secciones globales de los DAG.

  • Aumenta los recursos de CPU y memoria del procesador de DAG para que funcione más rápido.

  • Disminuye la frecuencia del análisis de DAG.

  • Disminuye la carga en la base de datos de Airflow.

¿Qué sigue?