使用 Cloud Functions 函数和 Pub/Sub 消息触发 DAG

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页面将引导您创建一个基于事件的推送架构,方法是触发 Cloud Composer DAG 以响应 Pub/Sub 主题更改。本教程中的示例演示了如何处理 Pub/Sub 管理的完整周期,包括作为 DAG 流程的一部分的订阅管理。如果您需要触发 DAG,但不想设置额外的访问权限,那么此功能非常适合一些常见的使用场景。

例如,如果您出于安全考虑而不希望提供对 Cloud Composer 环境的直接访问权限,则可以通过 Pub/Sub 发送的消息作为解决方案。您可以配置一个 Cloud Run 函数,用于创建 Pub/Sub 消息并将其发布到 Pub/Sub 主题。然后,您可以创建一个拉取 Pub/Sub 消息并处理这些消息的 DAG。

在此特定示例中,您将创建一个 Cloud Run 函数并部署两个 DAG。第一个 DAG 会拉取 Pub/Sub 消息,并根据 Pub/Sub 消息内容触发第二个 DAG。

本教程假定您熟悉 Python 和 Google Cloud 控制台。

目标

费用

本教程使用 Google Cloud的以下收费组件:

完成本教程后,您可以通过删除您创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

在本教程中,您需要一个 Google Cloud 项目。 按如下方式配置项目:

  1. 在 Google Cloud 控制台中,选择或创建一个项目

    前往“项目选择器”

  2. 确保您的项目已启用结算功能。 了解如何检查项目是否已启用结算功能

  3. 确保您的 Google Cloud 项目用户拥有以下角色,以便创建必要的资源:

    • Service Account User (roles/iam.serviceAccountUser)
    • Pub/Sub Editor (roles/pubsub.editor)
    • Environment and Storage Object Administrator (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Run functions Admin (roles/cloudfunctions.admin)
    • Logs Viewer (roles/logging.viewer)
  4. 确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。默认情况下,Cloud Run 函数使用 App Engine 默认服务账号。此服务账号具有 Editor 角色,该角色具有本教程所需的足够权限。

为您的项目启用 API

控制台

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

通过将以下资源定义添加到 Terraform 脚本中,在项目中启用 Cloud Composer API:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> 替换为您的项目的项目 ID。例如 example-project

创建 Cloud Composer 环境

创建 Cloud Composer 2 环境

在此过程的最后一步,您将 Cloud Composer v2 API Service Agent Extension (roles/composer.ServiceAgentV2Ext) 角色授予 Composer Service Agent 账号。Cloud Composer 使用此账号在您的 Google Cloud 项目中执行操作。

创建 Pub/Sub 主题

此示例会响应推送到 Pub/Sub 主题的消息来触发 DAG。创建要在本示例中使用的 Pub/Sub 主题:

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    前往“Pub/Sub 主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入 dag-topic-trigger 作为主题的 ID。

  4. 将其他选项保留为默认值。

  5. 点击创建主题

gcloud

如需创建主题,请在 Google Cloud CLI 中运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create dag-topic-trigger

Terraform

将以下资源定义添加到 Terraform 脚本中:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> 替换为您的项目的项目 ID。例如 example-project

上传 DAG

将 DAG 上传到您的环境:

  1. 将以下 DAG 文件保存到您的本地计算机上。
  2. <PROJECT_ID> 替换为您的项目的项目 ID。例如 example-project
  3. 将修改后的 DAG 文件上传到您的环境。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

示例代码包含两个 DAG:trigger_dagtarget_dag

trigger_dag DAG 订阅 Pub/Sub 主题,拉取 Pub/Sub 消息,并触发 Pub/Sub 消息数据的 DAG ID 中指定的另一个 DAG。在此示例中,trigger_dag 会触发 target_dag DAG,后者会将消息输出到任务日志。

trigger_dag DAG 包含以下任务:

  • subscribe_task:订阅 Pub/Sub 主题。
  • pull_messages_operator:使用 PubSubPullOperator 读取 Pub/Sub 消息数据。
  • trigger_target_dag:根据从 Pub/Sub 主题拉取的消息中的数据触发另一个 DAG(在本例中为 target_dag)。

target_dag DAG 仅包含一项任务:output_to_logs。此任务会以 1 秒的延迟将消息打印到任务日志中。

部署在 Pub/Sub 主题上发布消息的 Cloud Run 函数

在本部分中,您将部署一个在 Pub/Sub 主题上发布消息的 Cloud Run 函数。

创建 Cloud Run 函数并指定其配置

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Run functions 页面。

    前往 Cloud Run functions

  2. 点击创建函数

  3. 环境字段中,选择第 1 代

  4. 函数名称字段中,输入函数的名称:pubsub-publisher

  5. 触发器类型字段中,选择 HTTP

  6. 身份验证部分中,选择允许未通过身份验证的调用。此选项可授予未经身份验证的用户调用 HTTP 函数的权限。

  7. 点击保存。

  8. 点击下一步以继续执行代码步骤。

Terraform

请考虑在此步骤中使用 Google Cloud 控制台,因为没有直接从 Terraform 管理函数源代码的方法。

此示例演示了如何通过以下方式从本地 zip 归档文件上传 Cloud Run 函数:创建 Cloud Storage 存储桶,将文件存储在此存储桶中,然后使用存储桶中的文件作为 Cloud Run 函数的来源。如果您使用此方法,即使您创建了新的归档文件,Terraform 也不会自动更新函数的源代码。如需重新上传函数代码,您可以更改归档文件的名称。

  1. 下载 pubsub_publisher.pyrequirements.txt 文件。
  2. pubsub_publisher.py 文件中,将 <PROJECT_ID> 替换为您的项目的项目 ID。例如 example-project
  3. 创建一个名为 pubsub_function.zip 的 ZIP 归档,其中包含 pbusub_publisner.pyrequirements.txt 文件。
  4. 将 ZIP 归档保存到存储 Terraform 脚本的目录中。
  5. 将以下资源定义添加到您的 Terraform 脚本中,并将 <PROJECT_ID> 替换为您的项目 ID。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

指定 Cloud Run 函数代码参数

控制台

  1. 代码步骤中,在运行时字段中,选择函数要使用的语言运行时。在此示例中,选择 Python 3.10

  2. 入口点字段中,输入 pubsub_publisher。这是在 Cloud Run 函数运行时执行的代码。此标志的值必须是源代码中存在的函数名称或完全限定类名称。

Terraform

跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function 资源中定义。

上传 Cloud Run 函数代码

控制台

源代码字段中,选择有关如何提供函数源代码的适当选项。 在本教程中,您将使用 Cloud Run 函数的内嵌编辑器添加函数代码。或者,您也可以上传 ZIP 文件或使用 Cloud Source Repositories。

  1. 将以下代码示例放入 main.py 文件中。
  2. <PROJECT_ID> 替换为您的项目的项目 ID。例如 example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

跳过此步骤。Cloud Run 函数参数已在 google_cloudfunctions_function 资源中定义。

指定 Cloud Run 函数依赖项

控制台

requirements.txt 元数据文件中指定函数依赖项:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

部署函数时,Cloud Run functions 会下载并安装 requirements.txt 文件中声明的依赖项,每个软件包对应一行内容。此文件必须位于包含您的函数代码的 main.py 文件所在的目录中。如需了解详情,请参阅 pip 文档中的要求文件

Terraform

跳过此步骤。Cloud Run 函数依赖项在 pubsub_function.zip 归档文件中的 requirements.txt 文件中定义。

部署 Cloud Run 函数

控制台

点击部署。部署成功完成后,Google Cloud 控制台的 Cloud Run functions 页面中会显示带有绿色对勾标记的函数。

确保运行 Cloud Run 函数的服务账号在您的项目中拥有足够的权限来访问 Pub/Sub。

Terraform

  1. 初始化 Terraform:

    terraform init
    
  2. 查看配置并验证 Terraform 将创建或更新的资源是否符合您的预期:

    terraform plan
    
  3. 如需检查配置是否有效,请运行以下命令:

    terraform validate
    
  4. 通过运行以下命令并在提示符处输入 yes 来应用 Terraform 配置:

    terraform apply
    

等待 Terraform 显示“应用完成!”消息。

在 Google Cloud 控制台的界面中找到资源,以确保 Terraform 已创建或更新它们。

测试 Cloud Run 函数

如需检查函数是否在 Pub/Sub 主题上发布消息,以及示例 DAG 是否按预期工作,请执行以下操作:

  1. 检查 DAG 是否处于有效状态:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    3. 前往 DAG 标签页。

    4. 检查名为 trigger_dagtarget_dag 的 DAG 的状态列中的值。这两个 DAG 必须处于 Active 状态。

  2. 推送测试 Pub/Sub 消息。您可以在 Cloud Shell 中执行此操作:

    1. 在 Google Cloud 控制台中,前往 Functions 页面。

      前往 Cloud Run functions

    2. 点击函数的名称 pubsub-publisher

    3. 前往测试标签页。

    4. 配置触发事件部分,输入以下 JSON 键值对:{"message": "target_dag"}。请勿修改键值对,因为此消息会在稍后触发测试 DAG。

    5. 测试命令部分,点击在 Cloud Shell 中测试

    6. Cloud Shell 终端中,等待系统自动显示命令。按 Enter 运行此命令。

    7. 如果系统显示为 Cloud Shell 提供授权消息,请点击授权

    8. 检查消息内容是否与 Pub/Sub 消息相对应。在此示例中,输出消息必须以 Message b'target_dag' with message_length 10 published to 开头,作为函数的响应。

  3. 检查 target_dag 是否已触发:

    1. 等待至少一分钟,以便 trigger_dag 的新 DAG 运行完成。

    2. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    3. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    4. 前往 DAG 标签页。

    5. 点击 trigger_dag 前往 DAG 详情页面。在运行标签页中,系统会显示 trigger_dag DAG 的 DAG 运行列表。

      此 DAG 每分钟运行一次,并处理从该函数发送的所有 Pub/Sub 消息。如果没有发送任何消息,则 trigger_target 任务会在 DAG 运行日志中标记为 Skipped。如果触发了 DAG,则 trigger_target 任务会被标记为 Success

    6. 查看最近的几个 DAG 运行,找到一个 DAG 运行,其中所有三个任务(subscribe_taskpull_messages_operatortrigger_target)都处于 Success 状态。

    7. 返回到 DAG 标签页,并检查 target_dag DAG 的成功运行次数列是否列出了一次成功运行。

摘要

在本教程中,您学习了如何使用 Cloud Run functions 在 Pub/Sub 主题上发布消息,以及如何部署订阅 Pub/Sub 主题、拉取 Pub/Sub 消息并触发消息数据 DAG ID 中指定的另一个 DAG 的 DAG。

此外,还有其他创建和管理 Pub/Sub 订阅以及触发 DAG 的方法,但这些方法不在本教程的介绍范围内。例如,您可以在发生指定事件时使用 Cloud Run 函数触发 Airflow DAG查阅我们的教程,自行试用其他Google Cloud 功能。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留该项目但删除各个资源。

删除项目

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

删除各个资源

如果您打算探索多个教程和快速入门,重复使用项目可以帮助您避免超出项目配额上限。

控制台

  1. 删除 Cloud Composer 环境。在此过程中,您还可以删除环境的存储桶。
  2. 删除 Pub/Sub 主题dag-topic-trigger
  3. 删除 Cloud Run functions 函数。

    1. 在 Google Cloud 控制台中,前往 Cloud Run functions。

      前往 Cloud Run functions

    2. 点击要删除的函数对应的复选框 pubsub-publisher

    3. 点击删除,然后按照说明操作。

Terraform

  1. 确保您的 Terraform 脚本不包含项目仍需要的资源条目。例如,您可能希望保持某些 API 处于启用状态,并仍分配 IAM 权限(如果您已在 Terraform 脚本中添加此类定义)。
  2. 运行 terraform destroy
  3. 手动删除环境的存储桶。Cloud Composer 不会自动删除它。您可以通过 Google Cloud 控制台或 Google Cloud CLI 执行此操作。

后续步骤