Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本页仅介绍与 DAG 文件处理相关的问题。如需了解与调度任务相关的问题,请参阅排查 Airflow 调度程序问题。
问题排查工作流
检查 DAG 处理器日志
如果您的 DAG 较为复杂,DAG 处理器可能无法解析所有 DAG。这可能会导致许多问题,这些问题具有以下症状。
症状:
如果 DAG 处理器在解析 DAG 时遇到问题,则可能会导致出现此处列出的问题组合。与静态 DAG 相比,如果 DAG 是动态生成的,这些问题的影响可能会更大。
DAG 在 Airflow 界面和 DAG 界面中不可见。
DAG 不会安排执行时间。
DAG 处理器日志中存在错误,例如:
dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
或
dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR - Processor for /home/airflow/gcs/dags/dag-example.py exited with return code 1.
DAG 处理器出现问题,导致重启。
已安排执行的 Airflow 任务会被取消,并且无法解析的 DAG 的 DAG 运行作业可能会被标记为
failed
。例如:airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1 manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag 'dag-example'. Marking it as removed.
解决方案:
增加与 DAG 解析相关的参数:
将 dagbag-import-timeout 增加到至少 120 秒(如有需要,可以更长)。
将 dag-file-processor-timeout 增加到至少 180 秒(如果需要,可以更长)。此值必须高于
dagbag-import-timeout
。
更正或移除会导致 DAG 处理器出现问题的 DAG。
检查 DAG 解析时间
如需验证问题是否发生在 DAG 解析时间,请执行以下步骤。
控制台
在 Google Cloud 控制台中,您可以使用 Monitoring 页面和 Logs 标签页来检查 DAG 解析时间。
使用 Cloud Composer“监控”页面检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。系统随即会打开监控页面。
在 Monitoring 标签页中,查看 DAG 运行部分中的所有 DAG 文件的总解析时间图表,并找出可能的问题。
使用 Cloud Composer 日志标签页检查 DAG 解析时间:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。系统随即会打开监控页面。
前往日志标签页,然后从所有日志导航树中选择 DAG 处理器管理器部分。
查看
dag-processor-manager
日志并找出可能存在的问题。
gcloud
使用 dags report
命令可查看所有 DAG 的解析时间。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags report
您需要进行如下替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
该命令的输出类似于以下内容:
Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file | duration | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py | 0:00:00.038334 | 2 | 10 | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1 | 1 | airflow_monitoring
查找表格中列出的每个 DAG 的时长值。如果值较大,则可能表示某个 DAG 未以最佳方式实施。从输出表中,您可以确定哪些 DAG 的解析时间较长。
DAG 解析时排查问题
以下部分介绍了 DAG 解析时一些常见问题的症状和可能的修复。
受限线程数量
仅允许 DAG 处理器管理器使用有限数量的线程可能会影响您的 DAG 解析时间。
如需解决此问题,请替换以下 Airflow 配置选项:
-
部分 键 值 备注 scheduler
parsing_processes
NUMBER_OF_CPUs_IN_SCHEDULER - 1
将 NUMBER_OF_CPUs_IN_SCHEDULER
替换为调度程序中的 CPU 数量
。
让 DAG 处理器忽略不必要的文件
您可以通过跳过 DAGs 文件夹中的不必要文件来提高 DAG 处理器的性能。DAG 处理器会忽略 .airflowignore
文件中指定的文件和文件夹。
如需让 DAG 处理器忽略不必要的文件,请执行以下操作:
- 创建
.airflowignore
文件。 - 在此文件中,列出应忽略的文件和文件夹。
- 将此文件上传到环境的存储桶中的
/dags
文件夹。
如需详细了解 .airflowignore
文件格式,请参阅 Airflow 文档。
Airflow 处理暂停的 DAG
Airflow 用户会暂停 DAG 以避免执行。这样可以节省 Airflow 工作器处理周期。
Airflow 会继续解析已暂停的 DAG。如果您确实想要提高 DAG 处理器的性能,请使用 .airflowignore
或从 DAGs 文件夹中删除已暂停的 DAG。
常见问题
以下部分介绍了一些常见解析问题的症状和可能的修复措施。
DAG 加载导入超时
具体情况:
- 在 Airflow 网页界面中,DAG 列表页面顶部出现一个红色提醒框,其中显示
Broken DAG: [/path/to/dagfile] Timeout
。 在 Cloud Monitoring 中:
airflow-scheduler
日志包含类似于以下内容的条目:ERROR - Process timed out
ERROR - Failed to import: /path/to/dagfile
AirflowTaskTimeout: Timeout
修复:
替换 dag_file_processor_timeout
Airflow 配置选项,并留出更多时间来进行 DAG 解析:
部分 | 键 | 值 |
---|---|---|
core |
dag_file_processor_timeout |
新超时值 |
DAG 在 Airflow 界面或 DAG 界面中不可见,并且调度器不会对其进行调度
DAG 处理器会解析每个 DAG,之后调度器才能对这些 DAG 进行调度,并且 DAG 才能在 Airflow 界面或 DAG 界面中显示。
以下 Airflow 配置选项用于定义解析 DAG 的超时设置:
[core]dagbag_import_timeout
定义了 DAG 处理器解析单个 DAG 所需的时间。[core]dag_file_processor_timeout
定义了 DAG 处理器在解析所有 DAG 时可以花费的总时间。
如果 DAG 未显示在 Airflow 界面或 DAG 界面中,请执行以下操作:
检查 DAG 处理器日志,确认 DAG 处理器能否正确处理您的 DAG。如果出现问题,您可能会在 DAG 处理器或调度程序日志中看到以下日志条目:
[2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for /usr/local/airflow/dags/example_dag.py with PID 21903 started at 2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
检查调度程序日志,了解调度程序是否正常运行。如果出现问题,您可能会在调度程序日志中看到以下日志条目:
DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it Process timed out, PID: 68496
解决方案:
修正所有 DAG 解析错误。DAG 处理器会解析多个 DAG,在极少数情况下,一个 DAG 的解析错误可能会对其他 DAG 的解析产生负面影响。
如果 DAG 的解析时间超过
[core]dagbag_import_timeout
中定义的秒数,请延长此超时时间。如果解析所有 DAG 所需的时间超过
[core]dag_file_processor_timeout
中定义的秒数,请延长此超时时间。如果 DAG 需要很长时间才能解析,也可能表示它未以最佳方式实现。例如,如果它读取许多环境变量,或者执行对外部服务或 Airflow 数据库的调用。请尽可能避免在 DAG 的全局部分执行此类操作。
增加调度器的 CPU 和内存资源,以便其更快地运行。
增加 DAG 处理器进程的数量,以加快解析速度。您可以通过增加
[scheduler]parsing_process
的值来实现此目的。