Créer un sujet d'importation Cloud Storage

Un sujet d'importation Cloud Storage vous permet d'ingérer en continu des données Cloud Storage dans Pub/Sub. Vous pouvez ensuite diffuser les données dans l'une des destinations compatibles avec Pub/Sub. Pub/Sub détecte automatiquement les nouveaux objets ajoutés au bucket Cloud Storage et les ingère.

Cloud Storage est un service permettant de stocker vos objets dans Google Cloud. Un objet est une donnée immuable constituée d'un fichier dans n'importe quel format. Vous stockez des objets dans des conteneurs appelés buckets. Les buckets peuvent également contenir des dossiers gérés, que vous utilisez pour fournir un accès étendu à des groupes d'objets partageant un préfixe de nom.

Pour en savoir plus sur Cloud Storage, consultez la documentation Cloud Storage.

Pour en savoir plus sur les sujets d'importation, consultez la page À propos des sujets d'importation.

Avant de commencer

Rôles et autorisations requis pour gérer les sujets d'importation Cloud Storage

Pour obtenir les autorisations nécessaires pour créer et gérer un sujet d'importation Cloud Storage, demandez à votre administrateur de vous accorder le rôle IAM Éditeur Pub/Sub (roles/pubsub.editor) sur votre sujet ou votre projet. Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer et gérer un sujet d'importation Cloud Storage. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Vous devez disposer des autorisations suivantes pour créer et gérer un sujet d'importation Cloud Storage:

  • Créez un sujet d'importation : pubsub.topics.create
  • Supprimer un thème d'importation : pubsub.topics.delete
  • Obtenir un sujet d'importation : pubsub.topics.get
  • Lister un sujet d'importation : pubsub.topics.list
  • Publier dans un sujet d'importation : pubsub.topics.publish
  • Mettre à jour un sujet d'importation : pubsub.topics.update
  • Obtenez la stratégie IAM pour un sujet d'importation : pubsub.topics.getIamPolicy
  • Configurez la stratégie IAM pour un sujet d'importation : pubsub.topics.setIamPolicy

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Vous pouvez configurer le contrôle des accès au niveau du projet et au niveau de ressources individuelles.

La règle de stockage des messages est conforme à l'emplacement du bucket

La règle de stockage des messages du sujet Pub/Sub doit chevaucher les régions où se trouve votre bucket Cloud Storage. Cette règle indique où Pub/Sub est autorisé à stocker vos données de message.

  • Pour les buckets dont le type d'emplacement est "région" : la stratégie doit inclure cette région spécifique. Par exemple, si votre bucket se trouve dans la région us-central1, la règle de stockage des messages doit également inclure us-central1.

  • Pour les buckets dont le type d'emplacement est birégional ou multirégional: la règle doit inclure au moins une région dans l'emplacement birégional ou multirégional. Par exemple, si votre bucket se trouve dans la région US multi-region, la règle de stockage des messages peut inclure us-central1, us-east1 ou toute autre région de la US multi-region.

    Si la règle n'inclut pas la région du bucket, la création du sujet échoue. Par exemple, si votre bucket se trouve dans europe-west1 et que votre stratégie de stockage des messages n'inclut que asia-east1, un message d'erreur s'affiche.

    Si la règle de stockage des messages n'inclut qu'une seule région qui chevauche l'emplacement du bucket, la redondance multirégionale peut être compromise. En effet, si cette seule région devient indisponible, vos données risquent de ne pas être accessibles. Pour garantir une redondance totale, nous vous recommandons d'inclure au moins deux régions dans la règle de stockage des messages qui font partie de l'emplacement multirégional ou birégional du bucket.

Pour en savoir plus sur les emplacements des buckets, consultez la documentation.

Ajouter le rôle Éditeur Pub/Sub au compte de service Pub/Sub

Vous devez attribuer le rôle "Éditeur Pub/Sub" au compte de service Pub/Sub pour que Pub/Sub puisse publier des messages dans le sujet d'importation Cloud Storage.

Activer la publication sur tous les sujets d'importation Cloud Storage

