将架构与主题相关联

本文档介绍了如何为 Pub/Sub 主题关联架构。

准备工作

所需的角色和权限

如需获得关联和管理架构所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含关联和管理架构所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需关联和管理架构,需要具备以下权限:

  • 创建架构: pubsub.schemas.create
  • 将架构附加到主题: pubsub.schemas.attach
  • 提交架构修订版本: pubsub.schemas.commit
  • 删除架构或架构修订版本: pubsub.schemas.delete
  • 获取架构或架构修订版本: pubsub.schemas.get
  • 列表架构: pubsub.schemas.list
  • 列出架构修订版本: pubsub.schemas.listRevisions
  • 回滚架构: pubsub.schemas.rollback
  • 验证消息: pubsub.schemas.validate
  • 获取架构的 IAM 政策: pubsub.schemas.getIamPolicy
  • 为架构配置 IAM 政策 pubsub.schemas.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以向主账号(例如用户、群组、网域或服务账号)授予角色和权限。您可以在一个项目中创建架构,并将其附加到位于其他项目中的主题。确保您对每个项目拥有所需的权限。

将架构与主题相关联的准则

您可以在创建或修改主题时将架构与主题相关联。以下是将架构与主题相关联的准则:

  • 您可以将架构与一个或多个主题相关联。

    架构与主题关联后,该主题从发布者收到的每条消息都必须遵循该架构。

  • 将架构与主题相关联时,您还必须指定要发布的消息的编码(BINARYJSON)。如果将 JSON 与 Avro 架构搭配使用,请仔细注意联合编码规则

  • 如果与主题关联的架构有修订版本,则消息必须与编码匹配,并根据可用范围内的修订版本进行验证。如果未通过验证,消息将无法发布。

    系统会按创建时间的倒序尝试修订版本。如需创建架构修订版本,请参阅提交架构修订版本

消息架构的验证逻辑

将架构与主题相关联时,如果架构有修订版本,您可以指定要使用的修订版本子集范围。如果您未指定范围,则系统会使用整个范围进行验证。

如果您未指定允许的第一个修订版本,则系统会使用架构最早的现有修订版本进行验证。如果您未将某个修订版本指定为允许的上一个修订版本,则系统会使用架构的最新现有修订版本。

我们以架构 S 为例,该架构附加到主题 T 中。

架构 S 具有按顺序创建的修订 ID ABCD,其中 A 是第一个或最早的修订版本。所有架构都不相同,也不是现有架构的回滚。

  • 如果您仅将允许的第一个修订版本字段设置为 B,则系统会拒绝仅符合架构 A 的消息,而接受符合架构 BCD 的消息。

  • 如果您仅将允许的最后修订版本字段设置为 C,则系统会接受符合架构 ABC 的消息,并拒绝仅符合架构 D 的消息。

  • 如果您将允许的第一个修订版本字段设置为 B,并将允许的最后一个修订版本字段设置为 C,则系统会接受符合架构 BC 的消息。

  • 您还可以将第一个修订版本和最后一个修订版本设置为相同的修订版本 ID。在这种情况下,系统只接受符合该修订版的消息。

