Schedule and trigger Airflow DAGs

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

This page explains how scheduling and DAG triggering works in Airflow, how to define a schedule for a DAG, and how to trigger a DAG manually or pause it.

About Airflow DAGs in Cloud Composer

Airflow DAGs in Cloud Composer are executed in one or more Cloud Composer environments in your project. You upload source files of your Airflow DAGs to a Cloud Storage bucket associated with an environment. The environment's instance of Airflow then parses these files and schedules DAG runs, as defined by each DAG's schedule. During a DAG run, Airflow schedules and executes individual tasks that make up a DAG in a sequence defined by the DAG.

To learn more about Airflow's core concepts such as Airflow DAGs, DAG runs, tasks, or operators, see the Core Concepts page in the Airflow documentation.

About DAG scheduling in Airflow

Airflow provides the following concepts for its scheduling mechanism:

Logical date

Represents a date for which a particular DAG run is executed.

This is not the actual date when Airflow runs a DAG, but a period of time that a particular DAG run must process. For example, for a DAG that is scheduled to run every day at 12:00, the logical date would also be 12:00 on a specific day. Since the DAG runs once per day, the period of time it must process is the past 24 hours. At the same time, the logic defined in the DAG itself might not use the logical date or the time interval at all. For example, a DAG might run the same script once per day without using the value of the logical date. A short sketch after these definitions shows how a task can read the logical date of its DAG run.

In Airflow versions earlier than 2.2, this date is called execution date.

Run date

Represents a date when a particular DAG run is executed.

For example, for a DAG that is scheduled to run every day at 12:00, the actual execution of the DAG might happen at 12:05, some time after the logical date passes.

Schedule interval

Represents when and how often a DAG must be executed, in terms of logical dates.

For example, a daily schedule means that a DAG is executed once per day, and the logical dates for its DAG runs have 24 hour intervals.

Start date

Specifies when you want Airflow to start scheduling your DAG.

Tasks in your DAG can have individual start dates, or you can specify a single start date for all tasks. Based on the minimum start date for tasks in your DAG and on the schedule interval, Airflow schedules DAG runs.

Catchup, backfill, and retries

Mechanisms for executing DAG runs for past dates.

Catchup executes DAG runs that didn't run yet, for example, if the DAG was paused for a long period of time and then unpaused. You can use backfill to execute DAG runs for a certain range of dates. Retries specify how many times Airflow retries a task that failed.
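The following minimal sketch illustrates the logical date concept; the DAG ID and the echoed message are placeholder values, not taken from a product sample. The task uses the {{ ds }} Airflow template, which expands to the logical date of the DAG run rather than to the time when the task actually executes.

import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "logical_date_example_dag",  # placeholder DAG ID for this sketch
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # {{ ds }} expands to the logical date (YYYY-MM-DD) of the DAG run.
    # For the run that covers January 1, 2024, it prints 2024-01-01, even
    # though the task itself executes after that schedule interval ends.
    print_logical_date = BashOperator(
        task_id="print_logical_date",
        bash_command='echo "Logical date of this DAG run: {{ ds }}"',
    )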

Scheduling works in the following way:

  1. After the start date passes, Airflow waits for the next occurrence of the schedule interval.

  2. Airflow schedules the first DAG run to happen at the end of this schedule interval.

    For example, if a DAG is scheduled to run every hour and the start date is at 12:00 today, then the first DAG run happens at 13:00 today.

The Schedule an Airflow DAG section in this document describes how to set up scheduling for your DAGs using these concepts. For more information about DAG runs and scheduling, see DAG Runs in the Airflow documentation.

About ways to trigger a DAG

Airflow provides the following ways to trigger a DAG:

  • Trigger on a schedule. Airflow triggers the DAG automatically based on the schedule specified for it in the DAG file.

  • Trigger manually. You can trigger a DAG manually from Google Cloud console, Airflow UI, or by running an Airflow CLI command from Google Cloud CLI.

  • Trigger in response to events. The standard way to trigger a DAG in response to events is to use a sensor.

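As a minimal sketch of the sensor approach, the following DAG waits for an object to appear in a Cloud Storage bucket before running a downstream task. It uses the GCSObjectExistenceSensor from the Google provider package; the DAG ID, bucket name, and object path are placeholders for illustration only.

import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor

with DAG(
    "gcs_sensor_example_dag",  # placeholder DAG ID for this sketch
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    # Poll Cloud Storage until the object for this logical date exists.
    wait_for_input_file = GCSObjectExistenceSensor(
        task_id="wait_for_input_file",
        bucket="example-input-bucket",  # placeholder bucket name
        object="exports/{{ ds }}/data.csv",  # placeholder object path
        poke_interval=300,  # check every 5 minutes
        timeout=6 * 60 * 60,  # fail the task if the object doesn't appear within 6 hours
        mode="reschedule",  # free the worker slot between checks
    )

    # Placeholder downstream task that runs only after the object is found.
    process_input_file = BashOperator(
        task_id="process_input_file",
        bash_command='echo "Processing exports/{{ ds }}/data.csv"',
    )

    wait_for_input_file >> process_input_file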

Before you begin

  • Make sure that your account has a role that can manage objects in the environment buckets and view and trigger DAGs. For more information, see Access control.

Schedule an Airflow DAG

You define a schedule for a DAG in the DAG file. Edit the DAG's definition in the following way:

  1. Locate and edit the DAG file on your computer. If you don't have the DAG file, you can download its copy from the environment's bucket. For a new DAG, you can define all parameters when you create the DAG file.

  2. In the schedule_interval parameter, define the schedule. You can use a Cron expression, such as 0 0 * * *, or a preset, such as @daily. For more information, see Cron and Time Intervals in the Airflow documentation.

    Airflow determines logical dates for DAG runs based on the schedule that you set.

  3. In the start_date parameter, define the start date.

    Airflow determines the logical date of the first DAG run using this parameter.

  4. (Optional) In the catchup parameter, define if Airflow must execute all previous runs of this DAG from the start date to the current date that weren't executed yet.

    DAG runs executed during the catchup will have their logical date in the past and their run date will reflect the time when the DAG run was actually executed.

  5. (Optional) In the retries parameter, define how many times Airflow must retry tasks that failed (each DAG consists of one or more individual tasks). By default, tasks in Cloud Composer are retried two times.

  6. Upload the new version of the DAG to the environment's bucket.

  7. Wait until Airflow successfully parses the DAG. For example, you can check the list of DAGs in your environment in the Google Cloud console or in the Airflow UI.

The following example DAG definition runs twice a day, at 00:00 and 12:00. Its start date is set to January 1, 2024, but Airflow does not run it for past dates after you upload or unpause it, because catchup is disabled.

The DAG contains one task named insert_query_job, which inserts a row into a table with the BigQueryInsertJobOperator operator. This operator is one of the Google Cloud BigQuery operators, which you can use to manage datasets and tables, run queries, and validate data. If a particular execution of this task fails, Airflow retries it four more times with the default retry interval. The logical date for these retries remains the same.

The SQL query for this row uses Airflow templates to write the DAG's logical date and name to the row.

import datetime

from airflow.models.dag import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
  "bq_example_scheduling_dag",
  start_date=datetime.datetime(2024, 1, 1),
  schedule_interval='0 */12 * * *',
  catchup=False
  ) as dag:

  insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    retries=4,
    configuration={
        "query": {
            # schema: date (string), description (string)
            # example row: "20240101T120000", "DAG run: <DAG: bq_example_scheduling_dag>"
            "query": "INSERT example_dataset.example_table VALUES ('{{ ts_nodash }}', 'DAG run: {{ dag }}' )",
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location="us-central1"
  )

  insert_query_job

To test this DAG, you can trigger it manually and then view the task execution logs.

More scheduling parameter examples

The following scheduling parameter examples illustrate how scheduling works with different combinations of parameters:

  • If start_date is datetime(2024, 4, 4, 16, 25) and schedule_interval is 30 16 * * *, then the first DAG run happens at 16:30 on April 5, 2024.

  • If start_date is datetime(2024, 4, 4, 16, 35) and schedule_interval is 30 16 * * *, then the first DAG run happens at 16:30 on April 6, 2024. Because the start date is after the scheduled time of 16:30 on April 4, 2024, the first full schedule interval starts at 16:30 on April 5, 2024 and ends at 16:30 on April 6, 2024. The first DAG run is scheduled at the end of this interval, that is, at 16:30 on April 6, 2024.

  • If start_date is datetime(2024, 4, 4) and schedule_interval is @daily, then the first DAG run is scheduled for 00:00 on April 5, 2024.

  • If start_date is datetime(2024, 4, 4, 16, 30) and schedule_interval is 0 * * * *, then the first DAG run is scheduled for 18:00 on April 4, 2024. After the start date passes, schedule intervals begin at minute 0 of every hour. The nearest interval starts at 17:00 and ends at 18:00 on April 4, 2024, so the first DAG run happens at the end of this interval, at 18:00.
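As an illustration of the last combination, the following minimal sketch defines a DAG with that start date and schedule. The DAG ID is a placeholder, and EmptyOperator assumes Airflow 2.3 or later; the comments restate when Airflow creates the first DAG run under these parameters.

import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

# The first schedule interval after the start date runs from 17:00 to 18:00
# on April 4, 2024. With catchup enabled, the first DAG run would execute at
# 18:00 on that day with a logical date of 17:00; with catchup disabled, as
# here, Airflow skips past intervals and runs only the most recent one.
with DAG(
    "hourly_schedule_example_dag",  # placeholder DAG ID for this sketch
    start_date=datetime.datetime(2024, 4, 4, 16, 30),
    schedule_interval="0 * * * *",
    catchup=False,
) as dag:
    do_nothing = EmptyOperator(task_id="do_nothing")  # placeholder task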

Trigger a DAG manually

When you manually trigger an Airflow DAG, Airflow runs the DAG once, independently from the schedule specified in the DAG file.

Console

DAG UI is supported in Cloud Composer 1.17.8 and later versions.

To trigger a DAG from Google Cloud console:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Select an environment to view its details.

  3. On the Environment details page, go to the DAGs tab.

  4. Click the name of a DAG.

  5. On the DAG details page, click Trigger DAG. A new DAG run is created.

Airflow UI

To trigger a DAG from the Airflow UI:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the Airflow webserver column, follow the Airflow link for your environment.

  3. Sign in with the Google Account that has the appropriate permissions.

  4. In the Airflow web interface, on the DAGs page, in the Links column for your DAG, click the Trigger Dag button.

  5. (Optional) Specify the DAG run configuration. A sketch that shows how a DAG can read this configuration follows the gcloud instructions in this section.

  6. Click Trigger.

gcloud

In Airflow 1.10.12 or earlier, run the trigger_dag Airflow CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    trigger_dag -- DAG_ID

In Airflow 1.10.14 or later, including Airflow 2, run the dags trigger Airflow CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags trigger -- DAG_ID

Replace the following:

  • ENVIRONMENT_NAME: the name of your environment.
  • LOCATION: the region where the environment is located.
  • DAG_ID: the name of the DAG.

For more information about running Airflow CLI commands in Cloud Composer environments, see Running Airflow CLI commands.

For more information about the available Airflow CLI commands, see the gcloud composer environments run command reference.
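When you trigger a DAG manually from the Airflow UI, you can optionally pass a DAG run configuration (step 5 of the Airflow UI procedure). The following minimal sketch shows how a templated task might read a value from that configuration. The DAG ID and the my_param parameter name are placeholders for illustration only, not values from this page.

import datetime

from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "manual_trigger_example_dag",  # placeholder DAG ID for this sketch
    start_date=datetime.datetime(2024, 1, 1),
    schedule_interval=None,  # no schedule; the DAG runs only when triggered
) as dag:

    # dag_run.conf holds the JSON object passed when the DAG run was triggered,
    # for example {"my_param": "some value"}. The fallback value covers runs
    # that were triggered without any configuration.
    print_conf_value = BashOperator(
        task_id="print_conf_value",
        bash_command="echo \"my_param is: {{ (dag_run.conf or {}).get('my_param', 'not set') }}\"",
    )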

View DAG run logs and details

In the Google Cloud console, you can view DAG run details and task logs for your DAGs on the DAGs tab of the Environment details page.

In addition, Cloud Composer provides access to the Airflow UI, which is Airflow's own web interface.

Pause a DAG

Console

DAG UI is supported in Cloud Composer 1.17.8 and later versions.

To pause a DAG from Google Cloud console:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. Select an environment to view its details.

  3. On the Environment details page, go to the DAGs tab.

  4. Click the name of a DAG.

  5. On the DAG details page, click Pause DAG.

Airflow UI

To pause a DAG from the Airflow UI:

  1. In the Google Cloud console, go to the Environments page.

    Go to Environments

  2. In the Airflow webserver column, follow the Airflow link for your environment.

  3. Sign in with the Google Account that has the appropriate permissions.

  4. In the Airflow web interface, on the DAGs page, click the toggle next to the DAG's name.

gcloud

In Airflow 1.10.12 or earlier, run the pause Airflow CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    pause -- DAG_ID

In Airflow 1.10.14 or later, including Airflow 2, run the dags pause Airflow CLI command:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags pause -- DAG_ID

Replace the following:

  • ENVIRONMENT_NAME: the name of your environment.
  • LOCATION: the region where the environment is located.
  • DAG_ID: the name of the DAG.

For more information about running Airflow CLI commands in Cloud Composer environments, see Running Airflow CLI commands.

For more information about the available Airflow CLI commands, see the gcloud composer environments run command reference.

What's next