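Figure 2. You can trigger a DAG from within a DAG by using the TriggerDagRunOperator
In this workflow, the blocks dag_1 and dag_2 each represent a series of tasks that are grouped into a separate DAG in the Cloud Composer environment.
Implementing this workflow requires two separate DAG files. The controlling DAG file looks like this: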
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="controller_dag_to_trigger_other_dags",
    default_args={"owner": "airflow"},
    start_date=days_ago(1),
    schedule_interval="@once",
) as dag:
    start = DummyOperator(task_id="start")
    trigger_1 = TriggerDagRunOperator(
        task_id="dag_1",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    trigger_2 = TriggerDagRunOperator(
        task_id="dag_2",
        trigger_dag_id="dag-to-trigger",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
    )
    some_other_task = DummyOperator(task_id="some-other-task")
    end = DummyOperator(task_id="end")

    start >> trigger_1 >> some_other_task >> trigger_2 >> end
The tasks op-1 and op-2 are grouped together in a block with the ID taskgroup_1. The following code shows an implementation of this workflow:
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup

with DAG(dag_id="taskgroup_example", start_date=days_ago(1)) as dag:
    start = DummyOperator(task_id="start")

    with TaskGroup("taskgroup_1", tooltip="task group #1") as section_1:
        task_1 = BashOperator(task_id="op-1", bash_command=":")
        task_2 = BashOperator(task_id="op-2", bash_command=":")

    with TaskGroup("taskgroup_2", tooltip="task group #2") as section_2:
        task_3 = BashOperator(task_id="op-3", bash_command=":")
        task_4 = BashOperator(task_id="op-4", bash_command=":")

    some_other_task = DummyOperator(task_id="some-other-task")
    end = DummyOperator(task_id="end")

    start >> section_1 >> some_other_task >> section_2 >> end
Last updated: 2025-06-16 (UTC).