Thementyp ändern

Sie können ein Importthema in ein Standardthema umwandeln und umgekehrt.

Importiertes Thema in ein Standardthema umwandeln

Wenn Sie ein Importthema in ein Standardthema umwandeln möchten, löschen Sie die Einstellungen für die Aufnahme. Führen Sie diese Schritte aus:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Importthema.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Deaktivieren Sie die Option Aufnahme aktivieren.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics update aus:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Ersetzen Sie TOPIC_ID durch die Themen-ID.

Standardthema in ein Amazon Kinesis Data Streams-Importthema umwandeln

Wenn Sie ein Standardthema in ein Amazon Kinesis Data Streams-Importthema umwandeln möchten, müssen Sie zuerst alle Voraussetzungen erfüllen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema, das Sie in ein Importthema umwandeln möchten.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Wählen Sie die Option Aufnahme aktivieren aus.

  5. Wählen Sie als Quelle für die Aufnahme Amazon Kinesis Data Streams aus.

  6. Geben Sie die folgenden Informationen ein:

    • ARN des Kinesis-Streams: Der ARN für den Kinesis Data Stream, den Sie in Pub/Sub aufnehmen möchten. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN des Kinesis-Nutzers: Der ARN der Consumer-Ressource, die für den AWS Kinesis Data Stream registriert ist. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN der AWS-Rolle: Der ARN der AWS-Rolle. Der ARN der Rolle hat folgendes Format: arn:aws:iam::${Account}:role/${RoleName}.

    • Dienstkonto: Das Dienstkonto, das Sie in Dienstkonto in Google Clouderstellen erstellt haben.

  7. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • KINESIS_STREAM_ARN ist der ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN ist der ARN der Consumer-Ressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN ist der ARN der AWS-Rolle. Der ARN der Rolle hat folgendes Format: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie in Dienstkonto in Google Clouderstellen erstellt haben.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Weitere Informationen zu ARNs finden Sie unter Amazon-Ressourcennamen (ARNs) und IAM-Kennungen.

Standardthema in ein Cloud Storage-Importthema umwandeln

Wenn Sie ein Standardthema in ein Cloud Storage-Importthema umwandeln möchten, müssen Sie zuerst alle Voraussetzungen erfüllen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema, das Sie in ein Cloud Storage-Importthema umwandeln möchten.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Wählen Sie die Option Aufnahme aktivieren aus.

  5. Wählen Sie als Erfassungsquelle Google Cloud Storage aus.

  6. Klicken Sie für den Cloud Storage-Bucket auf Durchsuchen.

    Die Seite Bucket auswählen wird geöffnet. Wählen Sie eine der folgenden Optionen aus:

    • Wählen Sie einen vorhandenen Bucket aus einem beliebigen geeigneten Projekt aus.

    • Klicken Sie auf das Symbol zum Erstellen und folgen Sie der Anleitung auf dem Bildschirm, um einen neuen Bucket zu erstellen. Nachdem Sie den Bucket erstellt haben, wählen Sie ihn für das Cloud Storage-Importthema aus.

  7. Wenn Sie den Bucket angeben, prüft Pub/Sub, ob das Pub/Sub-Dienstkonto die entsprechenden Berechtigungen für den Bucket hat. Bei Berechtigungsproblemen wird eine entsprechende Fehlermeldung angezeigt.

    Wenn Sie Probleme mit Berechtigungen haben, klicken Sie auf Berechtigungen festlegen. Weitere Informationen finden Sie unter Cloud Storage-Berechtigungen für das Pub/Sub-Dienstkonto gewähren.

  8. Wählen Sie für Objektformat die Option Text, Avro oder Pub/Sub-Avro aus.

    Wenn Sie Text auswählen, können Sie optional ein Trennzeichen angeben, mit dem Objekte in Nachrichten aufgeteilt werden.

    Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

  9. Optional. Sie können für Ihr Thema eine Mindesterstellungszeit der Objekte angeben. Wenn festgelegt, werden nur Objekte aufgenommen, die nach der Mindesterstellungszeit der Objekte erstellt wurden.

    Weitere Informationen finden Sie unter Mindesterstellungszeit der Objekte.

  10. Sie müssen ein Glob-Muster angeben. Wenn Sie alle Objekte im Bucket aufnehmen möchten, verwenden Sie ** als Glob-Muster. Es werden nur Objekte aufgenommen, die dem angegebenen Muster entsprechen.

    Weitere Informationen finden Sie unter Glob-Muster abgleichen.

  11. Behalten Sie die anderen Standardeinstellungen bei.
  12. Klicken Sie auf Thema aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Damit die Einstellungen für das Importthema nicht verloren gehen, müssen Sie sie bei jeder Aktualisierung des Themas angeben. Wenn Sie etwas weglassen, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

    Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf die Projekt-ID nicht enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies muss ein einzelnes Zeichen sein und darf nur festgelegt werden, wenn INPUT_FORMAT gleich text ist. Die Standardeinstellung ist das Zeilenumbruchzeichen (\n).

      Achten Sie bei der Angabe des Trennzeichens mit der gcloud CLI auf die Verarbeitung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escapezeichen verwenden, wird "n" als Trennzeichen verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss in UTC im Format YYYY-MM-DDThh:mm:ssZ angegeben werden. Beispiel: 2024-10-14T08:30:30Z

      Jedes Datum zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z (einschließlich), unabhängig davon, ob es in der Vergangenheit oder Zukunft liegt, ist gültig.

    • MATCH_GLOB: Gibt das Glob-Muster an, das abgeglichen werden muss, damit ein Objekt aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss ein Glob-Muster mit *-Zeichen das *-Zeichen als Escapezeichen in der Form \*\*.txt enthalten oder das gesamte Glob-Muster muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.