DAG 프로세서 문제 해결

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 DAG 파일 처리와 관련된 문제만 다룹니다. 태스크 예약 문제는 Airflow 스케줄러 문제 해결을 참고하세요.

문제 해결 워크플로

DAG 프로세서 로그 검사

DAG가 복잡한 경우 DAG 프로세서가 일부 DAG를 파싱하지 않을 수 있습니다. 이로 인해 다음과 같은 증상이 나타나는 많은 문제가 발생할 수 있습니다.

증상

  • DAG를 파싱할 때 DAG 프로세서에 문제가 발생하면 여기에 나열된 문제가 복합적으로 발생할 수 있습니다. DAG가 동적으로 생성되는 경우 이러한 문제는 정적 DAG에 비해 더 큰 영향을 미치기도 합니다.

  • DAG가 Airflow UI 및 DAG UI에 표시되지 않습니다.

  • DAG의 실행이 예약되지 않습니다.

  • DAG 프로세서 로그에 다음과 같은 오류가 있습니다.

    dag-processor-manager [2023-04-21 21:10:44,510] {manager.py:1144} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py with PID 68311 started
    at 2023-04-21T21:09:53.772793+00:00 has timed out, killing it.
    

    또는

    dag-processor-manager [2023-04-26 06:18:34,860] {manager.py:948} ERROR -
    Processor for /home/airflow/gcs/dags/dag-example.py exited with return
    code 1.
    
  • DAG 프로세서에 다시 시작되는 문제가 발생합니다.

  • 실행이 예약된 Airflow 태스크는 취소되고 파싱에 실패한 DAG의 DAG 실행은 failed로 표시될 수 있습니다. 예를 들면 다음과 같습니다.

    airflow-scheduler Failed to get task '<TaskInstance: dag-example.task1--1
    manual__2023-04-17T10:02:03.137439+00:00 [removed]>' for dag
    'dag-example'. Marking it as removed.
    

해결책:

  • DAG 파싱과 관련된 매개변수를 늘립니다.

  • DAG 프로세서에 문제를 일으키는 DAG를 수정하거나 삭제합니다.

DAG 파싱 시간 검사

DAG 파싱 시간에 문제가 발생하는지 확인하려면 다음 단계를 따르세요.

콘솔

Google Cloud 콘솔에서 Monitoring 페이지 및 Logs 탭을 사용하여 DAG 파싱 시간을 검사할 수 있습니다.

Cloud Composer Monitoring 페이지에서 DAG 파싱 시간을 검사합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    [환경으로 이동][console-list-env]

  2. 환경 목록에서 환경 이름을 클릭합니다. Monitoring 페이지가 열립니다.

  3. Monitoring 탭에서 DAG 실행 섹션에 있는 모든 DAG 파일의 총 파싱 시간 차트를 검토하고 발생 가능한 문제가 있는지 식별합니다.

    Composer Monitoring 탭의 DAG 실행 섹션에 해당 환경의 DAG에 대한 상태 측정항목이 표시됩니다.

Cloud Composer 로그 탭에서 DAG 파싱 시간을 검사합니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    [환경으로 이동][console-list-env]

  2. 환경 목록에서 환경 이름을 클릭합니다. Monitoring 페이지가 열립니다.

  3. 로그 탭으로 이동하고 모든 로그 탐색 트리에서 DAG 프로세서 관리자 섹션을 선택합니다.

  4. dag-processor-manager 로그를 검토하고 발생 가능한 문제를 식별합니다.

    DAG 파싱 시간이 표시된 DAG 프로세서 로그

gcloud

dags report 명령어를 사용하여 모든 DAG의 파싱 시간을 확인합니다.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags report

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME: 환경 이름
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

명령어 출력은 다음과 비슷하게 표시됩니다.

Executing within the following Kubernetes cluster namespace: composer-2-0-31-airflow-2-3-3
file                  | duration       | dag_num | task_num | dags
======================+================+=========+==========+===================
/manydagsbig.py       | 0:00:00.038334 | 2       | 10       | serial-0,serial-0
/airflow_monitoring.py| 0:00:00.001620 | 1       | 1        | airflow_monitoring

표에 나열된 각 DAG의 duration 값을 찾습니다. 큰 값은 DAG 중 하나가 최적의 방식으로 구현되지 않았음을 나타낼 수 있습니다. 출력 테이블에서 파싱 시간이 긴 DAG를 식별할 수 있습니다.

DAG 파싱 시간에 문제 해결

다음 섹션에서는 DAG 파싱 시간에 발생하는 몇 가지 일반적인 문제들에 대한 증상과 가능한 해결 방법을 설명합니다.

스레드 수 제한

DAG 프로세서 관리자가 제한된 스레드 수만 사용하도록 허용하면 DAG 파싱 시간에 영향을 줄 수 있습니다.

이 문제를 해결하려면 다음 Airflow 구성 옵션을 재정의하세요.

  • parsing_processes 매개변수를 재정의합니다.

    섹션 참고
    scheduler parsing_processes NUMBER_OF_CPUs_IN_SCHEDULER - 1 NUMBER_OF_CPUs_IN_SCHEDULER를 스케줄러의 CPU 수
    로 바꿉니다.

