创建主题

在 Pub/Sub 中,主题是代表消息信息流的命名资源。您必须先创建一个主题,然后才能向其发布消息或订阅该主题。Pub/Sub 支持两种类型的主题:标准主题和导入主题。

本文档介绍如何创建 Pub/Sub 标准主题。如果您想详细了解导入主题以及如何创建导入主题,请参阅关于导入主题

如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建主题需要以下权限:

  • 授予此权限可在项目上创建主题: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。您可以在一个项目中创建订阅,并将其附加到位于另一个项目中的主题。确保您拥有每个项目所需的权限。

主题的属性

创建或更新主题时,您必须指定其属性。

添加默认订阅

向 Pub/Sub 主题添加默认订阅。 创建主题后,您可以为该主题创建其他订阅。 默认订阅具有以下属性:

  • -sub 的订阅 ID
  • 拉取传送类型
  • 消息保留时长为 7 天
  • 在处于非活跃状态 31 天后过期
  • 10 秒的确认时限
  • 立即重试政策

使用架构

架构是消息数据字段必须遵循的格式。 架构是发布者和订阅者之间的一份合同,Pub/Sub 会强制执行该合同。主题架构有助于标准化消息类型和权限,以便组织中的不同团队使用这些消息。Pub/Sub 会为消息类型和权限创建中央授权。如需创建具有架构的主题,请参阅架构概览

启用注入

启用此属性后,您可以将来自外部来源的流式数据注入到主题中,以便使用 Google Cloud的功能。如需创建用于提取的导入主题,请参阅以下内容:

启用消息保留功能

指定 Pub/Sub 主题在发布后保留消息的时间。超过消息保留时长之后,Pub/Sub 可以随意舍弃消息,无论其确认状态为何。会对发布到主题的所有消息收取消息存储费用

  • 默认值 = 未启用
  • 最小值 = 10 分钟
  • 最大值 = 31 天

将消息数据导出到 BigQuery 中

启用此属性后,您可以创建 BigQuery 订阅,以便在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。如需详细了解 BigQuery 订阅,请参阅 BigQuery 订阅

将消息数据备份到 Cloud Storage

启用此属性后,您可以创建 Cloud Storage 订阅,以便在收到消息时将其写入现有的 Cloud Storage 表。您无需配置单独的订阅者客户端。如需详细了解 Cloud Storage 订阅,请参阅 Cloud Storage 订阅

转换

借助主题 SMT,您可以在 Pub/Sub 中直接对消息数据和属性进行轻量级修改。此功能可在消息发布到主题之前实现数据清理、过滤或格式转换。

如需详细了解 SMT,请参阅 SMT 概览

Google-owned and Google-managed encryption key

指定主题使用Google-owned and Google-managed encryption keys加密。默认情况下,Pub/Sub 使用 Google-owned and Google-managed encryption keys 加密消息,因此选择此选项可保持默认行为。Google 会自动处理密钥管理和轮换,确保您的消息始终受到最强大的加密保护。此选项无需进一步配置。 如需详细了解 Google-owned and Google-managed encryption keys,请参阅使用 Google-owned and Google-managed encryption keys进行默认加密

Cloud KMS 密钥

指定主题是否使用客户管理的加密密钥 (CMEK) 进行加密。Pub/Sub 默认使用 Google-owned and Google-managed encryption keys 加密消息。如果您指定此选项,Pub/Sub 将通过 CMEK 使用信封加密模式。在此方法中,Cloud KMS 不会对消息进行加密。Cloud KMS 会对 Pub/Sub 为每个主题创建的数据加密密钥 (DEK) 进行加密。Pub/Sub 会使用为相应主题生成的最新 DEK 对消息进行加密。Pub/Sub 会在消息即将传送给订阅者之前对其进行解密。如需详细了解如何创建密钥,请参阅配置消息加密

创建主题

您必须先创建一个主题,然后才能向其发布消息或订阅该主题。

控制台

如需创建主题,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 创建主题页面。

    前往“创建主题”

  2. 主题 ID 字段中,输入主题 ID。 如需详细了解如何命名主题,请参阅命名准则

  3. 保留添加默认订阅选项。

  4. 可选。请勿选择其他选项。

  5. 点击创建主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 如需创建主题,请运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID

REST

如需创建主题,请使用 projects.topics.create 方法:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN
    

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • 回答:

    {
     "name": "projects/PROJECT_ID/topics/TOPIC_ID"
    }

    C++

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 C++ 设置说明执行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id) {
      auto topic = client.CreateTopic(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      // Note that kAlreadyExists is a possible error when the library retries.
      if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The topic already exists\n";
        return;
      }
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully created: " << topic->DebugString()
                << "\n";
    }

    C#

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 C# 设置说明执行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    
    public class CreateTopicSample
    {
        public Topic CreateTopic(string projectId, string topicId)
        {
            PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
            var topicName = TopicName.FromProjectTopic(projectId, topicId);
            Topic topic = null;
    
            try
            {
                topic = publisher.CreateTopic(topicName);
                Console.WriteLine($"Topic {topic.Name} created.");
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                Console.WriteLine($"Topic {topicName} already exists.");
            }
            return topic;
        }
    }

    Go

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Go 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    func create(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	topic := &pubsubpb.Topic{
    		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    	fmt.Fprintf(w, "Topic created: %v\n", t)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Java 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicExample(projectId, topicId);
      }
    
      public static void createTopicExample(String projectId, String topicId) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          Topic topic = topicAdminClient.createTopic(topicName);
          System.out.println("Created topic: " + topic.getName());
        }
      }
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId: string) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    PHP

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 PHP 设置说明执行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     */
    function create_topic($projectId, $topicName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->createTopic($topicName);
    
        printf('Topic created: %s' . PHP_EOL, $topic->name());
    }

    Python

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Python 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    topic = publisher.create_topic(request={"name": topic_path})
    
    print(f"Created topic: {topic.name}")

    Ruby

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Ruby 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::Pubsub.new
    
    topic = pubsub.create_topic topic_id
    
    puts "Topic #{topic.name} created."

    组织政策限制条件

    组织政策可以限制主题创建,例如,某项政策可以限制在 Compute Engine 区域中存储消息。为避免出现主题创建错误,请在创建主题之前检查并更新组织政策(如有必要)。

    如果您的项目是新创建的,请等待组织政策初始化完成后再创建主题。

    转到组织政策

    如需了解详情,请参阅配置消息存储政策

    后续步骤

    Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。