SMT を使用してサブスクリプションを作成する

このドキュメントでは、単一メッセージ変換(SMT)を使用して Pub/Sub サブスクリプションを作成する方法について説明します。

サブスクリプション SMT を使用すると、Pub/Sub 内でメッセージデータと属性を軽量に変更できます。この機能を使用すると、メッセージが定期購読クライアントに配信される前に、データのクリーニング、フィルタリング、形式変換を行うことができます。

SMT を使用してサブスクリプションを作成するには、 Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用します。

始める前に

必要なロールと権限

SMT を使用してサブスクリプションを作成するために必要な権限を取得するには、プロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するように管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

この事前定義ロールには、SMT を使用してサブスクリプションを作成するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

SMT を使用してサブスクリプションを作成するには、次の権限が必要です。

  • プロジェクトに対するサブスクリプションの作成権限を付与します。 pubsub.subscriptions.create

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

定期購入の種類によっては、追加の権限が必要になる場合があります。必要な権限の正確なリストについては、特定の定期購入の作成について説明しているドキュメントをご覧ください。たとえば、SMT を使用して BigQuery サブスクリプションを作成する場合は、BigQuery サブスクリプションを作成するをご覧ください。

トピックとは異なるプロジェクトにサブスクリプションを作成する場合は、トピックを含むプロジェクトのサブスクリプションを含むプロジェクトのプリンシパルに roles/pubsub.subscriber ロールを付与する必要があります。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

SMT を使用してサブスクリプションを作成する

SMT を使用して定期購入を作成する前に、定期購入のプロパティのドキュメントを確認してください。

次のサンプルでは、このユーザー定義関数(UDF)SMT を使用してサブスクリプションを作成することを前提としています。UDF の詳細については、UDF の概要をご覧ください。

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    サブスクリプションに移動

  2. [サブスクリプションを作成] をクリックします。

    [サブスクリプションを作成] ページが開きます。

  3. [サブスクリプション ID] フィールドにサブスクリプションの ID を入力します。サブスクリプションの命名の詳細については、命名ガイドラインをご覧ください。

  4. [変換] で [変換を追加] をクリックします。

  5. 関数名を入力します。例: redactSSN

  6. サブスクリプションで SMT をすぐに使用しない場合は、[変換を無効にする] オプションをクリックします。SMT は保存されますが、メッセージがサブスクリプションを通過するときに実行されません。

  7. 新しい変換を入力します。次に例を示します。

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub には、SMT を検証できる validate 関数があります。[検証] をクリックして変換を検証します。

  9. 別の変換を追加する場合は、[変換を追加] をクリックします。

  10. すべての SMT を特定の順序に並べ替えるには、上下矢印を使用します。SMT を削除するには、削除ボタンをクリックします。
  11. Pub/Sub には、サンプル メッセージで SMT の実行結果を確認できるテスト関数があります。SMT をテストするには、[変換をテスト] をクリックします。

  12. [変換をテスト] ウィンドウで、テストする関数を選択します。

  13. [メッセージを入力] ウィンドウで、サンプル メッセージを入力します。

  14. メッセージ属性を追加する場合は、[属性を追加する] をクリックして、1 つ以上の Key-Value ペアを入力します。

  15. [Test] をクリックします。メッセージに SMT を適用した結果が表示されます。

  16. ウィンドウを閉じて、サンプル メッセージでの SMT のテストを停止します。

  17. [作成] をクリックしてサブスクリプションを作成します。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub には、SMT を検証できる validate 関数があります。 gcloud pubsub message-transforms validate コマンドを実行します。

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    次のように置き換えます。

    • TRANSFORM_FILE: 単一の SMT を含む YAML または JSON ファイルのパス。

      YAML 変換ファイルの例を次に示します。

      javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub には、サンプル メッセージに対して 1 つ以上の SMT を実行した結果を確認できるテスト関数があります。 gcloud pubsub message-transforms test コマンドを実行します。

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE

    次のように置き換えます。

    • TRANSFORMS_FILE: 1 つ以上の SMT を含む YAML または JSON ファイルのパス。

      YAML 変換ファイルの例を次に示します。

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  4. サブスクリプションを作成するには、 gcloud pubsub subscriptions create コマンドを実行します。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_NAME \
        --message-transforms-file=TRANSFORMS_FILE

    次のように置き換えます。

    • SUBSCRIPTION_ID: 作成するサブスクリプションの ID または名前。サブスクリプションの命名方法のガイドラインについては、リソース名をご覧ください。サブスクリプションの名前は変更できません。

    • TOPIC_NAME: サブスクライブするトピックの名前(projects/PROJECT_ID/topics/TOPIC_ID の形式)。

    • TRANSFORMS_FILE: 1 つ以上の SMT を含む YAML または JSON ファイルのパス。

      YAML 変換ファイルの例を次に示します。

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package subscriptions

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := pubsub.MessageTransform{
		Transform: pubsub.JavaScriptUDF{
			FunctionName: "redactSSN",
			Code:         code,
		},
	}
	cfg := pubsub.SubscriptionConfig{
		Topic:             topic,
		MessageTransforms: []pubsub.MessageTransform{transform},
	}
	sub, err := client.CreateSubscription(ctx, subID, cfg)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

SMT と他の定期購入機能の連携

サブスクリプションで SMT と Pub/Sub の組み込みフィルタの両方を使用する場合、フィルタは SMT の前に適用されます。結果として次のようになります。

  • SMT がメッセージ属性を変更した場合、Pub/Sub フィルタは新しい属性セットには適用されません。
  • Pub/Sub フィルタによって除外されたメッセージには SMT は適用されません。

SMT でメッセージがフィルタされる場合は、サブスクリプションのバックログのモニタリングへの影響に注意してください。サブスクリプションを Dataflow パイプラインにフィードする場合は、SMT を使用してメッセージを除外しないでください。これは、Dataflow の自動スケーリングを中断します。

次のステップ