Choisissez cette option lorsque vous ne disposez pas d'un sujet d'importation Cloud Storage dans votre projet.

  1. Dans la console Google Cloud, accédez à la page IAM.

    Accéder à IAM

  2. Activez l'option Inclure les attributions de rôles fournies par Google.

  3. Recherchez le compte de service Pub/Sub au format suivant:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Pour ce compte de service, cliquez sur le bouton Modifier le compte principal.

  5. Si nécessaire, cliquez sur Ajouter un autre rôle.

  6. Recherchez et sélectionnez le rôle Éditeur Pub/Sub (roles/pubsub.publisher).

  7. Cliquez sur Enregistrer.

Activer la publication sur un seul sujet d'importation Cloud Storage

Si vous souhaitez autoriser Pub/Sub à publier dans un sujet d'importation Cloud Storage spécifique qui existe déjà, procédez comme suit:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics add-iam-policy-binding :

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Remplacez les éléments suivants :

    • TOPIC_ID est l'ID ou le nom du sujet d'importation Cloud Storage.

    • PROJECT_NUMBER est le numéro du projet. Pour afficher le numéro de projet, consultez Identifier des projets.

Attribuer des rôles Cloud Storage au compte de service Pub/Sub

Pour créer un sujet d'importation Cloud Storage, le compte de service Pub/Sub doit être autorisé à lire le bucket Cloud Storage spécifique. Les autorisations suivantes sont requises :

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Pour attribuer ces autorisations au compte de service Pub/Sub, choisissez l'une des procédures suivantes:

  • Accordez des autorisations au niveau du bucket. Sur le bucket Cloud Storage spécifique, attribuez les rôles "Lecteur des objets Storage anciens" (roles/storage.legacyObjectReader) et "Lecteur des anciens buckets Storage" (roles/storage.legacyBucketReader) au compte de service Pub/Sub.

  • Si vous devez attribuer des rôles au niveau du projet, vous pouvez attribuer le rôle "Administrateur de l'espace de stockage" (roles/storage.admin) au projet contenant le bucket Cloud Storage. Attribuez ce rôle au compte de service Pub/Sub.

Autorisations relatives aux buckets

