Crea un argomento di importazione Cloud Storage

Un argomento di importazione da Cloud Storage ti consente di importare continuamente i dati da Cloud Storage in Pub/Sub. Poi puoi inviare i dati in streaming in una delle destinazioni supportate da Pub/Sub. Pub/Sub rileva automaticamente i nuovi oggetti aggiunti al bucket Cloud Storage e li importa.

Cloud Storage è un servizio per l'archiviazione di oggetti in Google Cloud. Un oggetto è un dato immutabile costituito da un file di qualsiasi formato. Gli oggetti vengono memorizzati in container chiamati bucket. I bucket possono contenere anche cartelle gestite, che utilizzi per fornire accesso avanzato a gruppi di oggetti con un prefisso del nome condiviso.

Per ulteriori informazioni su Cloud Storage, consulta la documentazione di Cloud Storage.

Per ulteriori informazioni sugli argomenti di importazione, consulta Informazioni sugli argomenti di importazione.

Prima di iniziare

Ruoli e autorizzazioni richiesti per gestire gli argomenti di importazione di Cloud Storage

Per ottenere le autorizzazioni necessarie per creare e gestire un argomento di importazione di Cloud Storage, chiedi all'amministratore di concederti il ruolo IAM Editor Pub/Sub (roles/pubsub.editor) per il tuo argomento o progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare e gestire un argomento di importazione di Cloud Storage. Per visualizzare le autorizzazioni esatte richieste, espandi la sezione Autorizzazioni richieste:

Autorizzazioni obbligatorie

Per creare e gestire un argomento di importazione di Cloud Storage sono necessarie le seguenti autorizzazioni:

  • Crea un argomento di importazione: pubsub.topics.create
  • Elimina un argomento di importazione: pubsub.topics.delete
  • Ricevi un argomento di importazione: pubsub.topics.get
  • Elenca un argomento di importazione: pubsub.topics.list
  • Pubblica in un argomento di importazione: pubsub.topics.publish
  • Aggiorna un argomento di importazione: pubsub.topics.update
  • Recupera il criterio IAM per un argomento di importazione: pubsub.topics.getIamPolicy
  • Configura il criterio IAM per un argomento di importazione: pubsub.topics.setIamPolicy

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Puoi configurare il controllo dell'accesso a livello di progetto e di singola risorsa.

Il criterio di archiviazione dei messaggi è conforme alla località del bucket

Il criterio di archiviazione dei messaggi dell'argomento Pub/Sub deve sovrapporsi alle regioni in cui si trova il bucket Cloud Storage. Questa norma prescrive dove Pub/Sub può archiviare i dati dei messaggi.

  • Per i bucket con tipo di località come regione: il criterio deve includere la regione specifica. Ad esempio, se il bucket si trova nella regione us-central1, il criterio di archiviazione dei messaggi deve includere anche us-central1.

  • Per i bucket con tipo di località a due o più regioni: il criterio deve includere almeno una regione all'interno della località a due o più regioni. Ad esempio, se il bucket si trova in US multi-region, il criterio di archiviazione dei messaggi potrebbe includere us-central1, us-east1 o qualsiasi altra regione all'interno di US multi-region.

    Se il criterio non include la regione del bucket, la creazione dell'argomento non va a buon fine. Ad esempio, se il bucket si trova in europe-west1 e il criterio di archiviazione dei messaggi include solo asia-east1, riceverai un errore.

    Se il criterio di archiviazione dei messaggi include una sola regione che si sovrappone alla posizione del bucket, la ridondanza multi-regione potrebbe essere compromessa. Questo accade perché se la singola regione non è più disponibile, i tuoi dati potrebbero non essere accessibili. Per garantire la ridondanza completa, ti consigliamo di includere almeno due regioni nel criterio di archiviazione dei messaggi che fanno parte della località multiregione o doppia regione del bucket.

Per saperne di più sulle località dei bucket, consulta la documentazione.

Aggiungi il ruolo Publisher Pub/Sub all'account di servizio Pub/Sub

Devi assegnare il ruolo Publisher Pub/Sub all'account di servizio Pub/Sub in modo che Pub/Sub possa pubblicare nell'argomento di importazione di Cloud Storage.

