Airflow 调度器问题排查

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面介绍 Airflow 调度程序常见问题和问题排查步骤和信息。

确定问题的来源

如需开始进行问题排查,请确定问题是出现在 DAG 解析时间还是在执行时处理任务的时候。如需详细了解解析时间和执行时间,请参阅 DAG 解析时间与 DAG 执行时间之间的差异

检查 DAG 处理问题

  1. 检查 DAG 处理器日志

  2. 检查 DAG 解析时间

监控正在运行和已加入队列的任务

如需检查是否有任务卡在队列中,请执行以下步骤。

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

  3. 转到监控标签页。

  4. Monitoring 标签页中,查看 DAG 运行部分中的 Airflow 任务图表,以确定潜在问题。Airflow 任务是指在 Airflow 中处于排队状态的任务,它们可以进入 Celery 或 Kubernetes Executor 代理队列。“Celery 队列中的任务数”是指已进入 Celery 代理队列的任务实例数。

DAG 解析时排查问题

以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。

任务的数量和时间分布

已知 Airflow 在安排大量小任务时会出现问题。在这种情况下,您应该选择使用较少的整合任务。

同时调度大量 DAG 或任务也可能是问题的来源。如需避免此问题,请随着时间推移更平均地分配您的任务。

在正在运行和已加入队列的任务中排查问题

以下部分介绍正在运行和已加入队列的任务的一些常见问题的症状和可能的修复。

未执行 DAG 运行作业

具体情况:

动态设置 DAG 的安排日期可能会导致各种意外副作用。例如:

  • DAG 执行时间始终在未来,并且 DAG 永远不会执行。

  • 过去的 DAG 运行会被标记为已执行且成功,即使未执行也是如此。

如需了解详情,请参阅 Apache Airflow 文档

可能的解决方案:

  • 请按照 Apache Airflow 文档中的建议操作。

  • 为 DAG 设置静态 start_date。您可以选择使用 catchup=False 停用针对过去日期运行 DAG。

  • 除非您了解此方法的副作用,否则请避免使用 datetime.now()days_ago(<number of days>)

任务队列过长

在某些情况下,调度器的任务队列可能会过长。如需了解如何优化工作器和 celcel 参数,请参阅将 Cloud Composer 环境与您的业务一起扩缩

使用 Airflow 调度程序的时间表功能

从 Airflow 2.2 开始,您可以使用名为 TimeTable 的新功能为 DAG 定义时间表。

您可以使用以下任一方法定义时间表:

受限集群资源

本部分仅适用于 Cloud Composer 1。

如果环境的 GKE 集群太小,无法处理所有 DAG 和任务,您可能会遇到性能问题。在这种情况下,请尝试以下解决方案之一:

  • 使用性能更高的机器类型创建一个新环境,并将 DAG 迁移到该环境中。
  • 创建更多 Cloud Composer 环境并在它们之间拆分 DAG。
  • 按照升级 GKE 节点的机器类型中的说明更改 GKE 节点的机器类型。由于此过程容易出错,因此这是最不建议的选项。
  • 升级在您的环境中运行 Airflow 数据库的 Cloud SQL 实例的机器类型,例如,使用 gcloud composer environments update 命令。Airflow 数据库性能不佳可能是调度器较慢的原因。

避免在维护窗口内进行任务调度

您可以为环境定义特定的维护窗口。在这些时间段内,会出现 Cloud SQL 和 GKE 的维护事件。

在 DAG 中使用“wait_for_downstream”

如果您将 DAG 中的参数 wait_for_downstream 设置为 True,那么如果您希望某个任务成功,则该任务的所有直接下行任务都必须成功。这意味着,由于执行之前的 DAG 运行中的任务,属于特定 DAG 运行的任务的执行可能会变慢。如需了解详情,请参阅 Airflow 文档

排队时间过长的任务将被取消并重新安排

如果 Airflow 任务在队列中保留的时间过长,则调度器会再次重新安排它以执行(在低于 2.3.1 的 Airflow 版本中,如果任务符合重试条件,系统还会将其标记为失败并重试)。

如需观察此类情况的症状,一种方法是查看队列任务数量的图表(Cloud Composer 界面中的“监控”标签页),如果此图表中的峰值在约两小时内没有下降,则任务很可能会重新调度(没有日志),然后调度程序日志中会出现“Adopted tasks were still pending ...” 日志条目。在这种情况下,您可能会在 Airflow 任务日志中看到“Log file not found...”消息,因为该任务未执行。

