Amazon Kinesis Data Streams-Importthema erstellen

Mit einem Amazon Kinesis Data Streams-Importthema können Sie Daten kontinuierlich aus Amazon Kinesis Data Streams als externe Quelle in Pub/Sub aufnehmen. Anschließend können Sie die Daten an eines der Ziele streamen, die von Pub/Sub unterstützt werden.

In diesem Dokument erfahren Sie, wie Sie Importthemen für Amazon Kinesis Data Streams erstellen und verwalten. Informationen zum Erstellen eines Standardthemas finden Sie unter Standardthema erstellen.

Hinweise

Erforderliche Rollen und Berechtigungen zum Verwalten von Importthemen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Pub/Sub Editor“ (roles/pubsub.editor) für Ihr Thema oder Projekt zu gewähren, um die Berechtigungen zum Erstellen und Verwalten von Amazon Kinesis Data Streams-Importthemen zu erhalten. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen und Verwalten von Amazon Kinesis Data Streams-Importthemen erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um Amazon Kinesis Data Streams-Importthemen zu erstellen und zu verwalten:

  • So erstellen Sie ein Importthema: pubsub.topics.create
  • Importthema löschen: pubsub.topics.delete
  • Importthema abrufen: pubsub.topics.get
  • Importthema angeben: pubsub.topics.list
  • In einem importierten Thema veröffentlichen: pubsub.topics.publish
  • Importthema aktualisieren: pubsub.topics.update
  • IAM-Richtlinie für ein Importthema abrufen: pubsub.topics.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Importthema: pubsub.topics.setIamPolicy

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können die Zugriffssteuerung auf Projektebene und auf der Ebene einzelner Ressourcen konfigurieren.

Föderierte Identität für den Zugriff auf Kinesis Data Streams einrichten

Mit der Workload Identity-Föderation können Google Cloud-Dienste auf Arbeitslasten zugreifen, die außerhalb von Google Cloud ausgeführt werden. Mit der Identitätsfederation müssen Sie keine Anmeldedaten für Google Cloud verwalten oder an Google Cloud weitergeben, um auf Ihre Ressourcen in anderen Clouds zuzugreifen. Stattdessen können Sie die Identitäten der Arbeitslasten selbst verwenden, um sich bei Google Cloud zu authentifizieren und auf Ressourcen zuzugreifen.

Dienstkonto in Google Cloud erstellen

Dieser Schritt ist optional. Wenn Sie bereits ein Dienstkonto haben, können Sie es in diesem Verfahren verwenden, anstatt ein neues Dienstkonto zu erstellen. Wenn Sie ein vorhandenes Dienstkonto verwenden, fahren Sie mit Eindeutige ID des Dienstkontos erfassen fort.

Bei einem Amazon Kinesis Data Streams-Importthema verwendet Pub/Sub das Dienstkonto als Identität für den Zugriff auf Ressourcen von AWS.

Weitere Informationen zum Erstellen eines Dienstkontos, einschließlich Voraussetzungen, erforderlicher Rollen und Berechtigungen sowie Richtlinien für die Benennung, finden Sie unter Dienstkonten erstellen. Nachdem Sie ein Dienstkonto erstellt haben, müssen Sie möglicherweise 60 Sekunden oder länger warten, bis Sie das Dienstkonto verwenden können. Dieses Verhalten tritt auf, weil Lesevorgänge Eventual Consistency haben. Es kann einige Zeit dauern, bis das neue Dienstkonto sichtbar ist.

Notieren Sie sich die eindeutige ID des Dienstkontos.

Sie benötigen eine eindeutige ID für das Dienstkonto, um eine Rolle in der AWS-Konsole einzurichten.

  1. Rufen Sie in der Google Cloud Console die Seite Dienstkonto auf.

    Weiter zu Dienstkonto

  2. Klicken Sie auf das Dienstkonto, das Sie gerade erstellt haben oder das Sie verwenden möchten.

  3. Notieren Sie sich auf der Seite Dienstkontodetails die eindeutige ID.

    Sie benötigen die ID im Abschnitt Rolle in AWS mit benutzerdefinierter Vertrauensrichtlinie erstellen.