Attivare la pubblicazione in tutti gli argomenti di importazione di Cloud Storage

Scegli questa opzione se non hai un argomento di importazione Cloud Storage disponibile nel tuo progetto.

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Attiva l'opzione Includi concessioni di ruoli fornite da Google.

  3. Cerca l'account di servizio Pub/Sub con il seguente formato:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Per questo account di servizio, fai clic sul pulsante Modifica entità.

  5. Se necessario, fai clic su Aggiungi un altro ruolo.

  6. Cerca e seleziona il ruolo Publisher Pub/Sub (roles/pubsub.publisher).

  7. Fai clic su Salva.

Attivare la pubblicazione in un singolo argomento di importazione Cloud Storage

Se vuoi concedere a Pub/Sub l'autorizzazione per pubblicare in un argomento di importazione Cloud Storage specifico esistente, segui questi passaggi:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Sostituisci quanto segue:

    • TOPIC_ID è l'ID o il nome dell'argomento di importazione di Cloud Storage.

    • PROJECT_NUMBER è il numero del progetto. Per visualizzare il numero del progetto, consulta Identificazione dei progetti.

Assegna i ruoli Cloud Storage all'account di servizio Pub/Sub

Per creare un argomento di importazione Cloud Storage, l'account di servizio Pub/Sub deve disporre dell'autorizzazione di lettura dal bucket Cloud Storage specifico. Sono richieste le seguenti autorizzazioni:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Per assegnare queste autorizzazioni all'account di servizio Pub/Sub, scegli una delle seguenti procedure:

  • Concedi le autorizzazioni a livello di bucket. Al bucket Cloud Storage specifico, concedi all'account di servizio Pub/Sub i ruoli Legacy Object Reader (roles/storage.legacyObjectReader) e Legacy Bucket Reader (roles/storage.legacyBucketReader).

  • Se devi concedere i ruoli a livello di progetto, puoi concedere il ruolo Amministratore archiviazione (roles/storage.admin) al progetto contenente il bucket Cloud Storage. Concedi questo ruolo all'account di servizio Pub/Sub.

Autorizzazioni del bucket

Esegui i seguenti passaggi per concedere al account di servizio Pub/Sub i ruoli Storage Legacy Object Reader (roles/storage.legacyObjectReader) e Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) a livello di bucket:

  1. Nella console Google Cloud, vai alla pagina Cloud Storage.

    Vai a Cloud Storage

  2. Fai clic sul bucket Cloud Storage da cui vuoi leggere i messaggi e importarli nell'argomento di importazione di Cloud Storage.

    Viene visualizzata la pagina Dettagli bucket.

  3. Nella pagina Dettagli bucket, fai clic sulla scheda Autorizzazioni.

  4. Nella scheda Autorizzazioni > Visualizza per entità, fai clic su Concedi accesso.

    Viene visualizzata la pagina Concedi l'accesso.

  5. Nella sezione Aggiungi entità, inserisci il nome del tuo account di servizio Pub/Sub.

    Il formato dell'account di servizio èservice-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Ad esempio, per un progetto con PROJECT_NUMBER=112233445566, l'account di servizio è nel formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. Nel menu a discesa Assegna i ruoli > Seleziona un ruolo, inserisci Object Reader e seleziona il ruolo Lettore di oggetti legacy di Storage.

  7. Fai clic su Aggiungi un altro ruolo.

  8. Nel menu a discesa Seleziona un ruolo, inserisci Bucket Reader e seleziona il ruolo Lettore di bucket legacy di Storage.

  9. Fai clic su Salva.

Autorizzazioni per il progetto

Per concedere il ruolo Amministratore archiviazione (roles/storage.admin) a livello di progetto, svolgi i seguenti passaggi:

  1. Nella console Google Cloud, vai alla pagina IAM.

    Vai a IAM

  2. Nella scheda Autorizzazioni > Visualizza per entità, fai clic su Concedi accesso.

    Viene visualizzata la pagina Concedi l'accesso.

  3. Nella sezione Aggiungi entità, inserisci il nome del tuo account di servizio Pub/Sub.

    Il formato dell'account di servizio èservice-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Ad esempio, per un progetto con PROJECT_NUMBER=112233445566, l'account di servizio è nel formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Nel menu a discesa Assegna i ruoli > Seleziona un ruolo, inserisci Storage Admin e seleziona il ruolo Amministratore dello spazio di archiviazione.

  5. Fai clic su Salva.

