使用 SMT 建立訂閱

本文說明如何使用單一訊息轉換 (SMT) 建立 Pub/Sub 訂閱項目。

訂閱 SMT 可讓您直接在 Pub/Sub 中輕鬆修改訊息資料和屬性。這項功能可在訊息傳送至訂閱者用戶端前,執行資料清理、篩選或格式轉換作業。

如要使用 SMT 建立訂閱項目,您可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得使用 SMT 建立訂閱項目所需的權限,請要求管理員為您授予專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色包含使用 SMT 建立訂閱項目所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要使用 SMT 建立訂閱項目,必須具備下列權限:

  • 授予專案的建立訂閱項目權限: pubsub.subscriptions.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

視訂閱類型而定,您可能需要額外的權限。如要查看確切的權限清單,請參閱討論如何建立特定訂閱項目的文件。舉例來說,如果您要使用 SMT 建立 BigQuery 訂閱項目,請參閱「建立 BigQuery 訂閱項目」頁面。

如果您在主題以外的專案中建立訂閱項目,則必須將 roles/pubsub.subscriber 角色授予主體,該主體位於包含主題的專案中,且該專案包含訂閱項目。

您可以在專案層級和個別資源層級設定存取權控管。

使用 SMT 建立訂閱

使用 SMT 建立訂閱項目前,請先詳閱「訂閱項目的屬性」相關說明文件。

以下範例假設您想使用這個使用者定義函式 (UDF) SMT 建立訂閱項目。如要進一步瞭解 UDF,請參閱 UDF 總覽

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 的「Subscriptions」頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

    「Create subscription」(建立訂閱) 頁面隨即開啟。

  3. 在「Subscription ID」欄位中輸入訂閱項目 ID。如要進一步瞭解如何命名訂閱項目,請參閱命名規範

  4. 在「轉換」下方,點選「新增轉換」

  5. 輸入函式名稱。例如 redactSSN

  6. 如果不想立即在訂閱方案中使用 SMT,請按一下「停用轉換」選項。這麼做仍會儲存 SMT,但系統不會在訊息透過訂閱項目傳送時執行 SMT。

  7. 輸入新的轉換。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供驗證函式,可讓您驗證 SMT。按一下「驗證」,驗證轉換作業。

  9. 如要新增其他轉換,請按一下「新增轉換」

  10. 如要依特定順序排列所有 SMT,可以使用向上和向下箭頭。如要移除 SMT,請按一下刪除按鈕。
  11. Pub/Sub 提供測試函式,可讓您檢查在範例訊息上執行 SMT 的結果。如要測試 SMT,請按一下「Test transform」

  12. 在「Test transform」視窗中,選取要測試的函式。

  13. 在「輸入訊息」視窗中輸入範例訊息。

  14. 如要新增訊息屬性,請按一下「新增屬性」,然後輸入一或多個鍵/值組合。

  15. 按一下「測試」。系統會顯示在訊息上套用 SMT 的結果。

  16. 關閉視窗,停止測試範例訊息的 SMT。

  17. 按一下「建立」建立訂閱項目。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供驗證函式,可讓您驗證 SMT。執行 gcloud pubsub message-transforms validate 指令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    更改下列內容:

    • TRANSFORM_FILE:包含單一 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供測試函式,可讓您檢查在範例訊息上執行一或多個 SMT 的結果。執行 gcloud pubsub message-transforms test 指令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE

    更改下列內容:

    • TRANSFORMS_FILE:包含一個或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  4. 如要建立訂閱項目,請執行 gcloud pubsub subscriptions create 指令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_NAME \
        --message-transforms-file=TRANSFORMS_FILE

    請依指示取代下列項目:

    • SUBSCRIPTION_ID:您要建立的訂閱項目 ID 或名稱。如要瞭解命名訂閱項目的規範,請參閱「資源名稱」。訂閱項目名稱無法變更。

    • TOPIC_NAME:要訂閱的主題名稱,格式為 projects/PROJECT_ID/topics/TOPIC_ID

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := pubsub.MessageTransform{
		Transform: pubsub.JavaScriptUDF{
			FunctionName: "redactSSN",
			Code:         code,
		},
	}
	cfg := pubsub.SubscriptionConfig{
		Topic:             topic,
		MessageTransforms: []pubsub.MessageTransform{transform},
	}
	sub, err := client.CreateSubscription(ctx, subID, cfg)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

單一登入服務與其他訂閱功能的交互作用

如果訂閱項目同時使用 SMT 和 Pub/Sub 的內建篩選器,系統會先套用篩選器,再套用 SMT。這會造成下列影響:

  • 如果 SMT 變更訊息屬性,Pub/Sub 篩選器就不會套用至新屬性組合。
  • 您的 SMT 不會套用至 Pub/Sub 篩選器篩除的任何訊息。

如果 SMT 篩除訊息,請注意這會對監控訂閱待處理郵件造成的影響。如果您將訂閱項目提供至 Dataflow 管道,請勿使用 SMT 篩除訊息,因為這會中斷 Dataflow 的自動調整大小功能。

後續步驟