變更主題類型

您可以將匯入主題轉換為標準主題,反之亦然。

將匯入的主題轉換為標準主題

如要將匯入主題轉換為標準主題,請清除擷取設定。請執行下列步驟:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下匯入主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 取消勾選「Enable ingestion」(啟用攝入功能) 選項。

  5. 按一下「更新」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID 替換為主題 ID。

將標準主題轉換為 Amazon Kinesis Data Streams 匯入主題

如要將標準主題轉換為 Amazon Kinesis Data Streams 匯入主題,請先確認您符合所有必要條件

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下要轉換為匯入主題的主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 選取「啟用攝入」選項。

  5. 在擷取來源中,選取「Amazon Kinesis Data Streams」

  6. 輸入下列詳細資訊:

    • Kinesis 串流 ARN:您打算擷取至 Pub/Sub 的 Kinesis 資料串流 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 用戶 ARN:已註冊至 AWS Kinesis Data Stream 的用戶資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • 服務帳戶:您在「在 Google Cloud中建立服務帳戶」一文中建立的服務帳戶。

  7. 按一下「更新」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用下列範例中提到的所有標記,執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    更改下列內容:

    • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

    • KINESIS_STREAM_ARN 是您打算擷取至 Pub/Sub 的 Kinesis Data Streams 的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已註冊至 AWS Kinesis Data Streams 的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在「在 Google Cloud中建立服務帳戶」一節中建立的服務帳戶。

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

如要進一步瞭解 ARN,請參閱「Amazon 資源名稱 (ARN)」和「IAM 識別碼」。

將標準主題轉換為 Cloud Storage 匯入主題

如要將標準主題轉換為 Cloud Storage 匯入主題,請先確認您符合所有必要條件

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下要轉換為 Cloud Storage 匯入主題的主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 選取「啟用攝入」選項。

  5. 在「擷取來源」中,選取「Google Cloud Storage」

  6. 在 Cloud Storage bucket 中,按一下「Browse」

    「Select bucket」(選取值區) 頁面隨即開啟。選取下列選項之一:

    • 從任何適當的專案中選取現有值區。

    • 按一下建立圖示,然後按照畫面上的指示建立新的值區。建立值區後,請選取 Cloud Storage 匯入主題的值區。

  7. 指定值區時,Pub/Sub 會檢查 Pub/Sub 服務帳戶的值區是否具備適當權限。如果發生權限問題,您會看到與權限相關的錯誤訊息。

    如果發生權限問題,請按一下「設定權限」。詳情請參閱「 授予 Pub/Sub 服務帳戶 Cloud Storage 權限」。

  8. 在「Object format」(物件格式) 中,選取「Text」、「Avro」或「Pub/Sub Avro」

    如果您選取「文字」,可以選擇指定分隔符,用來將物件分割成訊息。

    如要進一步瞭解這些選項,請參閱「輸入格式」。

  9. (非必要) 您可以為主題指定物件建立時間下限。如果已設定,系統只會擷取在物件建立時間下限後建立的物件。

    詳情請參閱「 物件建立時間下限」。

  10. 您必須指定 Glob 模式。如要擷取 bucket 中的所有物件,請使用 ** 做為 glob 模式。系統只會擷取符合指定模式的物件。

    詳情請參閱「 比對 glob 模式」。

  11. 保留其他預設設定。
  12. 按一下「更新主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 為避免遺失匯入主題的設定,請務必每次更新主題時都納入所有設定。如果您略過某些項目,Pub/Sub 會將設定重設為原始預設值。

    使用下列範例中提到的所有標記,執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    更改下列內容:

    • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

    • BUCKET_NAME:指定現有值區的名稱。例如:prod_bucket。值區名稱不得包含專案 ID。如要建立值區,請參閱「建立值區」。

    • INPUT_FORMAT:指定攝入物件的格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「輸入格式」。

    • TEXT_DELIMITER:指定用於將文字物件拆分為 Pub/Sub 訊息的分隔符。此值必須是單一字元,且只能在 INPUT_FORMATtext 時設定。預設為換行字元 (\n)。

      使用 gcloud CLI 指定分隔符時,請特別留意如何處理特殊字元,例如換行符號 \n。請使用 '\n' 格式,確保系統能正確解讀分隔符。只要使用 \n 而不加上引號或轉義,就會產生 "n" 分隔符。

    • MINIMUM_OBJECT_CREATE_TIME:指定物件建立後,可供攝入的最低時間。格式應為 YYYY-MM-DDThh:mm:ssZ,以世界標準時間為準。例如:2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 之間的任何日期 (包括兩者) 皆有效。

    • MATCH_GLOB:指定要比對的 glob 模式,以便擷取物件。使用 gcloud CLI 時,如果比對萬用字元含有 * 字元,則 * 字元必須以 \*\*.txt 的形式加上反斜線轉義,或是整個比對萬用字元必須以 "**.txt"'**.txt' 加上引號。如要瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件