Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
This page demonstrates how to implement a DAG that triggers DAGs in other Cloud Composer environments and projects by using Airflow operators for Cloud Composer.
If you want to trigger DAGs in your environment instead, see Schedule and trigger DAGs.
Configure IAM permissions
If the target environment is located in another project, then the service account of your environment needs roles that allows interacting with environments in that project.
Project | Resource | Principal | Role |
---|---|---|---|
Project where the target environment is located | Project | Environment's service account of the source environment |
Composer Worker role (composer.worker ) |
Project where the target environment is located | Project | Environment's service account of the source environment |
A custom role with the
composer.environment.executeAirflowCommand permission |
Trigger a DAG in another environment
The example DAG described in this section does the following:
- Trigger a DAG in another Cloud Composer environment.
- Checks if a DAG run is completed.
After the DAG run in another environment is completed, the example DAG is marked as successful.
Run Airflow CLI commands with CloudComposerRunAirflowCLICommandOperator
You can use the
CloudComposerRunAirflowCLICommandOperator
operator to run Airflow CLI commands in another Cloud Composer
environment. The example DAG executes the dags trigger
command, which
triggers a DAG.
This operator can run in the deferrable mode, you
can enable it by setting the deferrable
parameter to True
.
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
command="dags trigger -- target_dag",
# You can run this operator in the deferrable mode:
# deferrable=True
)
Check if a DAG run is completed
You can use the CloudComposerDAGRunSensor sensor to checks if a DAG run is completed in another Cloud Composer environment.
This sensor can run in the deferrable mode, you can
enable it by setting the deferrable
parameter to True
.
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id="target-project",
environment_id="target-composer-environment",
region="us-central1",
composer_dag_id="target_dag",
allowed_states=["success"],
# You can run this sensor in the deferrable mode:
# deferrable=True
)
Full example code
The following is the full code example of a DAG that combines the two previously described tasks.
from datetime import datetime, timedelta
from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.cloud_composer import (
CloudComposerRunAirflowCLICommandOperator,
)
from airflow.providers.google.cloud.sensors.cloud_composer import CloudComposerDAGRunSensor
DAG_ID = "trigger_dag_in_another_composer_environment"
TARGET_PROJECT_ID = "example-target-project"
TARGET_REGION = "example-target-region"
TARGET_ENV_ID = "example-target-composer-environment"
TARGET_DAG = "example_target_dag_id"
COMMAND = f"dags trigger -- {TARGET_DAG}"
with DAG(
DAG_ID,
schedule="@once",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["example", "composer"],
) as dag:
run_airflow_cli_cmd = CloudComposerRunAirflowCLICommandOperator(
task_id="run_airflow_cli_cmd",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
command=COMMAND,
# You can run this operator in the deferrable mode:
# deferrable=True
)
dag_run_sensor = CloudComposerDAGRunSensor(
task_id="dag_run_sensor",
project_id=TARGET_PROJECT_ID,
environment_id=TARGET_ENV_ID,
region=TARGET_REGION,
composer_dag_id=TARGET_DAG,
allowed_states=["success"],
execution_range=timedelta(minutes=5),
# You can run this sensor in the deferrable mode:
# deferrable=True
)
run_airflow_cli_cmd >> dag_run_sensor