一般来说,预期此行为,并且计划任务的下一个实例应该根据时间表执行。如果您在 Cloud Composer 环境中观察到许多此类情况,则可能表示您的环境中没有足够的 Airflow 工作器来处理所有计划任务。

解决方法:如需解决此问题,您需要确保 Airflow 工作器始终具有容量来运行已加入队列的任务。例如,您可以增加工作器数量或提高工作器并发 (worker_concurrency) 值。您还可以调整并行性或池,以防止队列任务超出您具有的容量。

有时,过时的任务可能会阻止特定 DAG 的执行

在一般情况下,Airflow 调度器应该能够处理这样的情况:队列中存在过时任务,并且由于某些原因导致无法正确执行它们(例如,过时任务所属的 DAG 被删除)。

如果调度器未清除这些过时的任务,则您可能需要手动删除这些任务。例如,您可以在 Airflow 界面中执行此操作 - 您可以导航到菜单 > 浏览器 > 任务实例,找到属于过时 DAG 的已加入队列的任务并将其删除。

如需解决此问题,请将您的环境升级到 Cloud Composer 2.1.12 或更高版本。

Cloud Composer 方法 [scheduler]min_file_process_interval 参数

Cloud Composer 更改了 Airflow 调度器使用 [scheduler]min_file_process_interval 的方式。

Airflow 1

对于使用 Airflow 1 的 Cloud Composer,用户可以将 [scheduler]min_file_process_interval 的值设置为 0 到 600 秒。高于 600 秒的值将产生与将 [scheduler]min_file_process_interval 设置为 600 秒相同的结果。

Airflow 2

在 Airflow 2 中,[scheduler]min_file_process_interval 只能与 1.19.9 和 2.0.26 或更高版本搭配使用

  • 低于 1.19.9 和 2.0.26 的 Cloud Composer 版本

    在这些版本中,系统会忽略 [scheduler]min_file_process_interval

  • Cloud Composer 版本 1.19.9 或 2.0.26 或更高版本

    在调度所有 DAG 的次数达到一定数目后,Airflow 调度器会重启,并且 [scheduler]num_runs 参数用于控制调度器执行此操作的次数。当调度程序到达 [scheduler]num_runs 调度循环时,它会重启 - 调度程序是无状态组件,此类重启是对调度程序可能遇到的任何问题的自动修复机制。如果未指定,系统会应用 [scheduler]num_runs 的默认值,即 5000。

    [scheduler]min_file_process_interval 可用于配置 DAG 解析的频率,但此参数不得超过调度器在调度 DAG 时执行 [scheduler]num_runs 循环所需的时间。

扩缩 Airflow 配置

Airflow 提供了 Airflow 配置选项,可用于控制 Airflow 可以同时执行的任务数量和 DAG 数量。如需设置这些配置选项,请根据您的环境替换相应值。

  • 工作器并发

    参数 [celery]worker_concurrency控制 Airflow 工作器可以同时执行的最大任务数。如果将此参数的值乘以您的 Cloud Composer 环境中的 Airflow 工作器的数量,将得出在您的环境中,在给定的时刻可执行的最大任务数。此数字受 [core]parallelism Airflow 配置选项的限制,之后将进一步说明此内容。

    在 Cloud Composer 2 环境中,[celery]worker_concurrency 的默认值会自动计算

    • 对于 Airflow 版本 2.3.3 及更高版本,[celery]worker_concurrency 会设置为 32、12 * worker_CPU 和 8 * worker_memory 中的最小值。

    • 对于 Airflow 版本 2.2.5 或更低版本,[celery]worker_concurrency 设为 12 * 工作器的 CPU 数量。

  • 活跃 DAG 运行次数上限

    [core]max_active_runs_per_dag Airflow 配置选项控制每个 DAG 的活跃 DAG 运行次数的上限。如果次数达到上限,则调度器不会继续创建更多 DAG 运行。

    如果此参数设置不正确,您可能会遇到调度器限制 DAG 执行的问题,因为它无法在给定时刻创建更多 DAG 运行实例。

  • 每个 DAG 的已执行任务数上限

    [core]max_active_tasks_per_dag Airflow 配置选项可控制每个 DAG 中可以并发运行的任务实例数上限。它是 DAG 级别的参数。

    如果此参数设置不正确,您可能会遇到单个 DAG 实例执行缓慢的问题,因为在给定时间可执行的 DAG 任务数有限。

    解决方案:提高 [core]max_active_tasks_per_dag

  • 最大并行数量和池大小

    [core]parallelism Airflow 配置选项可控制在满足任务的所有依赖项后,Airflow 调度器可将多少任务添加到执行程序的队列中。

    这是整个 Airflow 设置的全局参数。

    任务会排入队列并在池中执行。Cloud Composer 环境仅使用一个池。此池的大小可控制调度器在给定时间可将多少任务加入执行队列。如果池太小,则即使尚未达到阈值([core]parallelism 配置选项和 [celery]worker_concurrency 配置选项之和乘以 Airflow 工作器数量所得的值),相应调度器也无法将任务加入执行队列。

    您可以在 Airflow 界面中配置池大小(菜单 > 管理 > )。将池大小调整为您的环境中预期的最大并行数量。

    通常,[core]parallelism 的设置为工作器数量上限和 [celery]worker_concurrency 的乘积。

