Cloud Storage-Importthema erstellen

Mit einem Cloud Storage-Importthema können Sie Daten kontinuierlich aus Cloud Storage in Pub/Sub aufnehmen. Anschließend können Sie die Daten an eines der Ziele streamen, die von Pub/Sub unterstützt werden. Pub/Sub erkennt automatisch neue Objekte, die dem Cloud Storage-Bucket hinzugefügt werden, und nimmt sie auf.

Cloud Storage ist ein Dienst zum Speichern Ihrer Objekte in Google Cloud. Ein Objekt ist ein unveränderliches Datenelement, das aus einer Datei mit einem beliebigen Format besteht. Objekte werden in Containern gespeichert, die als Buckets bezeichnet werden. Buckets können auch verwaltete Ordner enthalten, mit denen Sie erweiterten Zugriff auf Gruppen von Objekten mit einem gemeinsamen Namenspräfix gewähren können.

Weitere Informationen zu Cloud Storage finden Sie in der Cloud Storage-Dokumentation.

Weitere Informationen zu Importthemen finden Sie unter Importthemen.

Hinweise

Erforderliche Rollen und Berechtigungen zum Verwalten von Cloud Storage-Importthemen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub Editor (roles/pubsub.editor) für Ihr Thema oder Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen und Verwalten eines Cloud Storage-Importthemas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen und Verwalten eines Cloud Storage-Import-Themas erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um ein Cloud Storage-Importthema zu erstellen und zu verwalten:

  • So erstellen Sie ein Importthema: pubsub.topics.create
  • Importthema löschen: pubsub.topics.delete
  • Importthema abrufen: pubsub.topics.get
  • Importthema angeben: pubsub.topics.list
  • In einem importierten Thema veröffentlichen: pubsub.topics.publish
  • Importthema aktualisieren: pubsub.topics.update
  • IAM-Richtlinie für ein Importthema abrufen: pubsub.topics.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Importthema: pubsub.topics.setIamPolicy

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können die Zugriffssteuerung auf Projektebene und auf der Ebene einzelner Ressourcen konfigurieren.

Die Nachrichtenspeicherrichtlinie entspricht dem Speicherort des Buckets

Die Nachrichtenspeicherrichtlinie des Pub/Sub-Themas muss sich mit den Regionen überschneiden, in denen sich Ihr Cloud Storage-Bucket befindet. Diese Richtlinie legt fest, wo Ihre Nachrichtendaten in Pub/Sub gespeichert werden dürfen.

  • Für Bucket mit dem Standorttyp „Region“: Die Richtlinie muss diese Region enthalten. Wenn sich Ihr Bucket beispielsweise in der Region us-central1 befindet, muss die Speicherrichtlinie für Nachrichten auch us-central1 enthalten.

  • Für Buckets mit dem Standorttyp „Dual-Region“ oder „Multi-Region“: Die Richtlinie muss mindestens eine Region innerhalb des dual-regionalen oder multiregionalen Standorts enthalten. Wenn sich Ihr Bucket beispielsweise in der US multi-region befindet, kann die Nachrichtenspeicherrichtlinie us-central1, us-east1 oder eine andere Region innerhalb der US multi-region umfassen.

    Wenn die Richtlinie die Region des Buckets nicht enthält, schlägt die Themenerstellung fehl. Wenn sich Ihr Bucket beispielsweise in europe-west1 befindet und Ihre Nachrichtenspeicherrichtlinie nur asia-east1 umfasst, erhalten Sie eine Fehlermeldung.

    Wenn die Nachrichtenspeicherrichtlinie nur eine Region enthält, die sich mit dem Speicherort des Bucket überschneidet, ist die regionenübergreifende Redundanz möglicherweise beeinträchtigt. Wenn diese einzelne Region nicht verfügbar ist, sind Ihre Daten möglicherweise nicht zugänglich. Um für vollständige Redundanz zu sorgen, sollten Sie in der Richtlinie für die Nachrichtenspeicherung mindestens zwei Regionen angeben, die zum multiregionalen oder biregionalen Speicherort des Buckets gehören.

