Airflow DAG 예약 및 트리거

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 Airflow에서 예약 및 DAG 트리거가 작동하는 방식, DAG의 일정을 정의하는 방법, DAG를 수동으로 트리거하거나 일시중지하는 방법을 설명합니다.

Cloud Composer의 Airflow DAG 정보

Cloud Composer의 Airflow DAG는 프로젝트의 Cloud Composer 환경 하나 이상에서 실행됩니다. 환경과 연결된 Cloud Storage 버킷에 Airflow DAG의 소스 파일을 업로드합니다. 그러면 Airflow의 환경 인스턴스에서 이러한 파일을 파싱하고 각 DAG의 일정에 따라 DAG 실행을 예약합니다. DAG 실행 중에 Airflow는 DAG에서 정의된 순서대로 DAG를 구성하는 개별 태스크를 예약하고 실행합니다.

Airflow DAG, DAG 실행, 태스크 또는 연산자와 같은 Airflow의 핵심 개념에 대한 자세한 내용은 Airflow 문서의 핵심 개념 페이지를 참조하세요.

Airflow의 DAG 예약 정보

Airflow는 예약 메커니즘에 다음과 같은 개념을 제공합니다.

논리적 날짜

특정 DAG 실행이 실행된 날짜를 나타냅니다.

이는 Airflow가 DAG를 실행하는 실제 날짜가 아니라 특정 DAG 실행이 처리해야 하는 기간입니다. 예를 들어 매일 12:00에 실행되도록 예약된 DAG의 경우 논리적 날짜도 특정 날짜의 12:00입니다. 하루에 두 번 실행되므로 처리해야 하는 기간은 지난 12시간입니다. 동시에 DAG 자체에 정의된 로직에서는 논리적 날짜나 시간 간격을 전혀 사용하지 않을 수 있습니다. 예를 들어 DAG는 논리적 날짜 값을 사용하지 않고 하루에 한 번 같은 스크립트를 실행할 수 있습니다.

Airflow 버전 2.2 미만에서는 이 날짜를 실행 날짜라고 합니다.

실행일

특정 DAG 실행이 실행된 날짜를 나타냅니다.

예를 들어 매일 12:00에 실행되도록 예약된 DAG의 경우 DAG는 실제로 논리적 날짜가 다소 지난 다음인 12:05에 실행될 수 있습니다.

일정 간격

논리적 날짜 측면에서 DAG를 실행해야 하는 시점과 빈도를 나타냅니다.

예를 들어 일일 일정은 DAG가 하루에 한 번 실행되고 DAG 실행의 논리적 날짜 간격은 24시간임을 의미합니다.

시작일

Airflow에서 DAG 예약을 시작할 시간을 지정합니다.

DAG의 태스크에 개별 시작 날짜를 지정하거나 모든 태스크에 대해 단일 시작 날짜를 지정할 수 있습니다. DAG의 태스크에 대한 최소 시작 날짜 및 예약 간격을 기준으로 Airflow가 DAG 실행을 예약합니다.

캐치업, 백필, 재시도

이전 날짜의 DAG 실행을 실행하기 위한 메커니즘

캐치업은 아직 실행되지 않은 DAG 실행을 실행합니다. 예를 들어 DAG가 장시간 일시중지되었다가 재개된 경우입니다. 백필을 사용하여 특정 기간 동안 DAG 실행을 실행할 수 있습니다. 재시도는 DAG에서 태스크를 실행할 때 Airflow에서 시도해야 하는 횟수를 지정합니다.

예약은 다음 방식으로 작동합니다.

  1. 시작 날짜가 지나면 Airflow는 예약 간격의 다음 발생까지 기다립니다.

  2. Airflow는 이 예약 간격이 끝날 때 첫 번째 DAG 실행이 실행되도록 예약합니다.

    예를 들어 DAG가 매시간 실행되도록 예약되었고 시작 날짜가 오늘 12:00이면 첫 번째 DAG 실행은 오늘 13:00에 실행됩니다.

이 문서의 Airflow DAG 예약 섹션에서는 이러한 개념을 사용하여 DAG 예약을 설정하는 방법을 설명합니다. DAG 실행과 예약에 대한 자세한 내용은 Airflow 문서의 DAG 실행을 참조하세요.

DAG 트리거 방법에 대한 정보