Dem Pub/Sub-Dienstkonto die Rolle „Ersteller von Dienstkonto-Tokens“ hinzufügen

Mit der Rolle Ersteller von Dienstkonto-Tokens (roles/iam.serviceAccountTokenCreator) können Hauptkonten kurzlebige Anmeldedaten für ein Dienstkonto erstellen. Diese Tokens oder Anmeldedaten werden verwendet, um das Dienstkonto zu imitieren.

Weitere Informationen zu Identitätswechsel für Dienstkonten.

Sie können während dieses Vorgangs auch die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) hinzufügen. Weitere Informationen zu dieser Rolle und warum Sie sie hinzufügen, finden Sie unter Dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ hinzufügen.

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Aktivieren Sie die Option Von Google bereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen und wählen Sie die Rolle „Ersteller von Dienstkonto-Tokens“ (roles/iam.serviceAccountTokenCreator) aus.

  7. Klicken Sie auf Speichern.

Richtlinie in AWS erstellen

Sie benötigen eine Richtlinie in AWS, damit Pub/Sub sich bei AWS authentifizieren kann, damit Pub/Sub Daten aus einem AWS Kinesis-Datenstream aufnehmen kann. Bevor Sie eine AWS-Richtlinie erstellen, müssen Sie einen Kinesis-Datenstream und einen registrierten Verbraucher dafür erstellen. Wir empfehlen diese Vorgehensweise, damit Sie die Berechtigungen auf den jeweiligen Stream beschränken können.

  • Weitere Informationen zum Erstellen eines AWS Kinesis-Datenstreams finden Sie unter Kinesis-Datenstream.

  • Weitere Informationen zur AWS Kinesis Data Stream API, mit der Verbraucher registriert werden, finden Sie unter RegisterStreamConsumer.

  • Weitere Methoden und Informationen zum Erstellen einer Richtlinie in AWS finden Sie unter IAM-Richtlinien erstellen.

So erstellen Sie eine Richtlinie in AWS:

  1. Melden Sie sich in der AWS Management Console an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie im Navigationsbereich der Console unter IAM auf Zugriffsverwaltung > Richtlinien.

  3. Klicken Sie auf Richtlinie erstellen.

  4. Wählen Sie unter Dienst auswählen die Option Kinesis aus.

  5. Wählen Sie für Zulässige Aktion Folgendes aus:

    • List > ListShards.

      Mit dieser Aktion wird die Berechtigung zum Auflisten der Shards in einem Stream gewährt und es werden Informationen zu jedem Shard bereitgestellt.

    • Lesen > SubscribeToShard

      Mit dieser Aktion wird die Berechtigung zum Abhören eines bestimmten Shards mit erweitertem Fan-out gewährt.

    • Lesen > DescribeStreamConsumer

      Diese Aktion gewährt die Berechtigung, die Beschreibung eines registrierten Stream-Empfängers abzurufen.

    Diese Berechtigungen umfassen das Lesen aus dem Stream. Pub/Sub unterstützt nur das Lesen aus einem Kinesis-Stream mit erweitertem Fan-Out, und zwar mithilfe der Streaming SubscribeToShard API.

  6. Wenn du die Richtlinie auf einen bestimmten Stream oder Nutzer beschränken möchtest (empfohlen), gib unter Ressourcen den ARN des Nutzers und den ARN des Streams an.

  7. Klicken Sie auf Weitere Berechtigungen hinzufügen.

  8. Geben Sie unter Dienst auswählen STS ein und wählen Sie diese Option aus.

  9. Wählen Sie für Zulässige Aktion die Option Schreiben > AssumeRoleWithWebIdentity aus.

    Mit dieser Aktion wird die Berechtigung zum Abrufen einer Reihe temporärer Sicherheitsanmeldedaten für Pub/Sub erteilt, damit sich die Anwendung mithilfe der Identitätsfederation beim Kinesis-Datenstream authentifizieren kann.

  10. Klicken Sie auf Weiter.

  11. Geben Sie einen Namen und eine Beschreibung für die Richtlinie ein.

  12. Klicken Sie auf Richtlinie erstellen.