Weitere Informationen zu den Speicherorten von Buckets finden Sie in der Dokumentation.

Dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ hinzufügen

Sie müssen dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ zuweisen, damit Pub/Sub im Cloud Storage-Importthema veröffentlichen kann.

Veröffentlichung für alle Cloud Storage-Importthemen aktivieren

Wählen Sie diese Option aus, wenn in Ihrem Projekt kein Cloud Storage-Import-Thema verfügbar ist.

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Aktivieren Sie die Option Von Google bereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Pub/Sub-Dienstkonto im folgenden Format:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Pub/Sub-Publisher-Rolle (roles/pubsub.publisher) und wählen Sie sie aus.

  7. Klicken Sie auf Speichern.

Veröffentlichung in einem einzelnen Cloud Storage-Importthema aktivieren

Wenn Sie Pub/Sub die Berechtigung zum Veröffentlichen in einem bestimmten Cloud Storage-Importthema gewähren möchten, das bereits vorhanden ist, gehen Sie so vor:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics add-iam-policy-binding aus:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Cloud Storage-Import-Themas.

    • ist PROJECT_NUMBER die Projektnummer. Informationen zum Ermitteln der Projektnummer finden Sie unter Projekte identifizieren.

Cloud Storage-Rollen dem Pub/Sub-Dienstkonto zuweisen

Damit ein Cloud Storage-Importthema erstellt werden kann, muss das Pub/Sub-Dienstkonto die Berechtigung zum Lesen aus dem entsprechenden Cloud Storage-Bucket haben. Folgende Berechtigungen sind erforderlich:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Sie können diese Berechtigungen dem Pub/Sub-Dienstkonto auf folgende Weise zuweisen:

  • Berechtigungen auf Bucket-Ebene erteilen Weisen Sie dem Pub/Sub-Dienstkonto für den jeweiligen Cloud Storage-Bucket die Rollen „Leser alter Storage-Objekte“ (roles/storage.legacyObjectReader) und „Leser alter Storage-Buckets“ (roles/storage.legacyBucketReader) zu.

  • Wenn Sie Rollen auf Projektebene zuweisen müssen, können Sie stattdessen die Rolle „Storage-Administrator“ (roles/storage.admin) für das Projekt zuweisen, das den Cloud Storage-Bucket enthält. Weisen Sie diese Rolle dem Pub/Sub-Dienstkonto zu.

Bucket-Berechtigungen

Führen Sie die folgenden Schritte aus, um dem Pub/Sub-Dienstkonto auf Bucketebene die Rollen „Storage Legacy Object Reader“ (roles/storage.legacyObjectReader) und „Storage Legacy Bucket Reader“ (roles/storage.legacyBucketReader) zuzuweisen:

  1. Wechseln Sie in der Google Cloud Console zum Cloud Storage-Browser.

    Cloud Storage aufrufen

  2. Klicken Sie auf den Cloud Storage-Bucket, aus dem Sie Nachrichten lesen und in das Cloud Storage-Importthema importieren möchten.

    Die Seite Bucket-Details wird geöffnet.

  3. Klicken Sie auf der Seite Bucket-Details auf den Tab Berechtigungen.

  4. Klicken Sie auf dem Tab Berechtigungen > Nach Hauptkonten ansehen auf Zugriff gewähren.

    Die Seite Zugriff gewähren wird geöffnet.

  5. Geben Sie im Bereich Hauptkonten hinzufügen den Namen Ihres Pub/Sub-Dienstkontos ein.

    Das Dienstkonto hat das Format service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Bei einem Projekt mit PROJECT_NUMBER=112233445566 hat das Dienstkonto beispielsweise das Format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. Geben Sie im Drop-down-Menü Rollen zuweisen > Rolle auswählen die Zeichenfolge Object Reader ein und wählen Sie die Rolle Storage Legacy Object Reader aus.

  7. Klicken Sie auf Weitere Rolle hinzufügen.

  8. Geben Sie im Drop-down-Menü Rolle auswählen die Zahl Bucket Reader ein und wählen Sie die Rolle Storage Legacy Bucket Reader aus.

  9. Klicken Sie auf Speichern.