Airflow는 다음과 같은 DAG 트리거 방법을 제공합니다.

  • 일정에 따라 트리거 Airflow는 DAG 파일에 지정된 일정에 따라 DAG를 자동으로 트리거합니다.

  • 수동으로 트리거. Google Cloud 콘솔, Airflow UI에서 또는 Google Cloud CLI에서 Airflow CLI 명령어를 실행하여 DAG를 수동으로 트리거할 수 있습니다.

  • 이벤트에 대한 응답으로 트리거. 이벤트에 대한 응답으로 DAG를 트리거하는 표준화된 방법은 센서 사용입니다.

DAG를 트리거하는 다른 방법으로 다음과 같은 방법이 있습니다.

시작하기 전에

  • 계정에 환경 버킷의 객체를 관리하고 DAG를 보고 트리거할 수 있는 역할이 있는지 확인합니다. 자세한 내용은 액세스 제어를 참조하세요.

Airflow DAG 예약

DAG 파일에서 DAG 일정을 정의합니다. 다음과 같은 방법으로 DAG 정의를 수정합니다.

  1. 컴퓨터에서 DAG 파일을 찾아 수정합니다. DAG 파일이 없으면 환경 버킷에서 사본을 다운로드할 수 있습니다. 새 DAG의 경우 DAG 파일을 만들 때 모든 파라미터를 정의할 수 있습니다.

  2. schedule_interval 파라미터에서 일정을 정의합니다. 크론 표현식(예: 0 0 * * *) 또는 사전 설정(예: @daily)을 사용할 수 있습니다. 자세한 내용은 Airflow 문서의 크론 및 시간 간격을 참조하세요.

    Airflow는 설정된 일정을 기준으로 DAG 실행의 논리적 날짜를 결정합니다.

  3. start_date 파라미터에서 시작 날짜를 정의합니다.

    Airflow는 이 파라미터를 사용하여 첫 번째 DAG 실행의 논리적 날짜를 결정합니다.

  4. (선택사항) catchup 파라미터에서 Airflow가 시작 날짜부터 현재 날짜까지 아직 실행되지 않은 이 DAG의 이전 실행을 모두 실행해야 하는지 정의합니다.

    캐치업 중에 실행된 DAG 실행의 논리적 날짜는 과거이고 실행 날짜는 DAG 실행이 실제로 실행된 시간을 반영합니다.

  5. (선택사항) retries 파라미터에서 Airflow가 실패한 태스크를 다시 시도해야 하는 횟수를 정의합니다(각 DAG는 개별 태스크 하나 이상으로 구성됨). 기본적으로 Cloud Composer의 태스크는 두 번 재시도됩니다.

  6. 환경 버킷에 새 버전의 DAG를 업로드합니다.

  7. Airflow에서 DAG를 성공적으로 파싱할 때까지 기다립니다. 예를 들어 Google Cloud 콘솔이나 Airflow UI에서 환경의 DAG 목록을 확인할 수 있습니다.

다음 DAG 정의 예시는 하루에 두 번 00:00 및 12:00에 실행됩니다. 시작 날짜는 2024년 1월 1일로 설정되어 있지만 캐치업이 중지되어 있으므로 개발자가 업로드하거나 중지한 후에는 Airflow가 이전 날짜에 대해 실행하지 않습니다.

DAG에는 BigQueryInsertJobOperator 연산자를 사용하여 테이블에 행 하나를 삽입하는 insert_query_job이라는 태스크 하나가 포함되어 있습니다. 이 연산자는 개발자가 데이터 세트와 테이블을 관리하고 쿼리를 실행하고 데이터를 검증하는 데 사용할 수 있는 Google Cloud BigQuery 연산자 중 하나입니다. 이 태스크의 특정 실행이 실패하면 Airflow는 기본 재시도 간격으로 4번 더 재시도합니다. 이러한 재시도의 논리적 날짜는 동일하게 유지됩니다.

이 행의 SQL 쿼리는 Airflow 템플릿을 사용하여 DAG의 논리적 날짜와 이름을 행에 작성합니다.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

이 DAG를 테스트하려면 수동으로 트리거한 후 태스크 실행 로그를 보면 됩니다.

추가 예약 매개변수 예시