Rolle in AWS mit benutzerdefinierter Trust-Richtlinie erstellen

Sie müssen eine Rolle in AWS erstellen, damit Pub/Sub sich bei AWS authentifizieren kann, um Daten aus Kinesis Data Streams aufzunehmen.

So erstellen Sie eine Rolle mit einer benutzerdefinierten Vertrauensrichtlinie:

  1. Melden Sie sich in der AWS Management Console an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie im Navigationsbereich der Console für IAM auf Rollen.

  3. Klicken Sie auf Rolle erstellen.

  4. Wählen Sie unter Vertrauenswürdige Entität auswählen die Option Benutzerdefinierte Vertrauensrichtlinie aus.

  5. Geben Sie im Abschnitt Benutzerdefinierte Vertrauensrichtlinie Folgendes ein oder fügen Sie es ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Ersetzen Sie <SERVICE_ACCOUNT_UNIQUE_ID> durch die eindeutige ID des Dienstkontos, die Sie unter Eindeutige ID des Dienstkontos erfassen erfasst haben.

  6. Klicken Sie auf Weiter.

  7. Suchen Sie unter Berechtigungen hinzufügen nach der benutzerdefinierten Richtlinie, die Sie gerade erstellt haben, und wählen Sie sie aus.

  8. Klicken Sie auf Weiter.

  9. Geben Sie einen Rollennamen und eine Beschreibung ein.

  10. Klicken Sie auf Rolle erstellen.

Dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ hinzufügen

Sie müssen dem Pub/Sub-Dienstkonto eine Publisher-Rolle zuweisen, damit Pub/Sub Nachrichten aus AWS Kinesis Data Streams in das Importthema veröffentlichen kann.

  • Informationen zum Aktivieren der Veröffentlichung aus allen Themen in einem Projekt finden Sie unter Veröffentlichung aus allen Themen aktivieren. Verwenden Sie diese Methode, wenn Sie noch keine AWS Kinesis Data Streams-Importthemen erstellt haben.

  • Informationen zum Aktivieren der Veröffentlichung aus einem bestimmten Thema (empfohlen) finden Sie unter Veröffentlichung aus einem einzelnen Thema aktivieren. Verwenden Sie diese Methode nur, wenn das AWS Kinesis Data Streams-Importthema bereits vorhanden ist.

Veröffentlichung aus allen Themenbereichen aktivieren

Verwenden Sie diese Methode, wenn Sie noch keine AWS Kinesis Data Streams-Importthemen erstellt haben.

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Aktivieren Sie die Option Von Google bereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Pub/Sub-Publisher-Rolle (roles/pubsub.publisher) und wählen Sie sie aus.

  7. Klicken Sie auf Speichern.

Veröffentlichungen aus einem einzelnen Thema aktivieren

Verwenden Sie diese Methode nur, wenn das AWS Kinesis Data Streams-Importthema bereits vorhanden ist.

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics add-iam-policy-binding aus:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Topic-ID des Amazon Kinesis Data Streams-Import-Topics.

    • ist PROJECT_NUMBER die Projektnummer. Informationen zum Ermitteln der Projektnummer finden Sie unter Projekte identifizieren.

Dienstkonto die Rolle „Dienstkontonutzer“ hinzufügen

Die Rolle „Dienstkontonutzer“ (roles/iam.serviceAccountUser) umfasst die Berechtigung iam.serviceAccounts.actAs, mit der ein Hauptkonto ein Dienstkonto an die Aufnahmeeinstellungen des Amazon Kinesis Data Streams-Import-Themas anhängen und dieses Dienstkonto für die föderierte Identität verwenden kann.

Führen Sie diese Schritte aus:

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Klicken Sie für das Hauptkonto, das die Aufrufe zum Erstellen oder Aktualisieren von Themen ausführt, auf die Schaltfläche Hauptkonto bearbeiten.

  3. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  4. Suchen Sie nach der Rolle Dienstkontonutzer (roles/iam.serviceAccountUser) und wählen Sie sie aus.

  5. Klicken Sie auf Speichern.

Amazon Kinesis Data Streams-Importthema erstellen