Projektberechtigungen

Führen Sie die folgenden Schritte aus, um die Rolle „Storage-Administrator“ (roles/storage.admin) auf Projektebene zuzuweisen:

  1. Öffnen Sie in der Google Cloud Console die Seite IAM.

    IAM aufrufen

  2. Klicken Sie auf dem Tab Berechtigungen > Nach Hauptkonten ansehen auf Zugriff gewähren.

    Die Seite Zugriff gewähren wird geöffnet.

  3. Geben Sie im Bereich Hauptkonten hinzufügen den Namen Ihres Pub/Sub-Dienstkontos ein.

    Das Dienstkonto hat das Format service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Bei einem Projekt mit PROJECT_NUMBER=112233445566 hat das Dienstkonto beispielsweise das Format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Geben Sie im Drop-down-Menü Rollen zuweisen > Rolle auswählen die Zahl Storage Admin ein und wählen Sie die Rolle Storage-Administrator aus.

  5. Klicken Sie auf Speichern.

Weitere Informationen zu Cloud Storage IAM finden Sie unter Identity and Access Management für Cloud Storage.

Eigenschaften von Cloud Storage-Importthemen

Weitere Informationen zu den gemeinsamen Properties für alle Themen finden Sie unter Properties eines Themas.

Bucket-Name

Das ist der Name des Cloud Storage-Bucket, aus dem Pub/Sub die Daten liest, die in einem Cloud Storage-Importthema veröffentlicht werden.

Eingabeformat

Wenn Sie ein Cloud Storage-Importthema erstellen, können Sie das Format der zu ingestierenden Objekte als Text, Avro oder Pub/Sub Avro angeben.

  • Text Es wird davon ausgegangen, dass Objekte Daten im Nur-Text-Format enthalten. Bei diesem Eingabeformat werden alle Objekte im Bucket aufgenommen, sofern sie die Mindesterstellungszeit der Objekte einhalten und den Kriterien für das Glob-Muster entsprechen.

    Trennzeichen. Sie können auch ein Trennzeichen angeben, anhand dessen Objekte in Nachrichten aufgeteilt werden. Wenn das Trennzeichen nicht festgelegt ist, wird standardmäßig das Zeilenvorschubzeichen (\n) verwendet. Das Trennzeichen darf nur ein einzelnes Zeichen sein.

  • Avro. Die Objekte liegen im Apache Avro-Binärformat vor. Objekte, die nicht im gültigen Apache Avro-Format vorliegen, werden nicht aufgenommen. Für Avro gelten die folgenden Einschränkungen:

    • Die Avro-Versionen 1.1.0 und 1.2.0 werden nicht unterstützt.
    • Die maximale Größe eines Avro-Blocks beträgt 16 MB.
  • Pub/Sub Avro. Die Objekte sind im Binärformat von Apache Avro mit einem Schema, das dem eines Objekts entspricht, das mit einem Cloud Storage-Pub/Sub-Abo im Avro-Dateiformat in Cloud Storage geschrieben wurde. Hier sind einige wichtige Richtlinien für Pub/Sub Avro:

    • Das Datenfeld des Avro-Eintrags wird verwendet, um das Datenfeld der generierten Pub/Sub-Nachricht zu füllen.

    • Wenn die Option write_metadata für das Cloud Storage-Abo angegeben ist, werden alle Werte im Feld „attributes“ als Attribute der generierten Pub/Sub-Nachricht eingefügt.

    • Wenn in der ursprünglichen Nachricht, die in Cloud Storage geschrieben wurde, ein Sortierschlüssel angegeben ist, wird dieses Feld in der generierten Pub/Sub-Nachricht als Attribut mit dem Namen original_message_ordering_key ausgefüllt.