Per ulteriori informazioni su Cloud Storage IAM, consulta Identity and Access Management (IAM) di Cloud Storage.

Argomenti sulle proprietà dell'importazione da Cloud Storage

Per ulteriori informazioni sulle proprietà comuni a tutti gli argomenti, consulta Proprietà di un argomento.

Nome bucket

Si tratta del nome del bucket Cloud Storage da cui Pub/Sub legge i dati pubblicati in un argomento di importazione Cloud Storage.

Formato di input

Quando crei un argomento di importazione di Cloud Storage, puoi specificare il formato degli oggetti da importare come Testo, Avro o Avro Pub/Sub.

  • Testo. Si presume che gli oggetti contengano dati in testo normale. Questo formato di input tenta di importare tutti gli oggetti nel bucket, a condizione che l'oggetto soddisfi il tempo minimo di creazione dell'oggetto e corrisponda ai criteri del pattern glob.

    Delimitatore. Puoi anche specificare un delimitatore in base al quale gli oggetti vengono suddivisi in messaggi. Se non viene impostato, viene utilizzato per impostazione predefinita il carattere di a capo (\n). Il delimitatore deve essere costituito da un solo carattere.

  • Avro. Gli oggetti sono nel formato binario Apache Avro. Qualsiasi oggetto non in un formato Apache Avro valido non viene importato. Di seguito sono riportate le limitazioni relative ad Avro:

    • Le versioni 1.1.0 e 1.2.0 di Avro non sono supportate.
    • La dimensione massima di un blocco Avro è 16 MB.
  • Pub/Sub Avro. Gli oggetti sono nel formato binario Apache Avro con uno schema corrispondente a quello di un oggetto scritto in Cloud Storage utilizzando un abbonamento Cloud Storage Pub/Sub con il formato file Avro. Di seguito sono riportate alcune linee guida importanti per Avro di Pub/Sub:

    • Il campo dati del record Avro viene utilizzato per compilare il campo dati del messaggio Pub/Sub generato.

    • Se per l'abbonamento Cloud Storage è specificata l'opzione write_metadata, tutti i valori nel campo degli attributi vengono inseriti come attributi del messaggio Pub/Sub generato.

    • Se nel messaggio originale scritto in Cloud Storage è specificata una chiave di ordinamento, questo campo viene compilato come attributo con il nome original_message_ordering_key nel messaggio Pub/Sub generato.

Ora minima di creazione dell'oggetto

Se vuoi, puoi specificare un tempo minimo di creazione dell'oggetto quando crei un argomento di importazione di Cloud Storage. Verranno importati solo gli oggetti creati a partire da questo timestamp. Questo timestamp deve essere fornito in un formato come YYYY-MM-DDThh:mm:ssZ. È valida qualsiasi data, passata o futura, da 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z inclusi.

Corrispondenza pattern glob

Se vuoi, puoi specificare un pattern glob di corrispondenza quando crei un argomento di importazione da Cloud Storage. Verranno importati solo gli oggetti con nomi che corrispondono a questo pattern. Ad esempio, per importare tutti gli oggetti con suffisso .txt, puoi specificare il pattern glob come **.txt.

Per informazioni sulla sintassi supportata per i pattern glob, consulta la documentazione di Cloud Storage.

Crea un argomento di importazione Cloud Storage

Assicurati di aver completato le seguenti procedure:

La creazione separata dell'argomento e della sottoscrizione, anche se eseguita in rapida successione, può comportare la perdita di dati. Esiste un breve periodo di tempo in cui l'argomento esiste senza un abbonamento. Se durante questo periodo vengono inviati dati all'argomento, andranno persi. Se crei prima l'argomento, poi la sottoscrizione e infine converti l'argomento in un argomento di importazione, garantisci che nessun messaggio venga perso durante il processo di importazione.

