Run a data analytics DAG in Google Cloud

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

This tutorial shows how to use Cloud Composer to create an Apache Airflow DAG. The DAG joins data from a BigQuery public dataset and a CSV file stored in a Cloud Storage bucket and then runs a Dataproc Serverless batch job to process the joined data.

The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.

The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"

Objectives

  • Create a Cloud Composer environment in the default configuration
  • Create an empty BigQuery dataset
  • Create a new Cloud Storage bucket
  • Create and run a DAG that includes the following tasks:
    • Load an external dataset from Cloud Storage to BigQuery
    • Join two datasets in BigQuery
    • Run a data analytics PySpark job

Before you begin

Enable APIs

Enable the following APIs:

Console

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.

Enable the APIs

gcloud

Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:

gcloud services enable dataproc.googleapis.com  composer.googleapis.com  bigquery.googleapis.com  storage.googleapis.com

Grant permissions

Grant the following roles and permissions to your user account:

Create and prepare your Cloud Composer environment

  1. Create a Cloud Composer environment with default parameters:

  2. Grant the following roles to the service account used in your Cloud Composer environment in order for the Airflow workers to successfully run DAG tasks:

    • BigQuery User (roles/bigquery.user)
    • BigQuery Data Owner (roles/bigquery.dataOwner)
    • Service Account User (roles/iam.serviceAccountUser)
    • Dataproc Editor (roles/dataproc.editor)
    • Dataproc Worker (roles/dataproc.worker)
  1. Create an empty BigQuery dataset with the following parameters:

    • Name: holiday_weather
    • Region: US
  2. Create a new Cloud Storage bucket in the US multiregion.

  3. Run the following command to enable private Google access on the default subnet in the region where you would like to run Dataproc Serverless to fulfill networking requirements. We recommend using the same region as your Cloud Composer environment.

    gcloud compute networks subnets update default \
        --region DATAPROC_SERVERLESS_REGION \
        --enable-private-ip-google-access
    

Data processing using Dataproc Serverless

Explore the example PySpark Job

The code shown below is an example PySpark job that converts temperature from tenths of a degree in Celsius to degrees Celsius. This job converts temperature data from the dataset into a different format.

import sys


from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


if __name__ == "__main__":
    BUCKET_NAME = sys.argv[1]
    READ_TABLE = sys.argv[2]
    WRITE_TABLE = sys.argv[3]

    # Create a SparkSession, viewable via the Spark UI
    spark = SparkSession.builder.appName("data_processing").getOrCreate()

    # Load data into dataframe if READ_TABLE exists
    try:
        df = spark.read.format("bigquery").load(READ_TABLE)
    except Py4JJavaError as e:
        raise Exception(f"Error reading {READ_TABLE}") from e

    # Convert temperature from tenths of a degree in celsius to degrees celsius
    df = df.withColumn("value", col("value") / 10)
    # Display sample of rows
    df.show(n=20)

    # Write results to GCS
    if "--dry-run" in sys.argv:
        print("Data will not be uploaded to BigQuery")
    else:
        # Set GCS temp location
        temp_path = BUCKET_NAME

        # Saving the data to BigQuery using the "indirect path" method and the spark-bigquery connector
        # Uses the "overwrite" SaveMode to ensure DAG doesn't fail when being re-run
        # See https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
        # for other save mode options
        df.write.format("bigquery").option("temporaryGcsBucket", temp_path).mode(
            "overwrite"
        ).save(WRITE_TABLE)
        print("Data written to BigQuery")

Upload supporting files to Cloud Storage

To upload the PySpark file and the dataset stored in holidays.csv:

  1. Save data_analytics_process.py to your local machine.

  2. Save holidays.csv to your local machine.

  3. In the Google Cloud console go to the Cloud Storage browser page:

    Go to Cloud Storage browser

  4. Click the name of the bucket you created earlier.

  5. In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py and holidays.csv in the dialog that appears, and click Open.

Data analytics DAG

Explore the example DAG

The DAG uses multiple operators to transform and unify the data:

  • The GCSToBigQueryOperator ingests the holidays.csv file from Cloud Storage to a new table in the BigQuery holidays_weather dataset you created earlier.

  • The DataprocCreateBatchOperator creates and runs a PySpark batch job using Dataproc Serverless.

  • The BigQueryInsertJobOperator joins the data from holidays.csv on the "Date" column with weather data from the BigQuery public dataset ghcn_d. The BigQueryInsertJobOperator tasks are dynamically generated using a for loop, and these tasks are in a TaskGroup for better readability in the Graph View of the Airflow UI.

import datetime

from airflow import models
from airflow.providers.google.cloud.operators import dataproc
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"

# BigQuery configs
BQ_DESTINATION_DATASET_NAME = "holiday_weather"
BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
BQ_NORMALIZED_TABLE_NAME = "holidays_weather_normalized"