Mindesterstellungszeit der Objekte

Beim Erstellen eines Cloud Storage-Importthemas können Sie optional eine Mindesterstellungszeit für Objekte angeben. Es werden nur Objekte aufgenommen, die an oder nach diesem Zeitstempel erstellt wurden. Dieser Zeitstempel muss in einem Format wie YYYY-MM-DDThh:mm:ssZ angegeben werden. Gültig sind alle Datumsangaben zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z, sowohl in der Vergangenheit als auch in der Zukunft.

Mit Glob-Muster abgleichen

Beim Erstellen eines Cloud Storage-Importthemas können Sie optional ein Globus-Muster für die Übereinstimmung angeben. Es werden nur Objekte aufgenommen, deren Namen diesem Muster entsprechen. Wenn Sie beispielsweise alle Objekte mit dem Suffix .txt aufnehmen möchten, können Sie das Glob-Muster als **.txt angeben.

Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Cloud Storage-Importthema erstellen

Führen Sie die folgenden Schritte aus:

Wenn Sie das Thema und das Abo separat erstellen, kann das zu Datenverlusten führen, auch wenn Sie das in schneller Folge tun. Es gibt eine kurze Zeitspanne, in der das Thema ohne Abo verfügbar ist. Daten, die in dieser Zeit an das Thema gesendet werden, gehen verloren. Wenn Sie zuerst das Thema und dann das Abo erstellen und das Thema dann in ein Importthema umwandeln, wird sichergestellt, dass beim Import keine Nachrichten übersehen werden.

So erstellen Sie ein Cloud Storage-Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

    Die Seite mit den Themendetails wird geöffnet.

  3. Geben Sie im Feld Themen-ID eine ID für das Cloud Storage-Importthema ein.

    Weitere Informationen zur Benennung von Themen finden Sie in den Benennungsrichtlinien.

  4. Wählen Sie Standardabo hinzufügen aus.

  5. Wählen Sie Aufnahme aktivieren aus.

  6. Wählen Sie als Datenaufnahmequelle Google Cloud Storage aus.

  7. Klicken Sie für den Cloud Storage-Bucket auf Durchsuchen.

    Die Seite Bucket auswählen wird geöffnet. Wählen Sie eine der folgenden Optionen aus:

    • Wählen Sie einen vorhandenen Bucket aus einem geeigneten Projekt aus.

    • Klicken Sie auf das Symbol „Erstellen“ und folgen Sie der Anleitung auf dem Bildschirm, um einen neuen Bucket zu erstellen. Wählen Sie nach dem Erstellen des Buckets den Bucket für das Cloud Storage-Importthema aus.

  8. Wenn Sie den Bucket angeben, prüft Pub/Sub, ob das Pub/Sub-Dienstkonto die entsprechenden Berechtigungen für den Bucket hat. Bei Berechtigungsproblemen wird eine Meldung ähnlich der folgenden angezeigt:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Wenn Berechtigungsprobleme auftreten, klicken Sie auf Berechtigungen festlegen. Weitere Informationen finden Sie unter Cloud Storage-Berechtigungen für das Pub/Sub-Dienstkonto gewähren.

  9. Wählen Sie unter Objektformat die Option Text, Avro oder Pub/Sub Avro aus.

    Wenn Sie Text auswählen, können Sie optional ein Trennzeichen angeben, mit dem Objekte in Nachrichten aufgeteilt werden.

    Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

  10. Optional. Sie können für Ihr Thema eine Mindesterstellungszeit der Objekte angeben. Wenn diese Option festgelegt ist, werden nur Objekte aufgenommen, die nach der Mindesterstellungszeit der Objekte erstellt wurden.

    Weitere Informationen finden Sie unter Mindesterstellungszeit der Objekte.

  11. Sie müssen ein Globusmuster angeben. Wenn Sie alle Objekte in den Bucket aufnehmen möchten, verwenden Sie ** als Glob-Muster. Wenn festgelegt, werden nur Objekte aufgenommen, die dem angegebenen Muster entsprechen.

    Weitere Informationen finden Sie unter Glob-Muster abgleichen.

  12. Behalten Sie die anderen Standardeinstellungen bei.
  13. Klicken Sie auf Thema erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics create aus:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Im Befehl sind nur TOPIC_ID, das Flag --cloud-storage-ingestion-bucket und das Flag --cloud-storage-ingestion-input-format erforderlich. Die verbleibenden Flags sind optional und können weggelassen werden.

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Der Name oder die ID des Themas.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf keine Projekt-ID enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies sollte ein einzelnes Zeichen sein und nur festgelegt werden, wenn INPUT_FORMAT text ist. Standardmäßig ist das Zeilenumbruchzeichen (\n) festgelegt.

      Achten Sie bei der Verwendung der gcloud CLI zum Angeben des Trennzeichens genau auf die Behandlung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escape-Zeichen verwenden, wird das Trennzeichen "n" verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss im UTC-Format YYYY-MM-DDThh:mm:ssZ vorliegen. Beispiel: 2024-10-14T08:30:30Z

      Gültig sind alle Datumsangaben zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z, sowohl in der Vergangenheit als auch in der Zukunft.

    • MATCH_GLOB: Gibt das Glob-Muster an, das mit einem Objekt übereinstimmen muss, damit es aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss das Zeichen * in einem Matchglob mit *-Zeichen in Form von \*\*.txt formatiert sein oder der gesamte Matchglob muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Falls Probleme auftreten, lesen Sie den Hilfeartikel Probleme beim Importieren von Daten aus Google Cloud Storage beheben.

