图 2. 可以使用 TriggerDagRunOperator 从 DAG 中触发 DAG(点击可放大)
在此工作流中,dag_1 和 dag_2 块表示一系列任务,这些任务分组到 Cloud Composer 环境中的不同 DAG 中。
此工作流的实现需要两个独立的 DAG 文件。控制 DAG 文件如下所示:
fromairflowimportDAGfromairflow.operators.dummyimportDummyOperatorfromairflow.operators.trigger_dagrunimportTriggerDagRunOperatorfromairflow.utils.datesimportdays_agowithDAG(dag_id="controller_dag_to_trigger_other_dags",default_args={"owner":"airflow"},start_date=days_ago(1),schedule_interval="@once",)asdag:start=DummyOperator(task_id="start")trigger_1=TriggerDagRunOperator(task_id="dag_1",trigger_dag_id="dag-to-trigger",# Ensure this equals the dag_id of the DAG to triggerconf={"message":"Hello World"},)trigger_2=TriggerDagRunOperator(task_id="dag_2",trigger_dag_id="dag-to-trigger",# Ensure this equals the dag_id of the DAG to triggerconf={"message":"Hello World"},)some_other_task=DummyOperator(task_id="some-other-task")end=DummyOperator(task_id="end")start >> trigger_1 >> some_other_task >> trigger_2 >> end
任务 op-1 和 op-2 分组到一个 ID 为 taskgroup_1 的块中。此工作流的实现类似于以下代码:
fromairflow.models.dagimportDAGfromairflow.operators.bashimportBashOperatorfromairflow.operators.dummyimportDummyOperatorfromairflow.utils.datesimportdays_agofromairflow.utils.task_groupimportTaskGroupwithDAG(dag_id="taskgroup_example",start_date=days_ago(1))asdag:start=DummyOperator(task_id="start")withTaskGroup("taskgroup_1",tooltip="task group #1")assection_1:task_1=BashOperator(task_id="op-1",bash_command=":")task_2=BashOperator(task_id="op-2",bash_command=":")withTaskGroup("taskgroup_2",tooltip="task group #2")assection_2:task_3=BashOperator(task_id="op-3",bash_command=":")task_4=BashOperator(task_id="op-4",bash_command=":")some_other_task=DummyOperator(task_id="some-other-task")end=DummyOperator(task_id="end")start >> section_1 >> some_other_task >> section_2 >> end
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],["最后更新时间 (UTC):2025-08-04。"],[[["\u003cp\u003eThis document outlines three methods for grouping tasks within Airflow pipelines in Cloud Composer 3: grouping tasks in the DAG graph, triggering child DAGs from a parent DAG, and using the \u003ccode\u003eTaskGroup\u003c/code\u003e operator.\u003c/p\u003e\n"],["\u003cp\u003eGrouping tasks in the DAG graph involves defining relationships between tasks, demonstrated by the example \u003ccode\u003estart >> [task_1, task_2]\u003c/code\u003e, which executes \u003ccode\u003etask_1\u003c/code\u003e and \u003ccode\u003etask_2\u003c/code\u003e concurrently after \u003ccode\u003estart\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eParent DAGs can trigger child DAGs using the \u003ccode\u003eTriggerDagRunOperator\u003c/code\u003e, requiring separate DAG files for the parent and each child, with the \u003ccode\u003etrigger_dag_id\u003c/code\u003e in the parent matching the child's \u003ccode\u003edag_id\u003c/code\u003e.\u003c/p\u003e\n"],["\u003cp\u003eThe \u003ccode\u003eTaskGroup\u003c/code\u003e operator allows for visually grouping tasks together within a DAG, where tasks within the \u003ccode\u003eTaskGroup\u003c/code\u003e block are still part of the main DAG, as demonstrated with the \u003ccode\u003etaskgroup_1\u003c/code\u003e and \u003ccode\u003etaskgroup_2\u003c/code\u003e examples.\u003c/p\u003e\n"],["\u003cp\u003eThe use of SubDAGs for grouping tasks is strongly discouraged due to frequent performance and functional issues, and is instead recommended to use the other three methods that are listed.\u003c/p\u003e\n"]]],[],null,[]]