在创建主题时创建并关联架构

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库创建带架构的主题。

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    打开“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。

    如需为主题命名,请参阅准则

  4. 选中使用架构复选框。

    保留其余字段的默认设置。

    您可以创建架构,也可以使用现有架构。

  5. 如果您要创建架构,请按以下步骤操作: `

    1. 选择 Pub/Sub 架构部分,选择创建新架构

    系统会在辅助标签页中显示创建架构页面。

    请按照创建架构中的步骤操作。

    1. 返回创建主题标签页,然后点击刷新

    2. 选择 Pub/Sub 架构字段中,搜索您的架构。

    3. 选择消息编码为 JSON二进制

    您刚刚创建的架构具有修订版本 ID。您可以创建其他架构修订版本,如提交架构修订版本中所述。

  6. 如果您要关联已创建的架构,请按以下步骤操作:

    1. 选择 Pub/Sub 架构部分,选择一个现有架构。

    2. 选择消息编码为 JSON二进制

  7. 可选:如果所选架构有修订版本,请针对修订版本范围使用允许的第一个修订版本允许的最后一个修订版本下拉菜单。

根据您的要求,您可以同时指定这两个字段、只指定其中一个字段,也可以保留默认设置。

  1. 保留其余字段的默认设置。

  2. 点击创建以保存主题,并将其分配给所选架构。

gcloud

如需创建分配了先前所创建架构的主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您要创建的主题的 ID。
  • ENCODING_TYPE 是根据架构验证过的消息的编码。此值必须设置为 JSONBINARY
  • SCHEMA_ID 是现有架构的 ID。
  • FIRST_REVISION_ID 是用于进行验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是用于进行验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 是可选的。

您也可以从其他 Google Cloud 项目分配架构:

gcloud pubsub topics create TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_ID \
        --schema-project=SCHEMA_PROJECT \
        --project=TOPIC_PROJECT

其中:

  • SCHEMA_PROJECT 是架构的 Google Cloud 项目的项目 ID。
  • TOPIC_PROJECT 是主题的 Google Cloud 项目的项目 ID。

REST

如需创建主题,请使用 projects.topics.create 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。格式为:projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的旧版 ID。
  • LAST_REVISION_ID 是用于进行验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 是可选的。

回答:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

如果请求中未提供 firstRevisionIdlastRevisionId,则会同时省略这两个值。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string schema_id, std::string const& encoding) {
  google::pubsub::v1::Topic request;
  request.set_name(pubsub::Topic(project_id, std::move(topic_id)).FullName());
  request.mutable_schema_settings()->set_schema(
      pubsub::Schema(std::move(project_id), std::move(schema_id)).FullName());
  request.mutable_schema_settings()->set_encoding(
      encoding == "JSON" ? google::pubsub::v1::JSON
                         : google::pubsub::v1::BINARY);
  auto topic = client.CreateTopic(request);

  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicWithSchemaSample
{
    public Topic CreateTopicWithSchema(string projectId, string topicId, string schemaId, Encoding encoding)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = new Topic
        {
            TopicName = topicName,
            SchemaSettings = new SchemaSettings
            {
                SchemaAsSchemaName = SchemaName.FromProjectSchema(projectId, schemaId),
                Encoding = encoding
            }
        };

        Topic receivedTopic = null;
        try
        {
            receivedTopic = publisher.CreateTopic(topic);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return receivedTopic;
    }
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithSchemaRevisions(w io.Writer, projectID, topicID, schemaID, firstRevisionID, lastRevisionID string, encoding pubsub.SchemaEncoding) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// schemaID := "my-schema-id"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	// encoding := pubsub.EncodingJSON
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	tc := &pubsub.TopicConfig{
		SchemaSettings: &pubsub.SchemaSettings{
			Schema:          fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
			Encoding:        encoding,
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, tc)
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %w", err)
	}
	fmt.Fprintf(w, "Created topic with schema revision: %#v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.SchemaName;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Use an existing schema.
    String schemaId = "your-schema-id";
    // Choose either BINARY or JSON message serialization in this topic.
    Encoding encoding = Encoding.BINARY;
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    createTopicWithSchemaRevisionsExample(
        projectId, topicId, schemaId, firstRevisionId, lastRevisionId, encoding);
  }

  public static void createTopicWithSchemaRevisionsExample(
      String projectId,
      String topicId,
      String schemaId,
      String firstRevisionid,
      String lastRevisionId,
      Encoding encoding)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    SchemaSettings schemaSettings =
        SchemaSettings.newBuilder()
            .setSchema(schemaName.toString())
            .setFirstRevisionId(firstRevisionid)
            .setLastRevisionId(lastRevisionId)
            .setEncoding(encoding)
            .build();

    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setSchemaSettings(schemaSettings)
                  .build());

      System.out.println("Created topic with schema: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(schemaName + "already exists.");
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId,
  schemaNameOrId,
  encodingType
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const schemaName = 'YOUR_SCHEMA_NAME_OR_ID';
// const encodingType = 'BINARY';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithSchema(
  topicNameOrId: string,
  schemaNameOrId: string,
  encodingType: 'BINARY' | 'JSON'
) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const fullName = await schema.getName();

  // Creates a new topic with a schema. Note that you might also
  // pass Encodings.Json or Encodings.Binary here.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    schemaSettings: {
      schema: fullName,
      encoding: encodingType,
    },
  });
  console.log(`Topic ${topicNameOrId} created with schema ${fullName}.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\Schema;

