Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
This page guides you through creating an event-based push architecture by triggering Cloud Composer DAGs in response to Pub/Sub topic changes. Examples in this tutorial demonstrate handling the full cycle of Pub/Sub management, including subscription management, as a part of the DAG process. It is suitable for some of the common use cases when you need to trigger DAGs but don't want to set up extra access permissions.
For example, messages sent through Pub/Sub can be used as a solution if you don't want to provide direct access to a Cloud Composer environment for security reasons. You can configure a Cloud Run function that creates Pub/Sub messages and publishes them on a Pub/Sub topic. You can then create a DAG that pulls Pub/Sub messages and then handles these messages.
In this specific example, you create a Cloud Run function and deploy two DAGs. The first DAG pulls Pub/Sub messages and triggers the second DAG according to the Pub/Sub message content.
This tutorial assumes you are familiar with Python and the Google Cloud console.
Objectives
Costs
This tutorial uses the following billable components of Google Cloud:
- Cloud Composer (also see additional costs)
- Pub/Sub
- Cloud Run functions
After you finish this tutorial, you can avoid continued billing by deleting the resources you created. See Clean up for more detail.
Before you begin
For this tutorial, you need a Google Cloud project. Configure the project in the following way:
In the Google Cloud console, select or create a project:
Make sure that billing is enabled for your project. Learn how to check if billing is enabled on a project.
Make sure that your Google Cloud project user has the following roles to create the necessary resources:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub Editor (
roles/pubsub.editor
) - Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run functions Admin (
roles/cloudfunctions.admin
) - Logs Viewer (
roles/logging.viewer
)
- Service Account User (
Make sure that the service account that runs your Cloud Run function has sufficient permissions in your project to access Pub/Sub. By default, Cloud Run functions use the App Engine default service account. This service account has the Editor role, which has sufficient permissions for this tutorial.
Enable APIs for your project
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Enable the Cloud Composer API in your project by adding the following resource definitions to your Terraform script:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Replace <PROJECT_ID>
with Project ID
of your project. For example, example-project
.
Create your Cloud Composer environment
Create a Cloud Composer 2 environment.
As a part of this procedure,
you grant the Cloud Composer v2 API Service Agent Extension
(roles/composer.ServiceAgentV2Ext
) role to the Composer Service Agent
account. Cloud Composer uses this account to perform operations
in your Google Cloud project.
Create a Pub/Sub topic
This example triggers a DAG in response to a message pushed to a Pub/Sub topic. Create a Pub/Sub topic to use in this example:
Console
In the Google Cloud console, go to the Pub/Sub Topics page.
Click Create Topic.
In the Topic ID field, enter
dag-topic-trigger
as an ID for your topic.Leave other options at their defaults.
Click Create Topic.
gcloud
To create a topic, run the gcloud pubsub topics create command in Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Add the following resource definitions to your Terraform script:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Replace <PROJECT_ID>
with Project ID
of your project. For example, example-project
.
Upload your DAGs
Upload DAGs to your environment:
- Save the following DAG file on your local computer.
- Replace
<PROJECT_ID>
with Project ID of your project. For example,example-project
. - Upload the edited DAG file to your environment.
The sample code contains two DAGs: trigger_dag
and target_dag
.
The trigger_dag
DAG subscribes to a Pub/Sub topic, pulls
Pub/Sub messages, and triggers another DAG specified in the DAG ID
of the Pub/Sub message data. In this example, trigger_dag
triggers
the target_dag
DAG, which outputs messages to the task logs.
The trigger_dag
DAG contains the following tasks:
subscribe_task
: Subscribe to a Pub/Sub topic.pull_messages_operator
: Read a Pub/Sub message data withPubSubPullOperator
.trigger_target_dag
: Trigger another DAG (in this example,target_dag
) according to the data in the messages pulled from the Pub/Sub topic.
The target_dag
DAG contains just one task: output_to_logs
. This task
prints messages to the task log with one second delay.
Deploy a Cloud Run function that publishes messages on a Pub/Sub topic
In this section, you deploy a Cloud Run function that publishes messages on a Pub/Sub topic.
Create a Cloud Run function and specify its configuration
Console
In the Google Cloud console, go to the Cloud Run functions page.
Click Create function.
In the Environment field, select 1st gen.
In the Function name field, enter the name for your function:
pubsub-publisher
.In the Trigger type field, select HTTP.
In the Authentication section, select Allow unauthenticated invocations. This option grants unauthenticated users the ability to invoke an HTTP function.
Click Save.
Click Next to move to the Code step.
Terraform
Consider using the Google Cloud console for this step, because there is no straightforward way to manage the function's source code from Terraform.
This example demonstrates how you can upload a Cloud Run function from a local zip archive file by creating a Cloud Storage bucket, storing the file in this bucket, then using the file from the bucket as a source for the Cloud Run function. If you use this approach, Terraform doesn't automatically update the source code of your function, even if you create a new archive file. To re-upload the function code, you can change the file name of the archive.
- Donwload the
pubsub_publisher.py
and therequirements.txt
files. - In the
pubsub_publisher.py
file, replace<PROJECT_ID>
with the Project ID of your project. For example,example-project
. - Create a zip archive named
pubsub_function.zip
with thepbusub_publisner.py
and therequirements.txt
file. - Save the zip archive to a directory where your Terraform script is stored.
- Add the following resource definitions to your Terraform script and
replace
<PROJECT_ID>
with the Project ID of your project.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Specify Cloud Run function code parameters
Console
In the Code step, In the Runtime field, select the language runtime your function uses. In this example, select Python 3.10.
In the Entry point field, enter
pubsub_publisher
. This is the code that is executed when your Cloud Run function runs. The value of this flag must be a function name or a fully-qualified class name that exists in your source code.
Terraform
Skip this step. Cloud Run function parameters are already defined in
the google_cloudfunctions_function
resource.
Upload your Cloud Run function code
Console
In the Source code field, select the appropriate option for how you supply the function source code. In this tutorial, add your function code using the Cloud Run functions Inline Editor. As an alternative, you can upload a ZIP file, or use Cloud Source Repositories.
- Put the following code example into the main.py file.
- Replace
<PROJECT_ID>
with Project ID of your project. For example,example-project
.
Terraform
Skip this step. Cloud Run function parameters are already defined in
the google_cloudfunctions_function
resource.
Specify your Cloud Run function dependencies
Console
Specify the function dependencies in the requirements.txt metadata file:
When you deploy your function, Cloud Run functions downloads and installs
dependencies declared in the requirements.txt file, one line per package.
This file must be in the same directory as the main.py file that contains
your function code. For more details, see
Requirements Files
in pip
documentation.
Terraform
Skip this step. Cloud Run function dependencies are defined in
the requirements.txt
file in the pubsub_function.zip
archive.
Deploy your Cloud Run function
Console
Click Deploy. When deployment finishes successfully, the function appears with a green check mark on the Cloud Run functions page in the Google Cloud console.
Make sure that the service account that runs your Cloud Run function has enough permissions in your project to access Pub/Sub.
Terraform
Initialize Terraform:
terraform init
Review the configuration and verify that the resources that Terraform is going to create or update match your expectations:
terraform plan
To check whether your configuration is valid, run the following command:
terraform validate
Apply the Terraform configuration by running the following command and entering yes at the prompt:
terraform apply
Wait until Terraform displays the "Apply complete!" message.
In the Google Cloud console, navigate to your resources in the UI to make sure that Terraform has created or updated them.
Test your Cloud Run function
To check that your function publishes a message on a Pub/Sub topic and that the example DAGs work as intended:
Check that the DAGs are active:
In the Google Cloud console, go to the Environments page.
In the list of environments, click the name of your environment. The Environment details page opens.
Go to the DAGs tab.
Check values in the State column for DAGs named
trigger_dag
andtarget_dag
. Both DAGs must be in theActive
state.
Push a test Pub/Sub message. You can do it in Cloud Shell:
In the Google Cloud console, go to the Functions page.
Click the name of your function,
pubsub-publisher
.Go to the Testing tab.
In Configure triggering event section, enter the following JSON key-value:
{"message": "target_dag"}
. Don't modify the key-value pair, because this message triggers the test DAG later.In the Test Command section, click Test in Cloud Shell.
In Cloud Shell Terminal, wait until a command appears automatically. Run this command by pressing
Enter
.If the Authorize Cloud Shell message appears, click Authorize.
Check that the message content corresponds to the Pub/Sub message. In this example, the output message must start with
Message b'target_dag' with message_length 10 published to
as a response from your function.
Check that
target_dag
was triggered:Wait at least one minute, so that a new DAG run of
trigger_dag
completes.In the Google Cloud console, go to the Environments page.
In the list of environments, click the name of your environment. The Environment details page opens.
Go to the DAGs tab.
Click
trigger_dag
to go to the DAG details page. On the Runs tab, a list of DAG runs for thetrigger_dag
DAG is displayed.This DAG runs every minute and processes all Pub/Sub messages sent from the function. If no messages were sent, then the
trigger_target
task is marked asSkipped
in the DAG run logs. If DAGs were triggered, then thetrigger_target
task is marked asSuccess
.Look through several recent DAG runs to locate a DAG run where all three tasks (
subscribe_task
,pull_messages_operator
, andtrigger_target
) are inSuccess
statuses.Go back to the DAGs tab and check that the Successful runs column for the
target_dag
DAG lists one successful run.
Summary
In this tutorial, you learned how to use Cloud Run functions to publish messages on a Pub/Sub topic and deploy a DAG that subscribes to a Pub/Sub topic, pulls Pub/Sub messages, and triggers another DAG specified in the DAG ID of the message data.
There are also alternative ways of creating and managing Pub/Sub subscriptions and triggering DAGs that are outside of the scope of this tutorial. For example, you can use Cloud Run functions to trigger Airflow DAGs when a specified event occurs. Have a look at our tutorials to try out the other Google Cloud features for yourself.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
If you plan to explore multiple tutorials and quickstarts, reusing projects can help you avoid exceeding project quota limits.
Console
- Delete the Cloud Composer environment. You also delete the environment's bucket during this procedure.
- Delete the Pub/Sub topic,
dag-topic-trigger
. Delete the Cloud Run function.
In the Google Cloud console, go to Cloud Run functions.
Click the checkbox for the function that you want to delete,
pubsub-publisher
.Click Delete, and then follow the instructions.
Terraform
- Make sure that your Terraform script doesn't contain entries for resources that are still required by your project. For example, you might want to keep some APIs enabled and IAM permissions still assigned (if you added such definitions to your Terraform script).
- Run
terraform destroy
. - Manually delete the environment's bucket. Cloud Composer doesn't delete it automatically. You can do it from the Google Cloud console or Google Cloud CLI.
What's next
- Testing DAGs
- Testing HTTP Functions
- Deploy a Cloud Run function
- Try out other Google Cloud features for yourself. Have a look at our tutorials.