DAG 내 태스크 그룹화

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

이 페이지에서는 다음 설계 패턴을 사용하여 Airflow 파이프라인에서 태스크를 그룹화하는 방법을 설명합니다.

  • DAG 그래프의 태스크 그룹화
  • 상위 DAG에서 하위 DAG 트리거
  • TaskGroup 연산자로 태스크 그룹화

DAG 그래프의 태스크 그룹화

파이프라인의 특정 단계에서 태스크를 그룹화하려면 DAG 파일의 태스크 사이의 관계를 사용하면 됩니다.

다음 예시를 참조하세요.

브랜치 태스크를 보여주는 Airflow 태스크 그래프
그림 1. Airflow DAG에서 태스크를 그룹화할 수 있습니다 (확대하려면 클릭).

이 워크플로에서 op-1 태스크와 op-2 태스크는 초기 태스크 start 후에 함께 실행됩니다. 이렇게 하려면 태스크를 start >> [task_1, task_2] 문과 함께 그룹화하면 됩니다.

다음 예는 이 DAG의 전체 구현을 보여줍니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

상위 DAG에서 하위 DAG 트리거

TriggerDagRunOperator 연산자를 사용하여 다른 DAG에서 하나의 DAG를 트리거할 수 있습니다.

다음 예시를 참조하세요.

DAG 그래프의 일부로 트리거된 하위 DAG를 보여주는 Airflow 태스크 그래프
그림 2. TriggerDagRunOperator를 사용해 DAG 내에서 DAG를 트리거할 수 있습니다 (확대하려면 클릭).

이 워크플로에서 dag_1 블록과 dag_2 블록은 Cloud Composer 환경의 개별 DAG로 그룹화된 일련의 태스크를 나타냅니다.

이 워크플로를 구현하려면 DAG 파일 두 개가 필요합니다. 제어 DAG 파일은 다음과 같습니다.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

제어 DAG에서 트리거하는 하위 DAG의 구현은 다음과 같습니다.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

DAG가 작동하려면 Cloud Composer 환경에 두 DAG 파일을 업로드해야 합니다.

TaskGroup 연산자로 태스크 그룹화

TaskGroup 연산자를 사용하여 DAG에서 태스크를 함께 그룹화할 수 있습니다. TaskGroup 블록 내에 정의된 태스크는 여전히 기본 DAG의 일부입니다.

다음 예시를 참조하세요.

두 개의 태스크 그룹을 보여주는 Airflow 태스크 그래프
그림 3. TaskGroup 연산자를 사용하여 UI에서 태스크를 시각적으로 그룹화할 수 있습니다 (확대하려면 클릭).

op-1 태스크와 op-2 태스크는 ID가 taskgroup_1인 블록으로 그룹화됩니다. 이 워크플로 구현은 다음 코드와 비슷합니다.

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end

다음 단계