Per creare un argomento di importazione da Cloud Storage:

Console

  1. Nella console Google Cloud, vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

    Viene visualizzata la pagina dei dettagli dell'argomento.

  3. Nel campo ID argomento, inserisci un ID per l'argomento di importazione di Cloud Storage.

    Per ulteriori informazioni sugli argomenti relativi ai nomi, consulta le linee guida per i nomi.

  4. Seleziona Aggiungi una sottoscrizione predefinita.

  5. Seleziona Attiva importazione.

  6. Per l'origine di importazione, seleziona Google Cloud Storage.

  7. Per il bucket Cloud Storage, fai clic su Sfoglia.

    Viene visualizzata la pagina Seleziona bucket. Seleziona una delle seguenti opzioni:

    • Seleziona un bucket esistente da qualsiasi progetto appropriato.

    • Fai clic sull'icona Crea e segui le istruzioni sullo schermo per creare un nuovo bucket. Dopo aver creato il bucket, selezionalo per l'argomento sull'importazione da Cloud Storage.

  8. Quando specifichi il bucket, Pub/Sub controlla se sono presenti le autorizzazioni appropriate per l'account di servizio Pub/Sub. In caso di problemi di autorizzazione, viene visualizzato un messaggio simile al seguente:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Se si verificano problemi di autorizzazione, fai clic su Imposta autorizzazioni. Per ulteriori informazioni, consulta Concedere le autorizzazioni Cloud Storage all'account di servizio Pub/Sub.

  9. Per Formato oggetto, seleziona Testo, Avro o Pub/Sub Avro.

    Se selezioni Testo, facoltativamente puoi specificare un Delimitatore con cui suddividere gli oggetti in messaggi.

    Per ulteriori informazioni su queste opzioni, vedi Formato di input.

  10. Facoltativo. Puoi specificare un tempo minimo di creazione dell'oggetto per il tuo argomento. Se impostato, vengono importati solo gli oggetti creati dopo il tempo minimo di creazione dell'oggetto.

    Per ulteriori informazioni, consulta Tempo minimo di creazione dell'oggetto.

  11. Devi specificare un pattern glob. Per importare tutti gli oggetti nel bucket, utilizza ** come pattern glob. Se impostato, vengono importati solo gli oggetti che corrispondono al pattern specificato.

    Per ulteriori informazioni, consulta Eseguire la corrispondenza a un pattern glob.

  12. Mantieni le altre impostazioni predefinite.
  13. Fai clic su Crea argomento.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Nel comando sono obbligatori solo TOPIC_ID, il --cloud-storage-ingestion-bucket e il --cloud-storage-ingestion-input-format. I flag rimanenti sono facoltativi e possono essere omessi.

    Sostituisci quanto segue:

    • TOPIC_ID: il nome o l'ID dell'argomento.

    • BUCKET_NAME: specifica il nome di un bucket esistente. Ad esempio, prod_bucket. Il nome del bucket non deve includere l'ID progetto. Per creare un bucket, consulta la sezione Creare bucket.

    • INPUT_FORMAT: specifica il formato degli oggetti importati. Può essere text, avro o pubsub_avro. Per ulteriori informazioni su queste opzioni, vedi Formato di input.

    • TEXT_DELIMITER: specifica il delimitatore con cui suddividere gli oggetti di testo in messaggi Pub/Sub. Deve essere costituito da un solo carattere e deve essere impostato solo quando INPUT_FORMAT è text. Il valore predefinito è il carattere di nuova riga (\n).

      Quando utilizzi gcloud CLI per specificare il delimitatore, presta molta attenzione alla gestione dei caratteri speciali come il nuovo riga \n. Utilizza il formato '\n' per assicurarti che il delimitatore venga interpretato correttamente. Se utilizzi semplicemente \n senza virgolette o sfugge, il delimitatore sarà "n".

    • MINIMUM_OBJECT_CREATE_TIME: specifica il tempo minimo in cui è stato creato un oggetto affinché possa essere importato. Deve essere in UTC nel formato YYYY-MM-DDThh:mm:ssZ. Ad esempio: 2024-10-14T08:30:30Z.

      È valida qualsiasi data, passata o futura, compresa tra 0001-01-01T00:00:00Z e 9999-12-31T23:59:59Z.

    • MATCH_GLOB: specifica il pattern glob da associare per consentire l'importazione di un oggetto. Quando utilizzi gcloud CLI, un pattern di corrispondenza con caratteri * deve avere il carattere * formattato come carattere di escape nel formato \*\*.txt o l'intero pattern di corrispondenza deve essere tra virgolette "**.txt" o '**.txt'. Per informazioni sulla sintassi supportata per i pattern glob, consulta la documentazione di Cloud Storage.

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Python.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per autenticarti a Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Se riscontri problemi, consulta la sezione Risolvere i problemi relativi a un argomento di importazione di Cloud Storage.