Cloud Storage-Importthemen bearbeiten

Sie können ein Cloud Storage-Importthema bearbeiten, um die Eigenschaften zu aktualisieren.

Wenn Sie die Datenaufnahme beispielsweise neu starten möchten, können Sie den Bucket ändern oder die Mindestzeit für die Objekterstellung aktualisieren.

So bearbeiten Sie ein Cloud Storage-Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema „Cloud Storage-Import“.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Aktualisieren Sie die Felder, die Sie ändern möchten.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Damit die Einstellungen für das importierte Thema nicht verloren gehen, sollten Sie sie jedes Mal angeben, wenn Sie das Thema aktualisieren. Wenn du etwas auslässt, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

    Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf keine Projekt-ID enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies sollte ein einzelnes Zeichen sein und nur festgelegt werden, wenn INPUT_FORMAT text ist. Standardmäßig ist das Zeilenumbruchzeichen (\n) festgelegt.

      Achten Sie bei der Verwendung der gcloud CLI zum Angeben des Trennzeichens genau auf die Behandlung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escape-Zeichen verwenden, wird das Trennzeichen "n" verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss im UTC-Format YYYY-MM-DDThh:mm:ssZ vorliegen. Beispiel: 2024-10-14T08:30:30Z

      Gültig sind alle Datumsangaben zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z, sowohl in der Vergangenheit als auch in der Zukunft.

    • MATCH_GLOB: Gibt das Glob-Muster an, das mit einem Objekt übereinstimmen muss, damit es aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss das Zeichen * in einem Matchglob mit *-Zeichen in Form von \*\*.txt formatiert sein oder der gesamte Matchglob muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Kontingente und Limits für Cloud Storage-Importthemen

Der Publisher-Durchsatz für importierte Themen ist durch das Publish-Kontingent des Themas begrenzt. Weitere Informationen finden Sie unter Pub/Sub-Kontingente und Limits.

Nächste Schritte