DAG のタスクをグループ化する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、次の設計パターンを使用して Airflow パイプラインでタスクをグループ化する方法について説明します。

  • DAG グラフ内のタスクのグループ化。
  • 親 DAG からの子 DAG のトリガー。
  • TaskGroup 演算子を使用したタスクのグループ化(Airflow 2 のみ)。

DAG グラフ内のタスクのグループ化。

パイプラインの特定のフェーズでタスクをグループ化するには、DAG ファイルのタスク間の関係を使用します。

次に例を示します。

タスクは Airflow DAG でグループ化できます。

このワークフローでは、タスク op-1op-2 が最初のタスク start の後に一緒に実行されます。これを実現するには、ステートメント start >> [task_1, task_2] を使用してタスクをグループ化します。

次のコードは、前述の DAG の完全な実装を示しています。

Airflow 2

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

Airflow 1


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "all_tasks_in_one_dag"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "@once"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    start = DummyOperator(task_id="start")

    task_1 = BashOperator(task_id="op-1", bash_command=":", dag=dag)

    task_2 = BashOperator(task_id="op-2", bash_command=":", dag=dag)

    some_other_task = DummyOperator(task_id="some-other-task")

    task_3 = BashOperator(task_id="op-3", bash_command=":", dag=dag)

    task_4 = BashOperator(task_id="op-4", bash_command=":", dag=dag)

    end = DummyOperator(task_id="end")

    start >> [task_1, task_2] >> some_other_task >> [task_3, task_4] >> end

親 DAG からの子 DAG のトリガー

TriggerDagRunOperator を使用して、別の DAG から 1 つの DAG をトリガーできます。Airflow のバージョンに応じて、TriggerDagRunOperator 演算子は別のモジュールにあります。

次に例を示します。

TriggerDagRunOperator を使用して DAG 内で DAG をトリガーできます

このワークフローでは、ブロック dag_1dag_2 は、Cloud Composer 環境の別の DAG でグループ化された一連のタスクを表します。

このワークフローを実装するには、2 つの個別の DAG ファイルが必要です。制御 DAG ファイルは次のようになります。

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

Airflow 1

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")

    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end

子 DAG の実装は、次に示すように、制御 DAG によってトリガーされます。

Airflow 2

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

Airflow 1

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago


DAG_NAME = "dag-to-trigger"

args = {"owner": "airflow", "start_date": days_ago(1), "schedule_interval": "None"}

with DAG(dag_id=DAG_NAME, default_args=args) as dag:
    dag_task = DummyOperator(task_id="dag-task")

ワークフローが機能するには、Cloud Composer 環境に両方の DAG ファイルをアップロードする必要があります。

TaskGroup 演算子を使用したタスクのグループ化

Airflow 2 では、TaskGroup 演算子を使用して DAG 内でタスクをグループ化できます。TaskGroup ブロック内で定義されたタスクは、引き続きメイン DAG の一部です。

次に例を示します。

Airflow 2 の TaskGroup 演算子を使用すると、UI でタスクを視覚的にグループ化できます。

タスク op-1op-2 は、ID taskgroup_1 のブロックにグループ化されています。このワークフローの実装は次のコードのようになります。

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")

    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end