更改主题类型

您可以将导入主题转换为标准主题,反之亦然。

将导入的主题转换为标准主题

如需将导入的主题转换为标准主题,请清除提取设置。执行以下步骤:

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击“导入”主题。

  3. 在主题详情页面中,点击修改

  4. 取消选中启用提取选项。

  5. 点击更新

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID 替换为主题 ID。

将标准主题转换为 Amazon Kinesis Data Streams 导入主题

如需将标准主题转换为 Amazon Kinesis Data Streams 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击您要转换为导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 对于提取来源,请选择 Amazon Kinesis Data Streams

  6. 输入以下详细信息:

    • Kinesis 数据流 ARN:您计划注入到 Pub/Sub 的 Kinesis Data Streams 的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 使用方 ARN:已注册到 AWS Kinesis Data Streams 的使用方资源的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam:${Account}:role/${RoleName}

    • 服务账号:您在在 Google Cloud 中创建服务账号中创建的服务账号。

  7. 点击更新

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用以下示例中提及的所有标志运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • KINESIS_STREAM_ARN 是您计划注入到 Pub/Sub 的 Kinesis Data Streams 的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已注册到 AWS Kinesis Data Streams 的使用方资源的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。该角色的 ARN 格式如下:arn:aws:iam:${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在在 Google Cloud 中创建服务账号中创建的服务账号。

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN)IAM 标识符

将标准主题转换为 Cloud Storage 导入主题

如需将标准主题转换为 Cloud Storage 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Google Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要转换为 Cloud Storage 导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 在“提取来源”中,选择 Google Cloud Storage

  6. 对于 Cloud Storage 存储桶,请点击浏览

    系统随即会打开选择存储桶页面。从下列选项中选择一项:

    • 从任何适当的项目中选择一个现有存储桶。

    • 点击“创建”图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择 Cloud Storage 导入主题对应的存储桶。

  7. 当您指定存储桶时,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到与权限相关的错误消息。

    如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅 向 Pub/Sub 服务账号授予 Cloud Storage 权限

  8. 对于对象格式,请选择文本AvroPub/Sub Avro

    如果您选择文本,可以选择指定分隔符,以便将对象拆分为消息。

    如需详细了解这些选项,请参阅输入格式

  9. 可选。您可以为主题指定创建对象的最短时间。如果设置,则系统只会提取在最短对象创建时间之后创建的对象。

    如需了解详情,请参阅 创建对象的最短时间

  10. 您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用 ** 作为 glob 模式。仅注入与给定模式相匹配的对象。

    如需了解详情,请参阅 匹配全局通配模式

  11. 保留其他默认设置。
  12. 点击更新主题

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 为避免丢失导入主题的设置,请务必在每次更新主题时添加所有设置。如果您省略了某些内容,Pub/Sub 会将设置重置为原始默认值。

    使用以下示例中提及的所有标志运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • BUCKET_NAME:指定现有存储桶的名称。例如 prod_bucket。 存储桶名称不得包含项目 ID。如需创建存储桶,请参阅创建存储桶

    • INPUT_FORMAT:指定要提取的对象的格式。可以是 textavropubsub_avro。如需详细了解这些选项,请参阅输入格式

    • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。此字符必须为单个字符,并且只能在 INPUT_FORMATtext 时设置。默认为换行符 (\n)。

      使用 gcloud CLI 指定分隔符时,请仔细注意对新行 \n 等特殊字符的处理。使用格式 '\n' 可确保正确解读分隔符。仅使用 \n(不带引号或转义)会导致分隔符为 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定对象创建后的最短时间,以便进行提取。此值应采用 UTC 格式,格式为 YYYY-MM-DDThh:mm:ssZ。例如 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z(包括这两个数值)之间的任何日期(过去或未来)均有效。

    • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,包含 * 字符的匹配正则表达式必须将 * 字符的格式设置为转义形式(\*\*.txt),或者整个匹配正则表达式必须用引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档