建立 Amazon Kinesis Data Streams 匯入主題

您可以使用 Amazon Kinesis Data Streams 匯入主題,持續從 Amazon Kinesis Data Streams 擷取資料,並將資料匯入 Pub/Sub。接著,您可以將資料串流至 Pub/Sub 支援的任何目的地。

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Amazon Kinesis Data Streams 匯入主題所需的權限,請要求管理員為您授予主題或專案的 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色包含建立及管理 Amazon Kinesis Data Streams 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要建立及管理 Amazon Kinesis Data Streams 匯入主題,您必須具備下列權限:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 為匯入主題設定 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

設定聯合身分,以便存取 Amazon Kinesis Data Streams

Workload Identity 聯盟可讓 Google Cloud 服務存取在 Google Cloud以外執行的工作負載。有了身分聯盟,您就不必維護或傳遞憑證,就能 Google Cloud 存取其他雲端中的資源。您可以改用工作負載本身的識別資訊,驗證 Google Cloud 並存取資源。

在 Google Cloud中建立服務帳戶

這是選用步驟。如果您已有服務帳戶,可以在這項程序中使用該帳戶,而不需要建立新的服務帳戶。如果您使用現有的服務帳戶,請參閱記錄服務帳戶的唯一 ID,瞭解下一個步驟。

對於 Amazon Kinesis Data Streams 匯入主題,Pub/Sub 會使用服務帳戶做為身分,以便存取 AWS 的資源。

如要進一步瞭解如何建立服務帳戶,包括必要條件、必要角色和權限,以及命名規範,請參閱「建立服務帳戶」一文。建立服務帳戶後,您可能需要等待 60 秒或更長的時間,才能使用服務帳戶。這種行為的發生,是因為讀取作業最終會一致;新服務帳戶可能需要一段時間才會顯示。

記下服務帳戶專屬 ID

您需要服務帳戶專屬 ID,才能在 AWS 中設定角色。

  1. 前往 Google Cloud 控制台的「Service account」詳細資料頁面。

    前往服務帳戶

  2. 按一下您剛建立的服務帳戶,或您打算使用的服務帳戶。

  3. 在「服務帳戶詳細資料」頁面中記下專屬 ID 編號。

    您需要這組 ID 才能在工作流程中設定 AWS 中的角色

為 Pub/Sub 服務帳戶新增服務帳戶權杖建立者角色

服務帳戶權杖建立者角色 (roles/iam.serviceAccountTokenCreator) 可讓實體為服務帳戶建立短期憑證。這些權杖或憑證用於模擬服務帳戶。

如要進一步瞭解服務帳戶模擬功能,請參閱「服務帳戶模擬功能」。

您也可以在這個程序中新增 Pub/Sub 發布者角色 (roles/pubsub.publisher)。如要進一步瞭解這個角色以及為何要新增這個角色,請參閱「將 Pub/Sub 發布者角色新增至 Pub/Sub 服務帳戶」一文。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。

  5. 如有需要,請按一下「新增其他角色」

  6. 搜尋並點選「服務帳戶憑證建立者角色」圖示 (roles/iam.serviceAccountTokenCreator)。

  7. 按一下 [儲存]

在 AWS 中建立政策

您需要在 AWS 中建立政策,讓 Pub/Sub 向 AWS 進行驗證,以便 Pub/Sub 擷取 Amazon Kinesis Data Streams 中的資料。

  • 如要進一步瞭解如何在 AWS 中建立政策,請參閱「建立 IAM 政策」一文。

如要在 AWS 中建立政策,請執行下列步驟:

  1. 登入 AWS 管理主控台並開啟 IAM 主控台

  2. IAM 控制台的導覽窗格中,依序點選「Access Management」(存取權管理) >「Policies」(政策)

  3. 按一下「建立政策」

  4. 在「選取服務」部分,按一下「Kinesis」

  5. 在「允許的動作」部分,按一下下列項目:

    • 依序前往「List」 >「List Shards」ListShards

      這項動作會授予權限,可列出串流中的分片,並提供每個分片的相關資訊。

    • Read > SubscribeToShard

      這項動作會授予權限,可透過增強型分散方式監聽特定區塊。

    • 依序點選「Read」 >「DescribeStreamConsumer」DescribeStreamConsumer

      這項動作會授予取得已註冊串流消費者說明的權限。

    這些權限涵蓋從串流讀取的權限。Pub/Sub 僅支援使用 Streaming SubscribeToShard API 時,透過擴充 Fan-Out 功能讀取 Kinesis 串流。

  6. 如要將政策限制在特定串流或消費者 (建議),請指定資源消費者 ARN串流 ARN

  7. 按一下「新增更多權限」

  8. 在「選取服務」部分,按一下「STS」

  9. 針對「允許的動作」,依序點選「寫入」 >「AssumeRoleWithWebIdentity」AssumeRoleWithWebIdentity

    這個動作會授予權限,讓 Pub/Sub 取得一組臨時安全性憑證,以便透過身分識別聯合機制,向 Amazon Kinesis Data Streams 進行驗證。

  10. 點選「下一步」

  11. 輸入政策名稱和說明。

  12. 按一下「建立政策」

使用自訂信任政策在 AWS 中建立角色

您必須在 AWS 中建立角色,讓 Pub/Sub 能夠驗證 AWS 並擷取 Amazon Kinesis Data Streams 中的資料。

  1. 登入 AWS 管理主控台並開啟 IAM 主控台

  2. IAM 控制台的導覽窗格中,按一下「角色」

  3. 按一下「建立角色」

  4. 在「選取信任的實體」中,按一下「自訂信任政策」

  5. 在「自訂信任政策」部分輸入或貼上下列內容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替換為您在「記錄服務帳戶專屬 ID」一節中記錄的服務帳戶專屬 ID。

  6. 點選「下一步」

  7. 針對「新增權限」,搜尋並按一下剛建立的自訂政策。

  8. 點選「下一步」

  9. 輸入角色名稱和說明。

  10. 按一下「建立角色」

