This tutorial is a modification of Run a Data Analytics DAG in Google Cloud that shows how to connect your Cloud Composer environment to Microsoft Azure to use data stored there. It shows how to use Cloud Composer to create an Apache Airflow DAG. The DAG joins data from a BigQuery public dataset and a CSV file stored in Azure Blob Storage, and then runs a Dataproc Serverless batch job to process the joined data.
The BigQuery public dataset in this tutorial is ghcn_d, an integrated database of climate summaries across the globe. The CSV file contains information about the dates and names of US holidays from 1997 to 2021.
The question we want to answer using the DAG is: "How warm was it in Chicago on Thanksgiving for the past 25 years?"
Objectives
- Create a Cloud Composer environment in the default configuration
- Create a blob in Azure
- Create an empty BigQuery dataset
- Create a new Cloud Storage bucket
- Create and run a DAG that includes the following tasks:
  - Load an external dataset from Azure Blob Storage to Cloud Storage
  - Load an external dataset from Cloud Storage to BigQuery
  - Join two datasets in BigQuery
  - Run a data analytics PySpark job
Before you begin
Enable APIs
Enable the following APIs:
Console
Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, and Cloud Storage APIs:
gcloud services enable dataproc.googleapis.com composer.googleapis.com bigquery.googleapis.com storage.googleapis.com
Grant permissions
Grant the following roles and permissions to your user account:
Grant roles for managing Cloud Composer environments and environment buckets.
Grant the BigQuery Data Owner (roles/bigquery.dataOwner) role to create a BigQuery dataset.
Grant the Storage Admin (roles/storage.admin) role to create a Cloud Storage bucket.
Create and prepare your Cloud Composer environment
Create a Cloud Composer environment with default parameters:
- Choose a US-based region.
- Choose the latest Cloud Composer version.
Grant the following roles to the service account used in your Cloud Composer environment so that Airflow workers can run DAG tasks:
- BigQuery User (roles/bigquery.user)
- BigQuery Data Owner (roles/bigquery.dataOwner)
- Service Account User (roles/iam.serviceAccountUser)
- Dataproc Editor (roles/dataproc.editor)
- Dataproc Worker (roles/dataproc.worker)
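As a rough illustration of the role grants listed above, the following Python sketch builds one `gcloud projects add-iam-policy-binding` command per role. The function name `build_grant_commands` and the placeholder project ID and service account email are hypothetical; the sketch only prints the commands and does not run them.

```python
# Roles listed above for the environment's service account.
ROLES = [
    "roles/bigquery.user",
    "roles/bigquery.dataOwner",
    "roles/iam.serviceAccountUser",
    "roles/dataproc.editor",
    "roles/dataproc.worker",
]


def build_grant_commands(project_id, service_account_email):
    """Return one gcloud argument list per role binding (hypothetical helper)."""
    member = f"serviceAccount:{service_account_email}"
    return [
        [
            "gcloud", "projects", "add-iam-policy-binding", project_id,
            f"--member={member}",
            f"--role={role}",
        ]
        for role in ROLES
    ]


# PROJECT_ID and the service account email are placeholders, not real values.
for command in build_grant_commands(
    "PROJECT_ID", "SA_NAME@PROJECT_ID.iam.gserviceaccount.com"
):
    print(" ".join(command))
```

Each printed line can be reviewed and then run in a shell where gcloud is authenticated for the project.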
Create and modify related resources in Google Cloud
Install the apache-airflow-providers-microsoft-azure PyPI package in your Cloud Composer environment.
Create an empty BigQuery dataset with the following parameters:
- Name: holiday_weather
- Region: US
Create a new Cloud Storage bucket in the US multiregion.
Run the following command to enable Private Google Access on the default subnet in the region where you want to run Dataproc Serverless. This fulfills the Dataproc Serverless networking requirements. We recommend using the same region as your Cloud Composer environment.
gcloud compute networks subnets update default \
    --region DATAPROC_SERVERLESS_REGION \
    --enable-private-ip-google-access
Create related resources in Azure
Create a storage account with the default settings.
Get the access key and connection string for your storage account.
Create a container with default options in your newly created storage account.
Grant the Storage Blob Delegator role for the container created in the previous step.
Upload holidays.csv in the Azure portal to create a block blob with default options.
In the Azure portal, create a SAS token for the block blob you created in the previous step, with the following settings:
- Signing method: User delegation key
- Permissions: Read
- Allowed IP address: None
- Allowed protocols: HTTPS only
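A SAS token is a URL query string, so the settings above can be sanity-checked before the token is pasted into Airflow. The sketch below is a minimal check that assumes the standard SAS field names (`sp` for permissions, `spr` for protocols, `sip` for the allowed IP range, `skoid` for a user delegation key). The function name `check_sas_token` and the sample token are made up for illustration.

```python
from urllib.parse import parse_qs


def check_sas_token(token):
    """Return a list of problems found in a SAS token's query parameters."""
    params = {key: values[0] for key, values in parse_qs(token.lstrip("?")).items()}
    problems = []
    if params.get("sp") != "r":
        problems.append("permissions (sp) should be read-only: 'r'")
    if params.get("spr") != "https":
        problems.append("allowed protocols (spr) should be 'https'")
    if "sip" in params:
        problems.append("an allowed IP range (sip) is set; the tutorial uses none")
    if "skoid" not in params:
        problems.append("no skoid field; token may not use a user delegation key")
    return problems


# Fake token for illustration only; it grants access to nothing.
SAMPLE_TOKEN = (
    "sp=r&st=2024-01-01T00:00:00Z&se=2024-01-02T00:00:00Z"
    "&skoid=00000000-0000-0000-0000-000000000000&spr=https"
    "&sv=2022-11-02&sr=b&sig=FAKE"
)
print(check_sas_token(SAMPLE_TOKEN))
```

An empty list means the token matches the four settings; the check does not verify the signature or the expiry time.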
Connect to Azure from Cloud Composer
Add your Microsoft Azure connection using the Airflow UI:
Go to Admin > Connections.
Create a new connection with the following configuration:
- Connection Id: azure_blob_connection
- Connection Type: Azure Blob Storage
- Blob Storage Login: your storage account name
- Blob Storage Key: the access key for your storage account
- Blob Storage Account Connection String: your storage account connection string
- SAS Token: the SAS token generated from your blob
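The login and key fields above hold the same values that appear inside the storage account connection string. As a small sketch, assuming the usual semicolon-separated `Key=Value` layout of Azure Storage connection strings, the helper below (a hypothetical name) splits the string so the values can be compared. The sample string is fake.

```python
def parse_connection_string(conn_str):
    """Split an Azure Storage connection string into a dict of its fields."""
    fields = {}
    for part in conn_str.strip().split(";"):
        if not part:
            continue
        # Split on the first "=" only: base64 account keys often end in "=".
        key, _, value = part.partition("=")
        fields[key] = value
    return fields


# Fake connection string for illustration only.
SAMPLE = (
    "DefaultEndpointsProtocol=https;AccountName=examplestorage;"
    "AccountKey=ZmFrZS1rZXk=;EndpointSuffix=core.windows.net"
)
fields = parse_connection_string(SAMPLE)
print("Blob Storage Login:", fields["AccountName"])
print("Blob Storage Key:  ", fields["AccountKey"])
```

Splitting on only the first `=` matters here, because a naive `split("=")` would drop the padding character at the end of the key.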
Data processing using Dataproc Serverless
Explore the example PySpark Job
The example PySpark job, data_analytics_process.py, converts the temperature values in the dataset from tenths of a degree Celsius to degrees Celsius.
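The conversion itself is a division by ten. The following plain-Python sketch shows only that arithmetic on a few made-up rows; it is not the tutorial's PySpark file, and the function name `normalize_rows` is hypothetical.

```python
def normalize_rows(rows):
    """Return copies of the rows with "value" converted to degrees Celsius."""
    # In PySpark the same step is commonly a column expression such as
    # df.withColumn("value", df["value"] / 10); that form is an assumption
    # here, not a quote from the tutorial's job.
    return [{**row, "value": row["value"] / 10} for row in rows]


# Made-up sample values in tenths of a degree Celsius.
sample = [{"element": "TMAX", "value": 61}, {"element": "TMAX", "value": -39}]
for row in normalize_rows(sample):
    print(row)
```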
Upload the PySpark file to Cloud Storage
To upload the PySpark file to Cloud Storage:
Save data_analytics_process.py to your local machine.
In the Google Cloud console, go to the Cloud Storage browser page.
Click the name of the bucket you created earlier.
In the Objects tab for the bucket, click the Upload files button, select data_analytics_process.py in the dialog that appears, and click Open.
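Once uploaded to the top level of the bucket, the file is addressable as a `gs://` URI, which is how a Dataproc Serverless batch refers to it. The sketch below builds a batch definition as a plain dictionary, using the `pyspark_batch` and `environment_config` fields of the Dataproc Batch resource. The function name `build_batch_config` is hypothetical, and the tutorial's actual DAG may pass additional settings.

```python
def build_batch_config(bucket_name, service_account):
    """Build a minimal Dataproc Serverless batch definition (a sketch)."""
    if bucket_name.startswith("gs://"):
        raise ValueError("pass the bucket name without the gs:// prefix")
    return {
        "pyspark_batch": {
            # Assumes the file was uploaded to the top level of the bucket.
            "main_python_file_uri": f"gs://{bucket_name}/data_analytics_process.py",
        },
        "environment_config": {
            "execution_config": {"service_account": service_account},
        },
    }


# Placeholder values, not real resources.
config = build_batch_config(
    "BUCKET_NAME", "SA_NAME@PROJECT_ID.iam.gserviceaccount.com"
)
print(config["pyspark_batch"]["main_python_file_uri"])
```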
Data analytics DAG
Explore the example DAG
The DAG uses multiple operators to transform and unify the data:
- The AzureBlobStorageToGCSOperator transfers the holidays.csv file from your Azure block blob to your Cloud Storage bucket.
- The GCSToBigQueryOperator ingests the holidays.csv file from Cloud Storage to a new table in the BigQuery holiday_weather dataset you created earlier.
- The DataprocCreateBatchOperator creates and runs a PySpark batch job using Dataproc Serverless.
- The BigQueryInsertJobOperator joins the data from holidays.csv on the "Date" column with weather data from the BigQuery public dataset ghcn_d. The BigQueryInsertJobOperator tasks are dynamically generated using a for loop, and these tasks are in a TaskGroup for better readability in the Graph View of the Airflow UI.
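The loop-generated join tasks can be pictured as one BigQuery query job per year. The sketch below builds the job configurations as plain dictionaries, in the shape a BigQueryInsertJobOperator accepts for its configuration, without importing Airflow. Several names are assumptions for illustration: the per-year table pattern `ghcnd_YYYY`, the station ID used for Chicago, the `holidays` table name, the `TMAX` filter, and the task IDs.

```python
# Assumption: a Chicago station ID in ghcn_d; verify before relying on it.
STATION_ID = "USW00094846"


def build_join_jobs(project_id, first_year=1997, last_year=2021):
    """Return {task_id: BigQuery job configuration}, one entry per year."""
    jobs = {}
    for year in range(first_year, last_year + 1):
        # Assumption: ghcn_d stores one table per year, named ghcnd_YYYY.
        weather_table = f"bigquery-public-data.ghcn_d.ghcnd_{year}"
        query = (
            "SELECT Holidays.Date, Holiday, id, element, value "
            f"FROM `{project_id}.holiday_weather.holidays` AS Holidays "
            "JOIN ("
            "SELECT id, date, element, value "
            f"FROM `{weather_table}` "
            f"WHERE element = 'TMAX' AND id = '{STATION_ID}'"
            ") AS Weather "
            "ON Holidays.Date = Weather.date"
        )
        jobs[f"join_weather_{year}"] = {
            "query": {
                "query": query,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": project_id,
                    "datasetId": "holiday_weather",
                    "tableId": "holidays_weather_joined",
                },
                # Every year's result is appended to the same joined table.
                "writeDisposition": "WRITE_APPEND",
            }
        }
    return jobs


jobs = build_join_jobs("PROJECT_ID")
print(len(jobs), "join jobs, first:", sorted(jobs)[0])
```

In a DAG, each dictionary would typically be passed to one operator instance created inside the loop, with all instances declared inside a single TaskGroup.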
Use the Airflow UI to add variables
In Airflow, variables are a universal way to store and retrieve arbitrary settings or configurations as a simple key-value store. This DAG uses Airflow variables to store common values. To add them to your environment:
Go to Admin > Variables.
Add the following variables:
- gcp_project: your project ID.
- gcs_bucket: the name of the bucket you created earlier (without the gs:// prefix).
- gce_region: the region where you want to run your Dataproc job. The region must meet the Dataproc Serverless networking requirements; this is the region where you enabled Private Google Access earlier.
- dataproc_service_account: the service account for your Cloud Composer environment. You can find this service account on the environment configuration tab for your Cloud Composer environment.
- azure_blob_name: the name of the blob you created earlier.
- azure_container_name: the name of the container you created earlier.
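As an alternative to typing each value, Airflow can import variables from a JSON file of key-value pairs (for example with `airflow variables import`). The sketch below checks that all six variables are present and then renders such a file. The function name `render_variables_json` and every sample value are placeholders.

```python
import json

REQUIRED_VARIABLES = (
    "gcp_project",
    "gcs_bucket",
    "gce_region",
    "dataproc_service_account",
    "azure_blob_name",
    "azure_container_name",
)


def render_variables_json(variables):
    """Validate the tutorial's six variables and return them as JSON text."""
    missing = [name for name in REQUIRED_VARIABLES if not variables.get(name)]
    if missing:
        raise ValueError(f"missing variables: {', '.join(missing)}")
    if variables["gcs_bucket"].startswith("gs://"):
        raise ValueError("gcs_bucket must not include the gs:// prefix")
    return json.dumps(variables, indent=2)


# Placeholder values; replace them with your own before importing.
example = {
    "gcp_project": "PROJECT_ID",
    "gcs_bucket": "BUCKET_NAME",
    "gce_region": "DATAPROC_SERVERLESS_REGION",
    "dataproc_service_account": "SA_NAME@PROJECT_ID.iam.gserviceaccount.com",
    "azure_blob_name": "BLOB_NAME",
    "azure_container_name": "CONTAINER_NAME",
}
print(render_variables_json(example))
```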
Upload the DAG to your environment's bucket
Cloud Composer schedules DAGs that are located in the /dags folder in your environment's bucket. To upload the DAG using the Google Cloud console:
On your local machine, save azureblobstoretogcsoperator_tutorial.py.
In the Google Cloud console, go to the Environments page.
In the list of environments, in the DAG folder column, click the DAGs link. The DAGs folder of your environment opens.
Click Upload files.
Select azureblobstoretogcsoperator_tutorial.py on your local machine and click Open.
Trigger the DAG
In your Cloud Composer environment, click the DAGs tab.
Click the DAG ID azure_blob_to_gcs_dag.
Click Trigger DAG.
Wait about five to ten minutes until you see a green check indicating the tasks have been completed successfully.
Validate the DAG's success
In the Google Cloud console, go to the BigQuery page.
In the Explorer panel, click your project name.
Click holidays_weather_joined.
Click Preview to view the resulting table. Note that the numbers in the value column are in tenths of a degree Celsius.
Click holidays_weather_normalized.
Click Preview to view the resulting table. Note that the numbers in the value column are in degrees Celsius.
Cleanup
Delete individual resources that you created for this tutorial:
Delete the Cloud Storage bucket that you created for this tutorial.
Delete the Cloud Composer environment, including manually deleting the environment's bucket.