# Dataproc configs
BUCKET_NAME = "{{var.value.gcs_bucket}}"
PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/data_analytics_process.py"

BATCH_ID = "data-processing-{{ ts_nodash | lower}}"  # Dataproc serverless only allows lowercase characters
BATCH_CONFIG = {
    "runtime_config": {"version": "1.1"},
    "pyspark_batch": {
        "main_python_file_uri": PROCESSING_PYTHON_FILE,
        "args": [
            BUCKET_NAME,
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}",
            f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}",
        ],
    },
    "environment_config": {
        "execution_config": {
            "service_account": "{{var.value.dataproc_service_account}}"
        }
    },
}

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()
)

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    "start_date": yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    "email_on_failure": False,
    "email_on_retry": False,
}

with models.DAG(
    "data_analytics_dag",
    # Continue to run DAG once per day
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    create_batch = dataproc.DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id=PROJECT_NAME,
        region="{{ var.value.gce_region }}",
        batch=BATCH_CONFIG,
        batch_id=BATCH_ID,
    )
    # This data is static and it is safe to use WRITE_TRUNCATE
    # to reduce chance of 409 duplicate errors
    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=BUCKET_NAME,
        source_objects=["holidays.csv"],
        destination_project_dataset_table=f"{BQ_DESTINATION_DATASET_NAME}.holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    with TaskGroup("join_bq_datasets") as bq_join_group:
        for year in range(1997, 2022):
            # BigQuery configs
            BQ_DATASET_NAME = f"bigquery-public-data.ghcn_d.ghcnd_{str(year)}"
            BQ_DESTINATION_TABLE_NAME = "holidays_weather_joined"
            # Specifically query a Chicago weather station
            WEATHER_HOLIDAYS_JOIN_QUERY = f"""
            SELECT Holidays.Date, Holiday, id, element, value
            FROM `{PROJECT_NAME}.holiday_weather.holidays` AS Holidays
            JOIN (SELECT id, date, element, value FROM {BQ_DATASET_NAME} AS Table WHERE Table.element="TMAX" AND Table.id="USW00094846") AS Weather
            ON Holidays.Date = Weather.Date;
            """

            # for demo purposes we are using WRITE_APPEND
            # but if you run the DAG repeatedly it will continue to append
            # Your use case may be different, see the Job docs
            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job
            # for alternative values for the writeDisposition
            # or consider using partitioned tables
            # https://cloud.google.com/bigquery/docs/partitioned-tables
            bq_join_holidays_weather_data = BigQueryInsertJobOperator(
                task_id=f"bq_join_holidays_weather_data_{str(year)}",
                configuration={
                    "query": {
                        "query": WEATHER_HOLIDAYS_JOIN_QUERY,
                        "useLegacySql": False,
                        "destinationTable": {
                            "projectId": PROJECT_NAME,
                            "datasetId": BQ_DESTINATION_DATASET_NAME,
                            "tableId": BQ_DESTINATION_TABLE_NAME,
                        },
                        "writeDisposition": "WRITE_APPEND",
                    }
                },
                location="US",
            )

        load_external_dataset >> bq_join_group >> create_batch

Use the Airflow UI to add variables

In Airflow, variables are an universal way to store and retrieve arbitrary settings or configurations as a simple key value store. This DAG uses Airflow variables to store common values. To add them to your environment:

  1. Access the Airflow UI from the Cloud Composer console.

  2. Go to Admin > Variables.

  3. Add the following variables:

    • gcp_project: your project ID.

    • gcs_bucket: the name of the bucket you created earlier (without the gs:// prefix).

    • gce_region: the region where you want your Dataproc job that meets Dataproc Serverless networking requirements. This is the region where you enabled private Google access earlier.

    • dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.

Upload the DAG to your environment's bucket

Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:

  1. On your local machine, save data_analytics_dag.py.

  2. In Google Cloud console, go to the Environments page.

    Go to Environments

  3. In the list of environments, in the DAG folder column click the DAGs link. The DAGs folder of your environment opens.

  4. Click Upload files.

  5. Select data_analytics_dag.py on your local machine and click Open.

Trigger the DAG

  1. In your Cloud Composer environment, click the DAGs tab.

  2. Click into DAG id data_analytics_dag.

  3. Click Trigger DAG.

  4. Wait about five to ten minutes until you see a green check indicating the tasks have been completed successfully.

Validate the DAG's success

  1. In Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the Explorer panel, click your project name.

  3. Click holidays_weather_joined.

  4. Click preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.

  5. Click holidays_weather_normalized.

  6. Click preview to view the resulting table. Note that the numbers in the value column are in degree Celsius.

Deep dive with Dataproc Serverless (optional)

You can try an advanced version of this DAG with more complex PySpark data processing flow. See Dataproc extension for the Data Analytics Example on GitHub.

Cleanup

Delete individual resources that you created for this tutorial:

What's next