將 Pub/Sub 發布者角色新增至 Pub/Sub 授權對象

如要啟用發布功能,您必須為 Pub/Sub 服務帳戶指派發布者角色,讓 Pub/Sub 能夠發布至 Amazon Kinesis Data Streams 匯入主題。

啟用從所有主題發布內容的權限

如果您尚未建立任何 Amazon Kinesis Data Streams 匯入主題,請使用這個方法。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 Google提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com 的服務帳戶。

  4. 針對這個服務帳戶,按一下「Edit Principal」(編輯主體) 按鈕。

  5. 如有需要,請按一下「新增其他角色」

  6. 搜尋並點選「Pub/Sub 發布者角色」 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用單一主題的發布功能

只有在 Amazon Kinesis Data Streams 匯入主題已存在時,才使用這個方法。

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID:Amazon Kinesis Data Streams 匯入主題的主題 ID。

    • PROJECT_NUMBER:專案編號。如要查看專案編號,請參閱「識別專案」。

將服務帳戶使用者角色新增至服務帳戶

「服務帳戶使用者」角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 權限,可讓授權者將服務帳戶連結至 Amazon Kinesis Data Streams 匯入主題的擷取設定,並使用該服務帳戶建立聯合身分。

  1. 前往 Google Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 針對發出建立或更新主題呼叫的主體,按一下「Edit Principal」按鈕。

  3. 如有需要,請按一下「新增其他角色」

  4. 搜尋並點選「服務帳戶使用者角色」(roles/iam.serviceAccountUser)。

  5. 按一下 [儲存]

使用 Amazon Kinesis Data Streams 主題

您可以建立新的匯入主題,或編輯現有主題。

注意事項

  • 即使快速依序建立主題和訂閱項目,也可能導致資料遺失。主題會在短時間內存在,但不會有訂閱項目。如果在這段時間內有任何資料傳送至主題,資料就會遺失。請先建立主題、建立訂閱項目,然後再將主題轉換為匯入主題,這樣就能確保匯入過程中不會遺漏任何訊息。

建立 Amazon Kinesis Data Streams 匯入主題

如要進一步瞭解與主題相關聯的屬性,請參閱「主題的屬性」。

請確認你已完成下列程序:

如要建立 Amazon Kinesis Data Streams 匯入主題,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下「建立主題」

  3. 在「Topic ID」欄位中,輸入 Amazon Kinesis Data Streams 匯入主題的 ID。

    如要進一步瞭解主題命名方式,請參閱命名規範

  4. 選取「Add a default subscription」

  5. 選取「啟用擷取」

  6. 在擷取來源中,選取「Amazon Kinesis Data Streams」

  7. 輸入下列詳細資訊:

    • Kinesis 串流 ARN:您打算擷取至 Pub/Sub 的 Kinesis 資料串流 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 用戶 ARN:已註冊至 AWS Kinesis Data Stream 的用戶資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • 服務帳戶:您在「在 Google Cloud中建立服務帳戶」一文中建立的服務帳戶。

  8. 按一下「建立主題」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID \
           --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
           --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
           --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
           --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID 是主題 ID。

    • KINESIS_STREAM_ARN 是您打算擷取至 Pub/Sub 的 Kinesis Data Streams 的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已註冊至 AWS Kinesis Data Streams 的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在「在 Google Cloud 中建立服務帳戶」一文中建立的服務帳戶。

Go

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Go。詳情請參閱 Pub/Sub Go API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Java。詳情請參閱 Pub/Sub Java API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js。詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Python。詳情請參閱 Pub/Sub Python API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 C++。詳情請參閱 Pub/Sub C++ API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

在嘗試這個範例之前,請先按照 Pub/Sub 快速入門:使用用戶端程式庫中的操作說明設定 Node.js 環境。 詳情請參閱 Pub/Sub Node.js API 參考說明文件

如要向 Pub/Sub 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

如要進一步瞭解 ARN,請參閱「Amazon 資源名稱 (ARN)」和「IAM 識別碼」。

如果發生問題,請參閱「排解 Amazon Kinesis Data Streams 匯入問題」一文。

編輯 Amazon Kinesis Data Streams 匯入主題

您可以編輯 Amazon Kinesis Data Streams 匯入主題的擷取資料來源設定。請執行下列步驟:

控制台

  1. 前往 Google Cloud 控制台的「Topics」頁面。

    前往「主題」

  2. 按一下 Amazon Kinesis Data Streams 匯入主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 更新要變更的欄位。

  5. 按一下「更新」

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 為避免遺失匯入主題的設定,請務必每次更新主題時都納入所有設定。如果您略過某些項目,Pub/Sub 會將設定重設為原始預設值。

    使用下列範例中提到的所有標記,執行 gcloud pubsub topics update 指令:

      gcloud pubsub topics update TOPIC_ID \
             --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
             --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
             --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
             --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID 是主題 ID。這個欄位無法更新。

    • KINESIS_STREAM_ARN 是您打算擷取至 Pub/Sub 的 Kinesis Data Streams 的 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已註冊至 AWS Kinesis Data Streams 的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您在「在 Google Cloud中建立服務帳戶」一節中建立的服務帳戶。

Amazon Kinesis Data Streams 匯入主題的配額和限制

匯入主題的發布者傳送量受限於主題的發布配額。詳情請參閱「Pub/Sub 配額和限制」。

後續步驟

Apache Kafka® 是 Apache 軟體基金會 (Apache Software Foundation) 或其關聯企業在美國與/或其他國家/地區的註冊商標。