Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se proporcionan información y pasos para solucionar problemas comunes con los programadores de Airflow.
Identifica el origen del problema
Para comenzar a solucionar problemas, primero debes identificar si el problema ocurre en el momento del análisis del DAG o mientras se procesan las tareas en el momento de la ejecución. Para obtener más información sobre el tiempo de análisis y el tiempo de ejecución, consulta la Diferencia entre el tiempo de análisis y de ejecución del DAG.
Cómo inspeccionar problemas de procesamiento de DAG
Supervisa tareas en ejecución y en cola
Para verificar si hay tareas atascadas en una cola, sigue estos pasos.
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña Monitoring.
En la pestaña Monitoring, revisa el gráfico Tareas de Airflow en la sección Ejecuciones de DAG y, luego, identifica posibles problemas. Las tareas de Airflow son tareas que se encuentran en un estado en cola en Airflow y pueden ir a la cola de agentes de Celery o de Kubernetes Executor. Las tareas en cola de Celery son instancias de tareas que se ponen en la cola de agentes de Celery.
Soluciona problemas durante el análisis de DAG
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes durante el análisis de DAG.
Distribución de números y horarios de las tareas
Airflow es conocido por tener problemas para programar una gran cantidad de tareas pequeñas. En tales casos, debes elegir una cantidad menor de tareas más consolidadas.
Programar una gran cantidad de DAG o tareas al mismo tiempo también puede ser una posible fuente de problemas. Para evitar este problema, distribuye tus tareas de manera más uniforme en el tiempo.
Soluciona problemas de tareas en ejecución y en cola
En las siguientes secciones, se describen los síntomas y las posibles soluciones para algunos problemas comunes con las tareas en cola y en ejecución.
No se ejecutan las ejecuciones de DAG
Síntoma:
Cuando se establece de forma dinámica una fecha de programación para un DAG, esto puede generar varios efectos secundarios inesperados. Por ejemplo:
Una ejecución de DAG siempre está en el futuro y el DAG nunca se ejecuta.
Las ejecuciones de DAG anteriores se marcan como ejecutadas y correctas a pesar de que no se ejecutaron.
Hay más información disponible en la documentación de Apache Airflow.
Soluciones posibles:
Sigue las recomendaciones de la documentación de Apache Airflow.
Establece
start_date
estático para los DAG. Como opción, puedes usarcatchup=False
para inhabilitar la ejecución del DAG para fechas anteriores.Evita usar
datetime.now()
odays_ago(<number of days>)
, a menos que conozcas los efectos secundarios de este enfoque.
Las listas de tareas en cola son demasiado largas
En algunos casos, una lista de tareas en cola puede ser demasiado larga para el programador. Para obtener información sobre cómo optimizar los parámetros de trabajador y Celery, lee sobre cómo escalar tu entorno de Cloud Composer junto con tu negocio.
Cómo usar la función TimeTable del programador de Airflow
A partir de Airflow 2.2, puedes definir un programa de tiempo para un DAG con una función nueva llamada TimeTable.
Puedes definir un horario con uno de los siguientes métodos:
Recursos de clúster limitados
Esta sección solo se aplica a Cloud Composer 1.
Es posible que experimentes problemas de rendimiento si el clúster de GKE de tu entorno es demasiado pequeño para todos tus DAG y tareas. En este caso, prueba una de las siguientes soluciones:
- Crea un entorno nuevo con un tipo de máquina que proporcione más rendimiento y migra tus DAG a él
- Crea más entornos de Cloud Composer y divide los DAG entre ellos
- Cambia el tipo de máquina de los nodos de GKE, como se describe en Actualización del tipo de máquina de los nodos de GKE. Dado que este procedimiento es propenso a errores, es la opción menos recomendada.
- Actualiza el tipo de máquina de la instancia de Cloud SQL que ejecuta la base de datos de Airflow en tu entorno, por ejemplo, mediante los comandos
gcloud composer environments update
. El rendimiento bajo de la base de datos de Airflow podría ser la razón por la que el programador es lento.
Evita la programación de tareas durante los períodos de mantenimiento
Puedes definir períodos de mantenimiento específicos para tu entorno. Durante estos períodos, se producen los eventos de mantenimiento para Cloud SQL y GKE.
Uso de “wait_for_downstream” en tus DAG
Si configuras el parámetro wait_for_downstream
en True
en tus DAG, para que una tarea tenga éxito, todas las tareas que se encuentran en una etapa posterior de esta tarea también deben tener éxito. Significa que la ejecución de las tareas que pertenecen a una ejecución de DAG determinada puede ralentizarse mediante la ejecución de tareas de la ejecución del DAG anterior. Obtén más información sobre este tema en la documentación de Airflow.
Las tareas que estén en cola durante demasiado tiempo se cancelarán y reprogramarán.
Si una tarea de Airflow se mantiene en la cola durante demasiado tiempo, el programador la volverá a programar para su ejecución (en versiones de Airflow anteriores a la 2.3.1, la tarea también se marca como fallida y se vuelve a intentar si es apta para un reintento).
Una forma de observar los síntomas de esta situación es mirar el gráfico con la cantidad de tareas en cola (pestaña “Supervisión” en la IU de Cloud Composer) y, si los picos de este gráfico no disminuyen en aproximadamente dos horas, es probable que se reprogramen las tareas (sin registros) seguidas de entradas de registro “Las tareas adoptadas aún estaban pendientes …” en los registros del programador. En esos casos, es posible que veas el mensaje “No se encontró el archivo de registro…” en los registros de tareas de Airflow porque la tarea no se ejecutó.
En general, se espera este comportamiento, y la siguiente instancia de la tarea programada se debe ejecutar según la programación. Si observas muchos casos de este tipo en tus entornos de Cloud Composer, es posible que signifique que no hay suficientes trabajadores de Airflow en tu entorno para procesar todas las tareas programadas.
Resolución: Para resolver este problema, debes asegurarte de que siempre haya capacidad en los trabajadores de Airflow para ejecutar tareas en cola. Por ejemplo, puedes aumentar la cantidad de trabajadores o worker_concurrency. También puedes ajustar el paralelismo o los grupos para evitar que se almacenen en cola tareas más allá de la capacidad que tienes.
De forma esporádica, las tareas inactivas pueden bloquear la ejecución de un DAG específico.
En casos normales, el programador de Airflow debería poder controlar situaciones en las que hay tareas inactivas en la cola y, por algún motivo, no es posible ejecutarlas correctamente (p.ej., se borró un DAG al que pertenecen las tareas inactivas).
Si el programador no borra estas tareas inactivas, es posible que debas borrarlas de forma manual. Puedes hacerlo, por ejemplo, en la IU de Airflow. Puedes navegar a (Menú > Navegador > Instancias de tareas), encontrar tareas en cola que pertenecen a un DAG inactivo y borrarlas.
Para resolver este problema, actualiza tu entorno a la versión 2.1.12 o posterior de Cloud Composer.
Enfoque de Cloud Composer para el parámetro [scheduler]min_file_process_interval
Cloud Composer cambia la forma en que el programador de Airflow usa [scheduler]min_file_process_interval
.
Airflow 1
En el caso de Cloud Composer que usa Airflow 1, los usuarios pueden establecer el valor de [scheduler]min_file_process_interval
entre 0 y 600 segundos. Los valores superiores a 600 segundos proporcionan los mismos resultados que si [scheduler]min_file_process_interval
se establece en 600 segundos.
Airflow 2
En Airflow 2, [scheduler]min_file_process_interval
solo se puede usar con las versiones 1.19.9 y 2.0.26 o versiones posteriores.
Versiones de Cloud Composer anteriores a 1.19.9 y 2.0.26
En estas versiones, se ignora
[scheduler]min_file_process_interval
.Versiones 1.19.9 o 2.0.26 de Cloud Composer, o versiones posteriores
El programador de Airflow se reinicia después de una cierta cantidad de veces que se programan todos los DAG, y el parámetro
[scheduler]num_runs
controla cuántas veces lo hace el programador. Cuando el programador llega a los bucles de programación de[scheduler]num_runs
, se reinicia. El programador es un componente sin estado, y ese reinicio es un mecanismo de reparación automática para cualquier problema que pueda experimentar. Cuando no se especifica, se aplica el valor predeterminado de[scheduler]num_runs
, que es 5000.Se puede usar
[scheduler]min_file_process_interval
para configurar la frecuencia con la que se realiza el análisis de DAG, pero este parámetro no puede ser más largo que el tiempo requerido para que un programador realice bucles[scheduler]num_runs
cuando programe tus DAG.
Ajusta la configuración de Airflow
Airflow proporciona opciones de configuración que controlan cuántas tareas y DAG de Airflow pueden ejecutarse al mismo tiempo. Para establecer estas opciones de configuración, anula sus valores para tu entorno.
Simultaneidad de los trabajadores
El parámetro
[celery]worker_concurrency
controla la cantidad máxima de tareas que un trabajador de Airflow puede ejecutar al mismo tiempo. Si multiplicas el valor de este parámetro por la cantidad de trabajadores de Airflow en tu entorno de Cloud Composer, obtendrás la cantidad máxima de tareas que se pueden ejecutar en un momento determinado de tu entorno. Este número está limitado por la opción de configuración[core]parallelism
de Airflow, que se describe con más detalle.En los entornos de Cloud Composer 2, el valor predeterminado de
[celery]worker_concurrency
se calcula automáticamente.Para las versiones de Airflow 2.3.3 y posteriores,
[celery]worker_concurrency
se establece en un valor mínimo de 32, 12 × worker_CPU y 8 × worker_memory.Para las versiones de Airflow 2.2.5 o anteriores,
[celery]worker_concurrency
se establece en 12 × cantidad de CPUs de los trabajadores.
Ejecuciones máximas de DAG activas
La opción de configuración
[core]max_active_runs_per_dag
de Airflow controla la cantidad máxima de ejecuciones activas del DAG por DAG. El programador no crea más ejecuciones de DAG si alcanza este límite.Si este parámetro se establece de forma incorrecta, puedes encontrar un problema en el que el programador regula la ejecución del DAG porque no puede crear más instancias de ejecución de DAG en un momento determinado.
Cantidad máxima de tareas activas por DAG
La opción de configuración
[core]max_active_tasks_per_dag
de Airflow controla la cantidad máxima de instancias de tareas que se pueden ejecutar de forma simultánea en cada DAG. Es un parámetro a nivel del DAG.Si este parámetro se establece de manera incorrecta, es posible que tengas un problema en el que la ejecución de una sola instancia de DAG sea lenta porque solo hay una cantidad limitada de tareas de DAG que se pueden ejecutar en un momento determinado.
Solución: Aumenta
[core]max_active_tasks_per_dag
.Paralelismo y tamaño del grupo
La opción de configuración
[core]parallelism
de Airflow controla cuántas tareas puede poner en cola el programador de Airflow en la cola del ejecutor después de que se cumplen todas las dependencias para estas tareas.Este es un parámetro global para toda la configuración de Airflow.
Las tareas se ponen en cola y se ejecutan dentro de un grupo. Los entornos de Cloud Composer usan solo un grupo. El tamaño de este grupo controla cuántas tareas puede poner en cola el programador para su ejecución en un momento determinado. Si el tamaño del grupo es demasiado pequeño, el programador no puede poner en cola las tareas para su ejecución a pesar de los límites, que se definen mediante la opción de configuración
[core]parallelism
y[celery]worker_concurrency
multiplicada por la cantidad de trabajadores de Airflow, que todavía no se cumplen.Puedes configurar el tamaño del grupo en la IU de Airflow (Menú > Administrador > Grupos). Ajusta el tamaño del grupo al nivel del paralelismo que esperas en tu entorno.
Por lo general,
[core]parallelism
se establece como un producto de la cantidad máxima de trabajadores y [celery]worker_concurrency.
Marcar tareas como fallidas después de alcanzar dagrun_timeout
El programador marca las tareas que no están terminadas (en ejecución, programadas y en cola) como fallidas si una ejecución de DAG no finaliza dentro de dagrun_timeout
(un parámetro de DAG).
Solución:
Extiende
dagrun_timeout
para cumplir con el tiempo de espera.(Cloud Composer 2) Aumentar la cantidad de trabajadores o aumentar los parámetros de rendimiento de los trabajadores, para que el DAG se ejecute más rápido.
Síntomas de que la base de datos de Airflow está bajo presión de carga
A veces, en los registros del programador de Airflow, es posible que veas la siguiente entrada de registro de advertencia:
Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"
También se pueden observar síntomas similares en los registros de trabajadores de Airflow:
Para MySQL:
(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"
Para PostgreSQL:
psycopg2.OperationalError: connection to server at ... failed
Estos errores o advertencias pueden ser un síntoma de que la base de datos de Airflow está sobrecargada con la cantidad de conexiones abiertas o la cantidad de consultas que se ejecutan al mismo tiempo, ya sea por programadores o por otros componentes de Airflow, como trabajadores, activadores y servidores web.
Soluciones posibles:
Cómo quitar datos innecesarios de la base de datos de Airflow
Ajusta la escala de la base de datos de Airflow:
- (Cloud Composer 1) Cambia el tipo de máquina de la instancia de Cloud SQL que almacena la base de datos de Airflow de tu entorno.
- (Cloud Composer 2) Ajusta el tamaño de tu entorno.
Reduce la cantidad de programadores. En la mayoría de los casos, uno o dos programadores son suficientes para analizar y programar tareas de Airflow. No se recomienda configurar más de dos programadores, a menos que haya un caso justificado.
Evita usar variables globales en los DAG de Airflow: variables de entorno de Cloud Composer y variables de Airflow.
Establece [scheduler]scheduler-heartbeat-sec en un valor más alto, por ejemplo, en 15 segundos o más.
Establece [scheduler]job-heartbeat-sec en un valor más alto, por ejemplo, 30 segundos o más.
Establece [scheduler]scheduler_health_check_threshold en un valor igual a
[scheduler]job-heartbeat-sec
multiplicado por4
.
El servidor web muestra la advertencia "Parece que el programador no está ejecutándose".
El programador informa su señal de monitoreo de funcionamiento con regularidad a la base de datos de Airflow. En función de esta información, el servidor web de Airflow determina si el programador está activo.
A veces, si el programador está bajo una carga pesada, es posible que no pueda informar su señal de monitoreo de funcionamiento cada [scheduler]scheduler-heartbeat-sec.
En tal caso, el servidor web de Airflow podría mostrar la siguiente advertencia:
The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.
Soluciones posibles:
Aumenta los recursos de CPU y memoria del programador.
Optimiza tus DAG para que su análisis y programación sean más rápidos y no consuman demasiados recursos del programador.
Evita usar variables globales en los DAG de Airflow: variables de entorno de Cloud Composer y variables de Airflow.
Aumenta el valor de [scheduler]scheduler-health-check-threshold para que el servidor web espere más tiempo antes de informar la falta de disponibilidad del programador.
Soluciones alternativas para los problemas que se encuentran durante el reabastecimiento de DAG
A veces, es posible que desees volver a ejecutar DAG que ya se ejecutaron. Puedes hacerlo con la herramienta de línea de comandos de Airflow de la siguiente manera:
gcloud composer environments run \
ENVIRONMENT_NAME \
--location LOCATION \
dags backfill -- -B \
-s START_DATE \
-e END_DATE \
DAG_NAME
Para volver a ejecutar solo las tareas fallidas de un DAG específico, usa también el argumento --rerun-failed-tasks
.
Reemplaza lo siguiente:
ENVIRONMENT_NAME
por el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.START_DATE
con un valor para el parámetro DAGstart_date
, en el formatoYYYY-MM-DD
.END_DATE
con un valor para el parámetro DAGend_date
, en el formatoYYYY-MM-DD
.DAG_NAME
por el nombre del DAG.
A veces, la operación de reabastecimiento puede generar una situación de interbloqueo en la que no es posible reabastecer porque hay un bloqueo en una tarea. Por ejemplo:
2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill
En algunos casos, puedes usar las siguientes soluciones alternativas para superar los interbloqueos:
Para inhabilitar el programador en miniatura, anula
[core]schedule-after-task-execution
enFalse
.Ejecuta reabastecimientos para períodos más estrechos. Por ejemplo, configura
START_DATE
yEND_DATE
para especificar un período de solo 1 día.