Modificare un argomento di importazione da Cloud Storage

Puoi modificare un argomento di importazione da Cloud Storage per aggiornarne le proprietà.

Ad esempio, per riavviare l'importazione, puoi modificare il bucket o aggiornare la data di creazione minima dell'oggetto.

Per modificare un argomento di importazione di Cloud Storage:

Console

  1. Nella console Google Cloud, vai alla pagina Topic.

    Vai ad Argomenti

  2. Fai clic sull'argomento relativo all'importazione da Cloud Storage.

  3. Nella pagina dei dettagli dell'argomento, fai clic su Modifica.

  4. Aggiorna i campi che vuoi modificare.

  5. Fai clic su Aggiorna.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Per evitare di perdere le impostazioni per l'argomento di importazione, assicurati di includerle tutte ogni volta che aggiorni l'argomento. Se ometti qualcosa, Pub/Sub reimposta l'impostazione sul valore predefinito originale.

    Esegui il comando gcloud pubsub topics update con tutti i flag menzionati nel seguente esempio:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Sostituisci quanto segue:

    • TOPIC_ID è l'ID o il nome dell'argomento. Questo campo non può essere aggiornato.

    • BUCKET_NAME: specifica il nome di un bucket esistente. Ad esempio, prod_bucket. Il nome del bucket non deve includere l'ID progetto. Per creare un bucket, consulta la sezione Creare bucket.

    • INPUT_FORMAT: specifica il formato degli oggetti importati. Può essere text, avro o pubsub_avro. Per ulteriori informazioni su queste opzioni, consulta Formato di input.

    • TEXT_DELIMITER: specifica il delimitatore con cui suddividere gli oggetti di testo in messaggi Pub/Sub. Deve essere costituito da un solo carattere e deve essere impostato solo quando INPUT_FORMAT è text. Il valore predefinito è il carattere di nuova riga (\n).

      Quando utilizzi gcloud CLI per specificare il delimitatore, presta molta attenzione alla gestione dei caratteri speciali come il nuovo riga \n. Utilizza il formato '\n' per assicurarti che il delimitatore venga interpretato correttamente. Se utilizzi semplicemente \n senza virgolette o sfugge, il delimitatore sarà "n".

    • MINIMUM_OBJECT_CREATE_TIME: specifica il tempo minimo in cui è stato creato un oggetto affinché possa essere importato. Deve essere in UTC nel formato YYYY-MM-DDThh:mm:ssZ. Ad esempio: 2024-10-14T08:30:30Z.

      È valida qualsiasi data, passata o futura, compresa tra 0001-01-01T00:00:00Z e 9999-12-31T23:59:59Z.

    • MATCH_GLOB: specifica il pattern glob da associare per consentire l'importazione di un oggetto. Quando utilizzi gcloud CLI, un pattern di corrispondenza con caratteri * deve avere il carattere * formattato come carattere di escape nel formato \*\*.txt o l'intero pattern di corrispondenza deve essere tra virgolette "**.txt" o '**.txt'. Per informazioni sulla sintassi supportata per i pattern glob, consulta la documentazione di Cloud Storage.

Quote e limiti per gli argomenti di importazione di Cloud Storage

Il throughput del publisher per gli argomenti di importazione è limitato dalla quota di pubblicazione dell'argomento. Per ulteriori informazioni, consulta Quote e limiti di Pub/Sub.

Passaggi successivi