DAG 프로세서가 불필요한 파일을 무시하도록 설정

DAG 폴더에서 불필요한 파일을 건너뛰어 DAG 프로세서 성능을 향상시킬 수 있습니다. DAG 프로세서는 .airflowignore 파일에 지정된 파일 및 폴더를 무시합니다.

DAG 프로세서가 불필요한 파일을 무시하도록 설정하려면 다음 안내를 따르세요.

  1. .airflowignore 파일을 만듭니다.
  2. 이 파일에서 무시할 파일 및 폴더를 나열합니다.
  3. 환경 버킷의 /dags 폴더에 이 파일을 업로드합니다.

.airflowignore 파일 형식에 대한 자세한 내용은 Airflow 문서를 참조하세요.

Airflow에서 일시중지된 DAG 처리

Airflow 사용자는 실행 방지를 위해 DAG를 일시중지합니다. 이렇게 하면 Airflow 작업자의 처리 주기가 줄어듭니다.

Airflow에서 일시중지된 DAG를 계속 파싱합니다. DAG 프로세서의 성능을 실질적으로 개선하려면 .airflowignore를 사용하거나 DAG 폴더에서 일시중지된 DAG를 삭제합니다.

일반적인 문제

다음 섹션에서는 몇 가지 일반적인 파싱 문제의 증상과 가능한 해결 방법을 설명합니다.

DAG 로드 가져오기 시간 초과

증상

  • Airflow 웹 인터페이스의 DAG 목록 페이지 위에서 경고 알림 상자에 Broken DAG: [/path/to/dagfile] Timeout이 표시됩니다.
  • Cloud Monitoring: airflow-scheduler 로그에 다음과 비슷한 항목이 포함됩니다.

    • ERROR - Process timed out
    • ERROR - Failed to import: /path/to/dagfile
    • AirflowTaskTimeout: Timeout

해결:

dag_file_processor_timeout Airflow 구성 옵션을 재정의하고 DAG 파싱 제한 시간을 늘립니다.

섹션
core dag_file_processor_timeout 새 제한 시간 값

DAG가 Airflow UI 또는 DAG UI에 표시되지 않으며 스케줄러가 예약하지 않음

DAG 프로세서는 DAG가 스케줄러를 통해 예약될 수 있고 Airflow UI 또는 DAG UI에 표시되기 전에 각 DAG를 파싱합니다.

다음 Airflow 구성 옵션은 DAG 파싱을 위한 제한 시간을 정의합니다.

DAG가 Airflow UI 또는 DAG UI에 표시되지 않는 경우:

  • DAG 프로세서 로그를 확인하여 DAG 프로세서가 DAG를 올바르게 처리할 수 있는지 확인합니다. 문제가 발생하면 DAG 프로세서 또는 스케줄러 로그에 다음 로그 항목이 표시될 수 있습니다.

    [2020-12-03 03:06:45,672] {dag_processing.py:1334} ERROR - Processor for
    /usr/local/airflow/dags/example_dag.py with PID 21903 started at
    2020-12-03T03:05:55.442709+00:00 has timed out, killing it.
    
  • 스케줄러 로그를 확인하여 스케줄러가 올바르게 작동하는지 확인합니다. 문제가 발생하면 스케줄러 로그에 다음 로그 항목이 표시될 수 있습니다.

    DagFileProcessorManager (PID=732) last sent a heartbeat 240.09 seconds ago! Restarting it
    Process timed out, PID: 68496
    

솔루션:

  • 모든 DAG 파싱 오류를 수정합니다. DAG 프로세서는 여러 DAG를 파싱하며 드물기는 하지만 한 DAG의 오류를 파싱하면 다른 DAG의 파싱에 부정적인 영향을 줄 수 있습니다.

  • DAG 파싱이 [core]dagbag_import_timeout에 정의된 시간(초)보다 오래 걸리면 이 제한 시간을 늘립니다.

  • 모든 DAG의 파싱이 [core]dag_file_processor_timeout에 정의된 시간(초)보다 오래 걸리면 이 제한 시간을 늘립니다.

  • DAG를 파싱하는 데 시간이 오래 걸린다면 최적의 방식으로 구현되지 않은 것일 수도 있습니다. 예를 들어 환경 변수를 여러 개 읽거나 외부 서비스 또는 Airflow 데이터베이스를 호출하는 경우입니다. 가능하면 DAG의 전역 섹션에서 이러한 작업을 수행하지 마세요.

  • 스케줄러의 CPU 및 메모리 리소스를 늘려서 더 빠르게 작업할 수 있습니다.

  • 스케줄러의 수를 조정합니다.

  • 파싱을 더 빠르게 수행할 수 있도록 DAG 프로세서 프로세스의 수를 늘립니다. 이렇게 하려면 [scheduler]parsing_process 값을 늘리면 됩니다.

  • DAG 파싱 실행 빈도를 줄입니다.

  • Airflow 데이터베이스의 부하를 줄입니다.

다음 단계