创建主题

在 Pub/Sub 中,主题是代表消息信息流的命名资源。您必须先创建一个主题,然后才能向其发布消息或订阅该主题。Pub/Sub 支持两种主题:标准主题和导入主题。

本文档介绍了如何创建 Pub/Sub 标准主题。如需详细了解导入主题以及如何创建导入主题,请参阅导入主题简介

如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

管理主题所需的角色和权限

如需获得创建和管理主题所需的权限,请让管理员向您授予主题或项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理主题,需要具备以下权限:

  • 创建主题: pubsub.topics.create
  • 删除主题: pubsub.topics.delete
  • 将订阅与主题分离: pubsub.topics.detachSubscription
  • 获取主题: pubsub.topics.get
  • 列出某个主题: pubsub.topics.list
  • 发布到主题: pubsub.topics.publish
  • 更新主题: pubsub.topics.update
  • 获取主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级别和个别资源级别配置访问权限控制。您可以在一个项目中创建订阅,并将其附加到位于其他项目中的主题。确保您对每个项目拥有所需的权限。

主题的属性

创建或更新主题时,您必须指定其属性。

添加默认订阅

向 Pub/Sub 主题添加默认订阅。 您可以在创建主题后为该主题创建其他订阅。默认订阅具有以下属性:

  • -sub 的订阅 ID
  • 拉取传送类型
  • 消息保留时长为 7 天
  • 在处于非活跃状态 31 天后过期
  • 确认时限为 10 秒
  • 立即重试政策

使用架构

架构是消息数据字段必须遵循的格式。架构是发布者和订阅者之间的合同,由 Pub/Sub 强制执行。主题架构有助于标准化消息类型和权限,以便组织中的不同团队能够使用这些消息类型和权限。Pub/Sub 会为消息类型和权限创建中央授权。如需创建带架构的主题,请参阅架构概览

启用注入

启用此属性后,您可以将来自外部来源的流式数据注入到主题中,以便使用 Google Cloud 的功能。如需创建用于提取数据的导入主题,请参阅以下内容:

启用消息保留功能

指定 Pub/Sub 主题在发布后保留消息的时间。超过消息保留时长后,Pub/Sub 可以随意舍弃消息,无论其确认状态为何。系统会对发布到主题的所有消息收取消息存储费用

  • 默认 = 未启用
  • 最小值 = 10 分钟
  • 最大值 = 31 天

将消息数据导出到 BigQuery 中

启用此属性后,您可以创建 BigQuery 订阅,以便在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅方客户端。如需详细了解 BigQuery 订阅,请参阅 BigQuery 订阅

将消息数据导出到 Cloud Storage

启用此属性后,您可以创建 Cloud Storage 订阅,以便在收到消息时将其写入现有的 Cloud Storage 表。您无需配置单独的订阅方客户端。如需详细了解 Cloud Storage 订阅,请参阅 Cloud Storage 订阅

由 Google 管理的加密密钥

指定该主题是使用 Google 管理的加密密钥加密的。 默认情况下,Pub/Sub 使用 Google 管理的密钥加密消息,因此选择此选项可保持默认行为。Google 会自动处理密钥管理和轮替,确保您的消息始终受到最强大的加密保护。此选项无需进一步配置。如需详细了解 Google 管理的加密密钥,请参阅使用 Google 拥有且由 Google 管理的密钥的默认加密

Cloud KMS 密钥

指定主题是使用客户管理的加密密钥 (CMEK) 加密的还是未加密。默认情况下,Pub/Sub 使用 Google 拥有且由 Google 管理的密钥加密消息。如果您指定此选项,Pub/Sub 将通过 CMEK 使用信封加密模式。在此方法中,Cloud KMS 不会对消息进行加密。而是由 Cloud KMS 对 Pub/Sub 为每个主题创建的数据加密密钥 (DEK) 进行加密。Pub/Sub 会使用为主题生成的最新 DEK 对消息进行加密。在消息即将传送给订阅者之前,Pub/Sub 会对消息进行解密。如需详细了解如何创建密钥,请参阅配置消息加密

创建主题

您必须先创建一个主题,然后才能向其发布消息或订阅该主题。

控制台

如需创建主题,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,转到 Pub/Sub 主题页面。

    转到“主题”

  2. 点击创建主题

  3. 主题 ID 字段中,输入主题 ID。 如需详细了解如何命名主题,请参阅命名准则

  4. 保留添加默认订阅选项。

  5. 可选。请勿选择其他选项。

  6. 点击创建主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如需创建主题,请运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID

REST

如需创建主题,请使用 projects.topics.create 方法:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN
    

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • 回答:

    {
     "name": "projects/PROJECT_ID/topics/TOPIC_ID"
    }

    C++

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id) {
      auto topic = client.CreateTopic(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      // Note that kAlreadyExists is a possible error when the library retries.
      if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The topic already exists\n";
        return;
      }
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully created: " << topic->DebugString()
                << "\n";
    }

    C#

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    
    public class CreateTopicSample
    {
        public Topic CreateTopic(string projectId, string topicId)
        {
            PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
            var topicName = TopicName.FromProjectTopic(projectId, topicId);
            Topic topic = null;
    
            try
            {
                topic = publisher.CreateTopic(topicName);
                Console.WriteLine($"Topic {topic.Name} created.");
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                Console.WriteLine($"Topic {topicName} already exists.");
            }
            return topic;
        }
    }

    Go

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub"
    )
    
    func create(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	t, err := client.CreateTopic(ctx, topicID)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    	fmt.Fprintf(w, "Topic created: %v\n", t)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicExample(projectId, topicId);
      }
    
      public static void createTopicExample(String projectId, String topicId) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          Topic topic = topicAdminClient.createTopic(topicName);
          System.out.println("Created topic: " + topic.getName());
        }
      }
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId: string) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    PHP

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub PHP API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     */
    function create_topic($projectId, $topicName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->createTopic($topicName);
    
        printf('Topic created: %s' . PHP_EOL, $topic->name());
    }

    Python

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    topic = publisher.create_topic(request={"name": topic_path})
    
    print(f"Created topic: {topic.name}")

    Ruby

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::Pubsub.new
    
    topic = pubsub.create_topic topic_id
    
    puts "Topic #{topic.name} created."

    组织政策限制条件

    组织政策可以限制主题创建,例如,政策可以限制在 Compute Engine 区域中存储消息。为避免出现主题创建错误,请在创建主题之前检查并根据需要更新组织政策。

    如果您的项目是新创建的,请等待几分钟,让组织政策初始化,然后再创建主题。

    转到组织政策

    如需了解详情,请参阅配置消息存储政策

    后续步骤