Weitere Informationen zu den mit einem Thema verknüpften Properties finden Sie unter Properties eines Themas.

Führen Sie die folgenden Schritte aus:

Wenn Sie das Thema und das Abo separat erstellen, kann das zu Datenverlusten führen, auch wenn Sie das in schneller Folge tun. Es gibt eine kurze Zeitspanne, in der das Thema ohne Abo verfügbar ist. Daten, die während dieser Zeit an das Thema gesendet werden, gehen verloren. Wenn Sie zuerst das Thema und dann das Abo erstellen und das Thema dann in ein Importthema umwandeln, wird sichergestellt, dass beim Import keine Nachrichten übersehen werden.

So erstellen Sie ein Amazon Kinesis Data Streams-Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

  3. Geben Sie im Feld Themen-ID eine ID für das Amazon Kinesis Data Streams-Importthema ein.

    Weitere Informationen zur Benennung von Themen finden Sie in den Benennungsrichtlinien.

  4. Wählen Sie Standardabo hinzufügen aus.

  5. Wählen Sie Aufnahme aktivieren aus.

  6. Wählen Sie als Datenaufnahmequelle Amazon Kinesis Data Streams aus.

  7. Geben Sie die folgenden Informationen ein:

    • ARN des Kinesis-Streams: Der ARN des Kinesis-Datenstreams, den Sie in Pub/Sub aufnehmen möchten. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN des Kinesis-Nutzers: Der ARN der Nutzerressource, die für den AWS Kinesis-Datenstream registriert ist. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN der AWS-Rolle: Der ARN der AWS-Rolle. Die ARN der Rolle hat folgendes Format: arn:aws:iam:${Account}:role/${RoleName}.

    • Dienstkonto: Das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.

  8. Klicken Sie auf Thema erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics create aus:

    gcloud pubsub topics create TOPIC_ID \
           --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
           --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
           --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
           --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Themen-ID.

    • KINESIS_STREAM_ARN ist die ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Die ARN hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Nutzerressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist die ARN der AWS-Rolle. Die ARN der Rolle hat folgendes Format: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createTopicWithKinesisIngestion(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Kinesis topic created: %v\n", t)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithKinesisIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    createTopicWithKinesisIngestionExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void createTopicWithKinesisIngestionExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
            stream_arn=stream_arn,
            consumer_arn=consumer_arn,
            aws_role_arn=aws_role_arn,
            gcp_service_account=gcp_service_account,
        )
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis =
      request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const roleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithKinesisIngestion(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  // Creates a new topic with Kinesis ingestion.
  await pubSubClient.createTopic({
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  });
  console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
}

Weitere Informationen zu ARNs finden Sie unter Amazon-Ressourcennamen (ARNs) und IAM-IDs.

Wenn Probleme auftreten, lesen Sie den Hilfeartikel Probleme beim Importieren in Amazon Kinesis Data Streams beheben.

Amazon Kinesis Data Streams-Importthema bearbeiten

Sie können die Einstellungen für die Datenaufnahmequelle eines Amazon Kinesis Data Streams-Importthemas bearbeiten. Führen Sie diese Schritte aus:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema „Importieren in Amazon Kinesis Data Streams“.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Aktualisieren Sie die Felder, die Sie ändern möchten.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Damit die Einstellungen für das importierte Thema nicht verloren gehen, sollten Sie sie jedes Mal angeben, wenn Sie das Thema aktualisieren. Wenn Sie etwas auslassen, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

    Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

      gcloud pubsub topics update TOPIC_ID \
             --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
             --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
             --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
             --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die Themen-ID. Dieses Feld kann nicht aktualisiert werden.

    • KINESIS_STREAM_ARN ist die ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Die ARN hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Nutzerressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format hat folgendes Format: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist die ARN der AWS-Rolle. Die ARN der Rolle hat folgendes Format: arn:aws:iam:${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.

Kontingente und Limits für Amazon Kinesis Data Streams-Importthemen

Der Publisher-Durchsatz für importierte Themen ist durch das Publish-Kontingent des Themas begrenzt. Weitere Informationen finden Sie unter Pub/Sub-Kontingente und Limits.

Nächste Schritte