Testing Airflow DAGs


Before deploying DAGs to production, you can execute Airflow CLI sub-commands to parse DAG code in the same context under which the DAG is executed.

Testing during DAG creation

You can run a single task instance locally and view the log output. Viewing the output enables you to check for syntax and task errors. Testing locally does not check dependencies or communicate status to the database.
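
For a quick check for syntax and import errors before you upload a DAG, you can also load the DAG file with Airflow's DagBag in a local Python session or test suite. The following is a minimal sketch, assuming that Airflow is installed locally and that your DAG files are in a local dags/ folder; it only checks that the files import cleanly and does not run any tasks.

from airflow.models import DagBag

# Load DAG files from a local folder. `include_examples=False` skips
# Airflow's bundled example DAGs.
dag_bag = DagBag(dag_folder="dags/", include_examples=False)

# Files that fail to import (syntax errors, missing imports, and so on)
# are listed in `import_errors` together with the exception message.
for file_path, error in dag_bag.import_errors.items():
    print(f"{file_path}: {error}")

assert not dag_bag.import_errors, "One or more DAG files failed to import"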

We recommend that you put the DAGs in a data/test folder in your test environment.

Create a test directory

In your environment's bucket, create a test directory and copy your DAGs to it.

gcloud storage cp gs://BUCKET_NAME/dags \
  gs://BUCKET_NAME/data/test --recursive

Replace the following:

  • BUCKET_NAME: the name of the bucket associated with your Cloud Composer environment.

Example:

gcloud storage cp gs://us-central1-example-environment-a12bc345-bucket/dags \
  gs://us-central1-example-environment-a12bc345-bucket/data/test --recursive

For more information about uploading DAGs, see Add and update DAGs.

Check for syntax errors

To check for syntax errors in DAGs that you uploaded to the /data/test folder, enter the following gcloud command:

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  dags list -- --subdir /home/airflow/gcs/data/test

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  list_dags -- -sd /home/airflow/gcs/data/test

Replace the following:

  • ENVIRONMENT_NAME: the name of the environment.
  • ENVIRONMENT_LOCATION: the region where the environment is located.

Check for task errors

To check for task-specific errors in DAGs that you uploaded to the /data/test folder, run the following gcloud command:

Airflow 2

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  DAG_ID TASK_ID \
  DAG_EXECUTION_DATE

Airflow 1

gcloud composer environments run \
  ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Replace the following:

  • ENVIRONMENT_NAME: the name of the environment.
  • ENVIRONMENT_LOCATION: the region where the environment is located.
  • DAG_ID: the ID of the DAG.
  • TASK_ID: the ID of the task.
  • DAG_EXECUTION_DATE: the execution date of the DAG. This date is used for templating purposes. Regardless of the date that you specify here, the task runs immediately.

Example:

Airflow 2

gcloud composer environments run \
  example-environment \
  --location us-central1 \
  tasks test -- --subdir /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22

Airflow 1

gcloud composer environments run example-environment \
  --location us-central1 \
  test -- -sd /home/airflow/gcs/data/test \
  hello_world print_date 2021-04-22
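
The hello_world DAG and its print_date task are not defined on this page. A minimal sketch of what such a DAG might look like (Airflow 2-style imports; the names match the example commands but are otherwise illustrative):

import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hello_world",
    start_date=datetime.datetime(2021, 4, 1),
    schedule_interval=None,  # run only when triggered or tested
    catchup=False,
) as dag:
    # The task that the `tasks test` example runs in isolation.
    print_date = BashOperator(
        task_id="print_date",
        bash_command="date",
    )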

Updating and testing a deployed DAG

To test updates to your DAGs in your test environment:

  1. Copy the deployed DAG that you want to update to data/test.
  2. Update the DAG.
  3. Test the DAG.
    1. Check for syntax errors.
    2. Check for task-specific errors.
  4. Make sure the DAG runs successfully.
  5. Turn off the DAG in your test environment.
    1. Go to the Airflow UI > DAGs page.
    2. If the DAG you're modifying runs constantly, turn off the DAG.
    3. To expedite outstanding tasks, click the task and then click Mark Success.
  6. Deploy the DAG to your production environment.
    1. Turn off the DAG in your production environment.
    2. Upload the updated DAG to the dags/ folder in your production environment.

FAQs for testing DAGs

How do I isolate DAG runs in my production and test environments?

For example, Airflow has a global repository of source code in the dags/ folder that all DAG runs share. You want to update source code in production or test without interfering with running DAGs.

Airflow does not provide strong DAG isolation. We recommend that you maintain separate production and test Cloud Composer environments to prevent your test DAGs from interfering with your production DAGs.

How do I avoid DAG interference when I run integration tests from different GitHub branches?

Use unique task names to prevent interference. For example, you can prefix your task IDs with the branch name.
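
A minimal sketch of this approach follows. It assumes that the branch name is available to the DAG file as a BRANCH_NAME environment variable in the environment where the DAG is parsed; how that variable is set (for example, by your CI pipeline) is outside this sketch, and all names are illustrative.

import datetime
import os

from airflow import DAG
from airflow.operators.bash import BashOperator

# Assumed convention: BRANCH_NAME identifies the branch under test.
branch = os.environ.get("BRANCH_NAME", "main")

with DAG(
    dag_id=f"integration_test_{branch}",  # one DAG per branch
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Prefixing the task ID with the branch name keeps runs from
    # different branches from interfering with each other.
    run_checks = BashOperator(
        task_id=f"{branch}_run_checks",
        bash_command="echo running integration checks",
    )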

What is a best practice for integration testing with Airflow?

We recommend that you use a dedicated environment for integration testing with Airflow. One way to signal DAG run success is to write a file into a Cloud Storage folder and then check its content in your own integration test cases.
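
For example, a minimal sketch of this pattern: the last task of the DAG writes a marker object to Cloud Storage, and your integration test later reads that object back. The bucket, object, and DAG names are illustrative, and the google-cloud-storage client library is assumed to be available (it is preinstalled in Cloud Composer).

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import storage

RESULT_BUCKET = "my-integration-test-results"      # illustrative
RESULT_OBJECT = "dag_results/my_dag/SUCCESS"       # illustrative


def write_success_marker(ds, **kwargs):
    """Write a marker object so an external test can confirm the run."""
    blob = storage.Client().bucket(RESULT_BUCKET).blob(RESULT_OBJECT)
    blob.upload_from_string(f"succeeded on {ds}")


with DAG(
    dag_id="my_integration_dag",
    start_date=datetime.datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Final task: signals DAG run success by writing to Cloud Storage.
    # An integration test can then call blob.download_as_text() on the
    # same object and assert on its contents.
    report_success = PythonOperator(
        task_id="report_success",
        python_callable=write_success_marker,
    )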

How do I collaborate efficiently with other DAG contributors?

Each contributor can have a subdirectory in the data/ folder for development.

DAGs added to the data/ folder are not picked up automatically by the Airflow scheduler or web server.

DAG contributors can create manual DAG runs by using the gcloud composer environments run command and the test sub-command with the --subdir flag to specify the contributor's development directory.

For example:

Airflow 2

gcloud composer environments run test-environment-name \
  tasks test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

Airflow 1

gcloud composer environments run test-environment-name \
  test -- dag-id task-id execution-date \
  --subdir /home/airflow/gcs/data/alice_dev

How do I keep my development and production environments in sync?

To deploy from development to production:

  • Ensure consistent configuration, such as environment variables and PyPI packages.

  • Ensure consistent DAG arguments. To avoid hard-coding, we recommend that you use Airflow macros and variables.

    For example:

    Airflow 2

    gcloud composer environments run test-environment-name \
      variables set -- DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
    

    Airflow 1

    gcloud composer environments run test-environment-name \
      variables -- --set DATA_ENDPOINT_KEY DATA_ENDPOINT_VALUE
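
    In the DAG code, read the variable instead of hard-coding the value, for
    example through templating. A minimal sketch (Airflow 2; the DAG and task
    names are illustrative):

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="data_pipeline",
        start_date=datetime.datetime(2021, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        # The endpoint comes from the Airflow variable set above, so the
        # same DAG code runs unchanged in test and production environments.
        call_endpoint = BashOperator(
            task_id="call_endpoint",
            bash_command="echo calling {{ var.value.DATA_ENDPOINT_KEY }}",
        )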
    

What's next