使用 SMT 创建订阅

本文档介绍了如何使用单个消息转换 (SMT) 创建 Pub/Sub 订阅。

订阅 SMT 允许直接在 Pub/Sub 中对消息数据和属性进行轻量级修改。此功能可在消息传送到订阅方客户端之前执行数据清理、过滤或格式转换。

如需创建包含 SMT 的订阅,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得使用 SMT 创建订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含使用 SMT 创建订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

若要使用 SMT 创建订阅,您需要具备以下权限:

  • 向项目授予创建订阅权限: pubsub.subscriptions.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

根据订阅类型,您可能需要额外的权限。如需查看确切的权限列表,请参阅介绍如何创建特定订阅的文档。例如,如果您要使用 SMT 创建 BigQuery 订阅,请参阅创建 BigQuery 订阅页面。

如果您在与主题不同的项目中创建订阅,则必须在包含主题的项目中向包含订阅的项目的正文授予 roles/pubsub.subscriber 角色。

您可以在项目级别和各个资源级别配置访问权限控制。

使用 SMT 创建订阅

在使用 SMT 创建订阅之前,请先查看订阅的属性文档。

以下示例假定您要使用此用户定义的函数 (UDF) SMT 创建订阅。如需详细了解 UDF,请参阅 UDF 概览

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。

    前往“订阅”页面

  2. 点击创建订阅

    系统随即会打开创建订阅页面。

  3. 订阅 ID 字段中,输入订阅的 ID。 如需详细了解如何命名订阅,请参阅命名准则

  4. 转换下,点击添加转换

  5. 输入函数名称。例如:redactSSN

  6. 如果您不想立即将 SMT 与订阅搭配使用,请点击停用转换选项。这样仍然会保存 SMT,但在消息通过您的订阅传输时,系统不会执行该 SMT。

  7. 输入新的转换。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供了一个验证函数,可用于验证 SMT。点击验证以验证转换。

  9. 如果您想添加其他转换,请点击添加转换

  10. 如需按特定顺序排列所有 SMT,您可以使用向上箭头和向下箭头。如需移除 SMT,请点击“删除”按钮。
  11. Pub/Sub 提供了一个测试函数,可让您检查对示例消息运行 SMT 的结果。如需测试 SMT,请点击测试转换

  12. Test transform 窗口中,选择要测试的函数。

  13. 输入消息窗口中,输入示例消息。

  14. 如果您想添加消息属性,请点击添加属性,然后输入一个或多个键值对。

  15. 点击测试。系统会显示对消息应用 SMT 的结果。

  16. 关闭该窗口可停止对示例邮件测试 SMT。

  17. 点击创建以创建订阅。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供了一个验证函数,可用于验证 SMT。运行 gcloud pubsub message-transforms validate 命令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    替换以下内容:

    • TRANSFORM_FILE:包含单个 SMT 的 YAML 或 JSON 文件的路径。

      下面是一个 YAML 转换文件示例:

      javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供了一个测试函数,可让您检查对示例消息运行一个或多个 SMT 的结果。运行 gcloud pubsub message-transforms test 命令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE

    替换以下内容:

    • TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。

      以下是 YAML 转换文件的示例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  4. 如需创建订阅,请运行 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_NAME \
        --message-transforms-file=TRANSFORMS_FILE

    请替换以下内容:

    • SUBSCRIPTION_ID:您要创建的订阅的 ID 或名称。如需有关如何命名订阅的准则,请参阅资源名称。订阅的名称不可变。

    • TOPIC_NAME:要订阅的主题的名称,格式为 projects/PROJECT_ID/topics/TOPIC_ID

    • TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。

      下面是一个 YAML 转换文件示例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := pubsub.MessageTransform{
		Transform: pubsub.JavaScriptUDF{
			FunctionName: "redactSSN",
			Code:         code,
		},
	}
	cfg := pubsub.SubscriptionConfig{
		Topic:             topic,
		MessageTransforms: []pubsub.MessageTransform{transform},
	}
	sub, err := client.CreateSubscription(ctx, subID, cfg)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

SMT 如何与其他订阅功能交互

如果您的订阅同时使用 SMT 和 Pub/Sub 的内置过滤器,则系统会先应用过滤器,然后再应用 SMT。这具有以下含义:

  • 如果 SMT 更改了消息属性,Pub/Sub 过滤条件不会应用于新一组属性。
  • 您的 SMT 不会应用于被 Pub/Sub 过滤器滤除的任何消息。

如果 SMT 过滤掉了消息,请注意对监控订阅积压的影响。如果您将订阅馈送到 Dataflow 流水线,请勿使用 SMT 过滤消息,因为这会干扰 Dataflow 的自动扩缩。

后续步骤