在达到 dagrun_timeout 后将任务标记为失败

如果 DAG 运行作业未在 dagrun_timeout(DAG 参数)内完成,调度程序会将未完成的任务(正在运行、已调度和已加入队列的任务)标记为失败。

解决方案:

Airflow 数据库承受负载压力的症状

有时,您可能会在 Airflow 调度程序日志中看到以下警告日志条目:

Scheduler heartbeat got an exception: (_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at 'reading initial communication packet', system error: 0")"

您可能还会在 Airflow 工作器日志中观察到类似的症状:

对于 MySQL:

(_mysql_exceptions.OperationalError) (2006, "Lost connection to MySQL server at
'reading initial communication packet', system error: 0")"

对于 PostgreSQL:

psycopg2.OperationalError: connection to server at ... failed

此类错误或警告可能是 Airflow 数据库因调度器或其他 Airflow 组件(例如工作器、触发器和 Web 服务器)同时执行的打开连接数量或查询数量过多而导致的。

可能的解决方案:

Web 服务器显示“调度器似乎未运行”警告

调度程序会定期向 Airflow 数据库报告其心跳。Airflow Web 服务器会根据这些信息确定调度程序是否处于活动状态。

有时,如果调度程序负载过重,则可能无法每隔 [scheduler]scheduler-heartbeat-sec 报告一次心跳。

在这种情况下,Airflow Web 服务器可能会显示以下警告:

The scheduler does not appear to be running. Last heartbeat was received <X>
seconds ago.

可能的解决方案:

针对在回填 DAG 期间遇到的问题的解决方法

有时,您可能需要重新运行已执行的 DAG。您可以使用 Airflow 命令行工具按如下方式执行此操作:

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location LOCATION \
   dags backfill -- -B \
   -s START_DATE \
   -e END_DATE \
   DAG_NAME

如需仅重新运行特定 DAG 的失败任务,还应使用 --rerun-failed-tasks 参数。

您需要进行如下替换:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • START_DATE,其中包含 start_date DAG 参数的值,格式为 YYYY-MM-DD
  • END_DATE,其中包含 end_date DAG 参数的值,格式为 YYYY-MM-DD
  • DAG_NAME 替换为 DAG 名称。

回填操作有时可能会产生死锁情况,即由于任务上存在锁定,无法进行回填。例如:

2022-11-08 21:24:18.198 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.201 CET -------- --------- -------- ------------
2022-11-08 21:24:18.202 CET 2022-11-08 21:24:18.203 CET These tasks are deadlocked:
2022-11-08 21:24:18.203 CET DAG ID Task ID Run ID Try number
2022-11-08 21:24:18.204 CET ----------------------- ----------- ----------------------------------- ------------
2022-11-08 21:24:18.204 CET <DAG name> <Task name> backfill__2022-10-27T00:00:00+00:00 1
2022-11-08 21:24:19.249 CET Command exited with return code 1
...
2022-11-08 21:24:19.348 CET Failed to execute job 627927 for task backfill

在某些情况下,您可以使用以下权宜解决方法来解决死锁问题:

后续步骤