Suivez les étapes ci-dessous pour accorder les rôles Lecteur des objets de l'espace de stockage (roles/storage.legacyObjectReader) et Lecteur des anciens buckets de l'espace de stockage (roles/storage.legacyBucketReader) au compte de service Pub/Sub au niveau du bucket:

  1. Dans la console Google Cloud, accédez à la page Cloud Storage.

    Accéder à Cloud Storage

  2. Cliquez sur le bucket Cloud Storage à partir duquel vous souhaitez lire les messages et les importer dans le sujet d'importation Cloud Storage.

    La page Informations sur le bucket s'ouvre.

  3. Sur la page Informations sur le bucket, cliquez sur l'onglet Autorisations.

  4. Dans l'onglet Autorisations > Afficher par compte principal, cliquez sur Accorder l'accès.

    La page Accorder l'accès s'affiche.

  5. Dans la section Ajouter des comptes principaux, saisissez le nom de votre compte de service Pub/Sub.

    Le format du compte de service est service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Par exemple, pour un projet avec PROJECT_NUMBER=112233445566, le compte de service a le format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. Dans le menu déroulant Attribuer des rôles > Sélectionner un rôle, saisissez Object Reader, puis sélectionnez le rôle Storage Legacy Object Reader (Lecteur d'objets anciens Storage).

  7. Cliquez sur Ajouter un autre rôle.

  8. Dans la liste déroulante Sélectionner un rôle, saisissez Bucket Reader, puis sélectionnez le rôle Storage Legacy Bucket Reader (Lecteur de bucket ancien Storage).

  9. Cliquez sur Enregistrer.

Autorisations liées au projet

Pour attribuer le rôle Administrateur de l'espace de stockage (roles/storage.admin) au niveau du projet, procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page IAM.

    Accéder à IAM

  2. Dans l'onglet Autorisations > Afficher par compte principal, cliquez sur Accorder l'accès.

    La page Accorder l'accès s'affiche.

  3. Dans la section Ajouter des comptes principaux, saisissez le nom de votre compte de service Pub/Sub.

    Le format du compte de service est service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Par exemple, pour un projet avec PROJECT_NUMBER=112233445566, le compte de service a le format service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. Dans le menu déroulant Attribuer des rôles > Sélectionner un rôle, saisissez Storage Admin, puis sélectionnez le rôle Storage Admin (Administrateur de l'espace de stockage).

  5. Cliquez sur Enregistrer.

Pour en savoir plus sur l'IAM Cloud Storage, consultez la page Gestion de l'authentification et des accès Cloud Storage.

Propriétés des sujets d'importation Cloud Storage

Pour en savoir plus sur les propriétés communes à tous les sujets, consultez la section Propriétés d'un sujet.

Nom du bucket

Il s'agit du nom du bucket Cloud Storage à partir duquel Pub/Sub lit les données publiées dans un sujet d'importation Cloud Storage.

Format d'entrée

Lorsque vous créez un sujet d'importation Cloud Storage, vous pouvez spécifier le format des objets à ingèrer sous la forme Texte, Avro ou Avro Pub/Sub.

  • Texte Les objets sont censés contenir des données en texte brut. Ce format d'entrée tente d'ingérer tous les objets du bucket tant qu'ils répondent à la durée minimale de création d'objet et correspondent aux critères de modèle glob.

    Séparateur. Vous pouvez également spécifier un délimiteur par lequel les objets sont divisés en messages. Si elle n'est pas définie, le caractère de retour à la ligne (\n) est utilisé par défaut. Le délimiteur ne doit comporter qu'un seul caractère.

  • Avro Les objets sont au format binaire Apache Avro. Aucun objet qui n'est pas au format Apache Avro valide n'est ingéré. Voici les limites concernant Avro:

    • Les versions Avro 1.1.0 et 1.2.0 ne sont pas compatibles.
    • La taille maximale d'un bloc Avro est de 16 Mo.
  • Pub/Sub Avro. Les objets sont au format binaire Apache Avro avec un schéma correspondant à celui d'un objet écrit dans Cloud Storage à l'aide d'un abonnement Cloud Storage Pub/Sub avec le format de fichier Avro. Voici quelques consignes importantes pour Pub/Sub Avro:

    • Le champ de données de l'enregistrement Avro est utilisé pour renseigner le champ de données du message Pub/Sub généré.

    • Si l'option write_metadata est spécifiée pour l'abonnement Cloud Storage, toutes les valeurs du champ "attributes" sont renseignées en tant qu'attributs du message Pub/Sub généré.

    • Si une clé de tri est spécifiée dans le message d'origine écrit dans Cloud Storage, ce champ est renseigné en tant qu'attribut avec le nom original_message_ordering_key dans le message Pub/Sub généré.

Durée minimale pour créer l'objet

Vous pouvez éventuellement spécifier une durée minimale pour créer l'objet lorsque vous créez un sujet d'importation Cloud Storage. Seuls les objets créés à partir de ce code temporel sont ingérés. Ce code temporel doit être fourni au format YYYY-MM-DDThh:mm:ssZ. Toute date, passée ou future, comprise entre 0001-01-01T00:00:00Z et 9999-12-31T23:59:59Z inclus, est valide.

Correspondre au schéma glob

Vous pouvez éventuellement spécifier un format de correspondance glob lors de la création d'un sujet d'importation Cloud Storage. Seuls les objets dont le nom correspond à ce format sont ingérés. Par exemple, pour ingérer tous les objets avec le suffixe .txt, vous pouvez spécifier le modèle glob comme **.txt.

Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.

Créer un sujet d'importation Cloud Storage

Assurez-vous d'avoir effectué les procédures suivantes:

Créer le sujet et l'abonnement séparément, même si vous le faites rapidement, peut entraîner une perte de données. Il existe une courte période pendant laquelle le sujet existe sans abonnement. Si des données sont envoyées au sujet pendant cette période, elles sont perdues. En créant d'abord le sujet, puis l'abonnement, puis en convertissant le sujet en sujet d'importation, vous vous assurez qu'aucun message n'est manqué lors du processus d'importation.

Pour créer un sujet d'importation Cloud Storage, procédez comme suit:

Console

  1. Dans la console Google Cloud, accédez à la page Topics (Sujets).

    Accéder aux sujets

  2. Cliquez sur Create topic (Créer un sujet).

    La page des détails du sujet s'ouvre.

  3. Dans le champ ID du sujet, saisissez un ID pour votre sujet d'importation Cloud Storage.

    Pour en savoir plus sur l'attribution de noms aux sujets, consultez les consignes d'attribution de noms.

  4. Sélectionnez Ajouter un abonnement par défaut.

  5. Sélectionnez Activer l'ingestion.

  6. Pour la source d'ingestion, sélectionnez Google Cloud Storage.

  7. Pour le bucket Cloud Storage, cliquez sur Parcourir.

    La page Sélectionner un bucket s'ouvre. Sélectionnez l'une des options suivantes :

    • Sélectionnez un bucket existant dans un projet approprié.

    • Cliquez sur l'icône Créer, puis suivez les instructions à l'écran pour créer un bucket. Après avoir créé le bucket, sélectionnez-le pour le sujet d'importation Cloud Storage.

  8. Lorsque vous spécifiez le bucket, Pub/Sub recherche les autorisations appropriées sur le bucket pour le compte de service Pub/Sub. En cas de problème d'autorisation, un message semblable à celui-ci s'affiche:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Si vous rencontrez des problèmes d'autorisation, cliquez sur Définir les autorisations. Pour en savoir plus, consultez la section Accorder des autorisations Cloud Storage au compte de service Pub/Sub.

  9. Pour Format d'objet, sélectionnez Texte, Avro ou Pub/Sub Avro.

    Si vous sélectionnez Texte, vous pouvez éventuellement spécifier un délimiteur pour diviser les objets en messages.

    Pour en savoir plus sur ces options, consultez la section Format d'entrée.

  10. Facultatif. Vous pouvez spécifier une durée minimale pour créer l'objet pour votre sujet. Si elle est définie, seuls les objets créés après la durée minimale de création d'objets sont ingérés.

    Pour en savoir plus, consultez la section Durée minimale pour créer l'objet.

  11. Vous devez spécifier un modèle glob. Pour ingérer tous les objets du bucket, utilisez ** comme modèle glob. Si ce champ est défini, seuls les objets correspondant au modèle donné sont ingérés.

    Pour en savoir plus, consultez la section Correspondre à un format générique.

  12. Conservez les autres paramètres par défaut.
  13. Cliquez sur Create topic (Créer un sujet).

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Dans la commande, seuls TOPIC_ID, l'option --cloud-storage-ingestion-bucket et l'option --cloud-storage-ingestion-input-format sont obligatoires. Les options restantes sont facultatives et peuvent être omises.

    Remplacez les éléments suivants :

    • TOPIC_ID: nom ou ID de votre sujet.

    • BUCKET_NAME: spécifie le nom d'un bucket existant. Exemple :prod_bucket Le nom du bucket ne doit pas inclure l'ID du projet. Pour créer un bucket, consultez la page Créer des buckets.

    • INPUT_FORMAT: spécifie le format des objets ingérés. Il peut être défini sur text, avro ou pubsub_avro. Pour en savoir plus sur ces options, consultez la section Format d'entrée.

    • TEXT_DELIMITER: spécifie le séparateur à utiliser pour diviser les objets texte en messages Pub/Sub. Il doit s'agir d'un seul caractère et ne doit être défini que lorsque INPUT_FORMAT est text. La valeur par défaut est le caractère de nouvelle ligne (\n).

      Lorsque vous utilisez gcloud CLI pour spécifier le délimiteur, soyez particulièrement attentif à la gestion des caractères spéciaux tels que la nouvelle ligne \n. Utilisez le format '\n' pour vous assurer que le séparateur est correctement interprété. L'utilisation simple de \n sans guillemets ni échappement entraîne un séparateur "n".

    • MINIMUM_OBJECT_CREATE_TIME: spécifie le délai minimal au cours duquel un objet a été créé pour qu'il soit ingéré. au format UTC YYYY-MM-DDThh:mm:ssZ. Par exemple, 2024-10-14T08:30:30Z.

      Toute date, passée ou future, de 0001-01-01T00:00:00Z à 9999-12-31T23:59:59Z inclus, est valide.

    • MATCH_GLOB: spécifie le modèle glob à faire correspondre pour qu'un objet soit ingéré. Lorsque vous utilisez gcloud CLI, le caractère * d'un caractère générique de correspondance avec des caractères * doit être formaté en tant qu'expression \*\*.txt ou l'ensemble du caractère générique de correspondance doit être placé entre guillemets "**.txt" ou '**.txt'. Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Si vous rencontrez des problèmes, consultez Résoudre les problèmes d'importation Cloud Storage.

Modifier un sujet d'importation Cloud Storage

Vous pouvez modifier un sujet d'importation Cloud Storage pour mettre à jour ses propriétés.

Par exemple, pour redémarrer l'ingestion, vous pouvez modifier le bucket ou mettre à jour la durée minimale de création d'objets.

Pour modifier un sujet d'importation Cloud Storage, procédez comme suit:

Console

  1. Dans la console Google Cloud, accédez à la page Topics (Sujets).

    Accéder aux sujets

  2. Cliquez sur le sujet "Importer depuis Cloud Storage".

  3. Sur la page des détails de l'article, cliquez sur Modifier.

  4. Modifiez les champs que vous souhaitez modifier.

  5. Cliquez sur Mettre à jour.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pour éviter de perdre vos paramètres pour le sujet d'importation, veillez à les inclure tous chaque fois que vous mettez à jour le sujet. Si vous omettez quelque chose, Pub/Sub rétablit le paramètre sur sa valeur par défaut d'origine.

    Exécutez la commande gcloud pubsub topics update avec tous les indicateurs mentionnés dans l'exemple suivant:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être modifié.

    • BUCKET_NAME: spécifie le nom d'un bucket existant. Exemple :prod_bucket Le nom du bucket ne doit pas inclure l'ID du projet. Pour créer un bucket, consultez la page Créer des buckets.

    • INPUT_FORMAT: spécifie le format des objets ingérés. Il peut être défini sur text, avro ou pubsub_avro. Pour en savoir plus sur ces options, consultez la section Format d'entrée.

    • TEXT_DELIMITER: spécifie le séparateur à utiliser pour diviser les objets texte en messages Pub/Sub. Il doit s'agir d'un seul caractère et ne doit être défini que lorsque INPUT_FORMAT est text. La valeur par défaut est le caractère de nouvelle ligne (\n).

      Lorsque vous utilisez gcloud CLI pour spécifier le délimiteur, soyez particulièrement attentif à la gestion des caractères spéciaux tels que la nouvelle ligne \n. Utilisez le format '\n' pour vous assurer que le séparateur est correctement interprété. L'utilisation simple de \n sans guillemets ni échappement entraîne un séparateur "n".

    • MINIMUM_OBJECT_CREATE_TIME: spécifie le délai minimal au cours duquel un objet a été créé pour qu'il soit ingéré. au format UTC YYYY-MM-DDThh:mm:ssZ. Par exemple, 2024-10-14T08:30:30Z.

      Toute date, passée ou future, de 0001-01-01T00:00:00Z à 9999-12-31T23:59:59Z inclus, est valide.

    • MATCH_GLOB: spécifie le modèle glob à faire correspondre pour qu'un objet soit ingéré. Lorsque vous utilisez gcloud CLI, le caractère * d'un caractère générique de correspondance avec des caractères * doit être formaté en tant qu'expression \*\*.txt ou l'ensemble du caractère générique de correspondance doit être placé entre guillemets "**.txt" ou '**.txt'. Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.

Quotas et limites pour les sujets d'importation Cloud Storage

Le débit de l'éditeur pour les sujets d'importation est limité par le quota de publication du sujet. Pour en savoir plus, consultez la page Quotas et limites de Pub/Sub.

Étape suivante