/**
 * Create a topic with a schema.
 *
 * @param string $projectId
 * @param string $topicId
 * @param string $schemaId
 * @param string $encoding
 */
function create_topic_with_schema($projectId, $topicId, $schemaId, $encoding)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $schema = $pubsub->schema($schemaId);

    $topic = $pubsub->createTopic($topicId, [
        'schemaSettings' => [
            // The schema may be provided as an instance of the schema type,
            // or by using the schema ID directly.
            'schema' => $schema,
            // Encoding may be either `BINARY` or `JSON`.
            // Provide a string or a constant from Google\Cloud\PubSub\V1\Encoding.
            'encoding' => $encoding,
        ]
    ]);

    printf('Topic %s created', $topic->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import AlreadyExists, InvalidArgument
from google.cloud.pubsub import PublisherClient, SchemaServiceClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = "BINARY"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)

if message_encoding == "BINARY":
    encoding = Encoding.BINARY
elif message_encoding == "JSON":
    encoding = Encoding.JSON
else:
    encoding = Encoding.ENCODING_UNSPECIFIED

try:
    response = publisher_client.create_topic(
        request={
            "name": topic_path,
            "schema_settings": {
                "schema": schema_path,
                "encoding": encoding,
                "first_revision_id": first_revision_id,
                "last_revision_id": last_revision_id,
            },
        }
    )
    print(f"Created a topic:\n{response}")

except AlreadyExists:
    print(f"{topic_id} already exists.")
except InvalidArgument:
    print("Please choose either BINARY or JSON as a valid message encoding type.")

Ruby

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"
# schema_id = "your-schema-id"
# Choose either BINARY or JSON as valid message encoding in this topic.
# message_encoding = :binary

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.create_topic topic_id, schema_name: schema_id, message_encoding: message_encoding

puts "Topic #{topic.name} created."

修改与主题关联的架构

您可以修改主题以附加架构、移除架构,或更新用于验证消息的修订版本范围。一般来说,如果您计划对所用架构进行更改,可以提交新修订版本,并更新为主题使用的修订版本范围。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库修改与主题关联的架构。

控制台

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    打开“主题”

  2. 点击某个主题的主题 ID

  3. 在主题详情页面中,点击修改

  4. 您可以对架构进行以下更改。

    更改可能需要几分钟时间才能生效。

    • 如果您想从主题中移除架构,请在修改主题页面中,取消选中使用架构复选框。

    • 如果您想更改架构,请在架构部分中选择架构的名称。

    根据需要更新其他字段。

    • 如果您想更新修订版本范围,请使用修订版本范围允许的第一个修订版本允许的最后一个修订版本下拉菜单。

    根据您的要求,您可以同时指定这两个字段、只指定其中一个字段,也可以保留默认设置。

  5. 点击更新以保存更改。

gcloud

gcloud pubsub topics update TOPIC_ID \
        --message-encoding=ENCODING_TYPE \
        --schema=SCHEMA_NAME \
        --first-revision-id=FIRST_REVISION_ID \
        --last-revision-id=LAST_REVISION_ID \

其中:

  • TOPIC_ID 是您要创建的主题的 ID。
  • ENCODING_TYPE 是根据架构验证过的消息的编码。此值必须设置为 JSONBINARY
  • SCHEMA_NAME 是现有架构的名称。
  • FIRST_REVISION_ID 是用于进行验证的最早修订版本的 ID。
  • LAST_REVISION_ID 是用于进行验证的最新修订版本的 ID。

--first-revision-id--last-revision-id 是可选的。

REST

如需更新主题,请使用 projects.topics.patch 方法:

请求:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PATCH https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

请求正文:

{
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
    "update_mask":
  }
}

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • SCHEMA_NAME 是应该根据其验证发布消息的架构的名称。格式为:projects/PROJECT_ID/schemas/SCHEMA_ID
  • ENCODING_TYPE 是根据架构验证过的消息的编码。其必须设置为 JSONBINARY
  • FIRST_REVISION_ID 是要验证的旧版 ID。
  • LAST_REVISION_ID 是用于进行验证的最新修订版本的 ID。

firstRevisionIdlastRevisionId 是可选的。

回答:

{
  "name": "projects/PROJECT_ID/topics/TOPIC_ID",
  "schemaSettings": {
    "schema": "SCHEMA_NAME",
    "encoding": "ENCODING_TYPE"
    "firstRevisionId": "FIRST_REVISION_ID"
    "lastRevisionId": "LAST_REVISION_ID"
  }
}

firstRevisionIdlastRevisionId 在更新后均未设置。

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string const& first_revision_id,
   std::string const& last_revision_id) {
  google::pubsub::v1::UpdateTopicRequest request;
  auto* request_topic = request.mutable_topic();
  request_topic->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  request_topic->mutable_schema_settings()->set_first_revision_id(
      first_revision_id);
  request_topic->mutable_schema_settings()->set_last_revision_id(
      last_revision_id);
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.first_revision_id";
  *request.mutable_update_mask()->add_paths() =
      "schema_settings.last_revision_id";
  auto topic = client.UpdateTopic(request);

  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicSchema(w io.Writer, projectID, topicID, firstRevisionID, lastRevisionID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// firstRevisionID := "my-revision-id"
	// lastRevisionID := "my-revision-id"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	t := client.Topic(topicID)

	// This updates the first / last revision ID for the topic's schema.
	// To clear the schema entirely, use a zero valued (empty) SchemaSettings.
	tc := pubsub.TopicConfigToUpdate{
		SchemaSettings: &pubsub.SchemaSettings{
			FirstRevisionID: firstRevisionID,
			LastRevisionID:  lastRevisionID,
		},
	}

	gotTopicCfg, err := t.Update(ctx, tc)
	if err != nil {
		fmt.Fprintf(w, "topic.Update err: %v\n", gotTopicCfg)
		return err
	}
	fmt.Fprintf(w, "Updated topic with schema: %#v\n", gotTopicCfg)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.SchemaSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicSchemaExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing topic that has schema settings attached to it.
    String topicId = "your-topic-id";
    // Set the minimum and maximum revsion ID
    String firstRevisionId = "your-revision-id";
    String lastRevisionId = "your-revision-id";

    UpdateTopicSchemaExample.updateTopicSchemaExample(
        projectId, topicId, firstRevisionId, lastRevisionId);
  }

  public static void updateTopicSchemaExample(
      String projectId, String topicId, String firstRevisionid, String lastRevisionId)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the schema settings with the changes you want to make.
      SchemaSettings schemaSettings =
          SchemaSettings.newBuilder()
              .setFirstRevisionId(firstRevisionid)
              .setLastRevisionId(lastRevisionId)
              .build();

      // Construct the topic with the schema settings you want to change.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setSchemaSettings(schemaSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder()
              .addPaths("schema_settings.first_revision_id")
              .addPaths("schema_settings.last_revision_id")
              .build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println("Updated topic with schema: " + topic.getName());
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.api_core.exceptions import InvalidArgument, NotFound
from google.cloud.pubsub import PublisherClient

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# first_revision_id = "your-revision-id"
# last_revision_id = "your-revision-id"

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

try:
    response = publisher_client.update_topic(
        request={
            "topic": {
                "name": topic_path,
                "schema_settings": {
                    "first_revision_id": first_revision_id,
                    "last_revision_id": last_revision_id,
                },
            },
            "update_mask": "schemaSettings.firstRevisionId,schemaSettings.lastRevisionId",
        }
    )
    print(f"Updated a topic schema:\n{response}")

except NotFound:
    print(f"{topic_id} not found.")
except InvalidArgument:
    print("Schema settings are not valid.")
0

后续步骤