다음 예약 파라미터 예시에서는 여러 파라미터 조합에서 예약이 어떻게 작동하는지를 보여줍니다.

  • start_datedatetime(2024, 4, 4, 16, 25)이고 schedule_interval30 16 * * *이면 첫 번째 DAG 실행이 2024년 4월 5일 16:30에 수행됩니다.

  • start_datedatetime(2024, 4, 4, 16, 35)이고 schedule_interval30 16 * * *이면 첫 번째 DAG 실행이 2024년 4월 6일 16:30에 수행됩니다. 시작 날짜가 2024년 4월 4일의 예약 간격 이후이기 때문에 DAG 실행이 2024년 4월 5일에 수행되지 않습니다. 대신 일정 간격이 2024년 4월 5일 16:35에 종료되므로, 다음 DAG 실행이 그 다음 날 16:30으로 예약됩니다.

  • start_datedatetime(2024, 4, 4)이고 schedule_interval@daily이면 첫 번째 DAG 실행이 2024년 4월 5일 00:00에 예약됩니다.

  • start_datedatetime(2024, 4, 4, 16, 30)이고 schedule_interval0 * * * *이면 첫 번째 DAG 실행이 2024년 4월 4일 18:00로 예약됩니다. 지정된 날짜 및 시간이 지나면 Airflow가 매시간 0분에 DAG 실행이 수행되도록 예약합니다. 이렇게 수행될 때 가장 가까운 시점은 17:00입니다. 이때 Airflow는 예약 간격의 끝, 즉 18:00에 DAG 실행이 수행되도록 예약합니다.

수동으로 DAG 트리거

Airflow DAG를 수동으로 트리거하면 Airflow는 DAG 파일에 지정된 일정과 관계없이 DAG를 한 번 실행합니다.

콘솔

DAG UI는 Cloud Composer 1.17.8 이상 버전에서 지원됩니다.

Google Cloud 콘솔에서 DAG를 트리거하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경을 선택해 세부정보를 표시합니다.

  3. 환경 세부정보 페이지에서 DAG 탭으로 이동합니다.

  4. DAG 이름을 클릭합니다.

  5. DAG 세부정보 페이지에서 DAG 트리거를 클릭합니다. 새 DAG 실행이 생성됩니다.

Airflow UI

Airflow UI에서 DAG를 트리거하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. Airflow 웹 서버 열에서 해당 환경의 Airflow 링크를 따릅니다.

  3. 적절한 권한이 있는 Google 계정으로 로그인합니다.

  4. Airflow 웹 인터페이스의 DAG 페이지에 있는 해당 DAG의 링크 열에서 DAG 트리거 버튼을 클릭합니다.

  5. (선택사항) DAG 실행 구성을 지정합니다.

  6. 트리거를 클릭합니다.

gcloud

Airflow 1.10.12 이하에서는 trigger_dag Airflow CLI 명령어를 실행하세요.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

Airflow 2를 포함하여 Airflow 1.10.14 이상에서는 dags trigger Airflow CLI 명령어를 실행합니다.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • DAG_ID: DAG 이름입니다.

Cloud Composer 환경에서 Airflow CLI 실행에 대한 상세 설명은 Airflow CLI 명령어 실행을 참조하세요.

사용 가능한 Airflow CLI 명령어에 대한 상세 설명은 gcloud composer environments run 명령어 참조를 확인하세요.

DAG 실행 로그 및 세부정보 보기

Google Cloud 콘솔에서 다음 작업을 수행할 수 있습니다.

또한 Cloud Composer는 Airflow 자체 웹 인터페이스인 Airflow UI에 대한 액세스 권한을 제공합니다.

DAG 일시중지

콘솔

DAG UI는 Cloud Composer 1.17.8 이상 버전에서 지원됩니다.

Google Cloud 콘솔에서 DAG를 일시중지하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경을 선택해 세부정보를 표시합니다.

  3. 환경 세부정보 페이지에서 DAG 탭으로 이동합니다.

  4. DAG 이름을 클릭합니다.

  5. DAG 세부정보 페이지에서 DAG 일시중지를 클릭합니다.

Airflow UI

Airflow UI에서 DAG를 일시중지하려면 다음 안내를 따르세요.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

환경으로 이동

  1. Airflow 웹 서버 열에서 해당 환경의 Airflow 링크를 따릅니다.

  2. 적절한 권한이 있는 Google 계정으로 로그인합니다.

  3. Airflow 웹 인터페이스의 DAG 페이지에서 DAG 이름 옆에 있는 전환 버튼을 클릭합니다.

gcloud

Airflow 1.10.12 이하에서는 pause Airflow CLI 명령어를 실행하세요.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

Airflow 2를 포함하여 Airflow 1.10.14 이상에서는 dags pause Airflow CLI 명령어를 실행합니다.

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

다음을 바꿉니다.

  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • LOCATION: 환경이 위치한 리전
  • DAG_ID: DAG 이름입니다.

Cloud Composer 환경에서 Airflow CLI 실행에 대한 상세 설명은 Airflow CLI 명령어 실행을 참조하세요.

사용 가능한 Airflow CLI 명령어에 대한 상세 설명은 gcloud composer environments run 명령어 참조를 확인하세요.

다음 단계