Modifier le type de sujet

Vous pouvez convertir un thème d'importation en thème standard ou inversement.

Convertir un thème importé en thème standard

Pour convertir un thème d'importation en thème standard, effacez les paramètres d'ingestion. Procédez comme suit :

Console

  1. Dans la console Google Cloud , accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le thème d'importation.

  3. Sur la page des détails de la rubrique, cliquez sur Modifier.

  4. Décochez l'option Activer l'ingestion.

  5. Cliquez sur Mettre à jour.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics update :

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Remplacez TOPIC_ID par l'ID du sujet.

Convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams

Pour convertir un sujet standard en sujet d'importation Amazon Kinesis Data Streams, vérifiez d'abord que vous remplissez toutes les conditions préalables.

Console

  1. Dans la console Google Cloud , accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le sujet que vous souhaitez convertir en sujet d'importation.

  3. Sur la page des détails de la rubrique, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Amazon Kinesis Data Streams.

  6. Saisissez les informations suivantes :

    • ARN du flux Kinesis : ARN du flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN du consommateur Kinesis : ARN de la ressource consommateur enregistrée dans le flux de données AWS Kinesis. Le format ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN du rôle AWS : ARN du rôle AWS. Le format ARN du rôle est le suivant : arn:aws:iam::${Account}:role/${RoleName}.

    • Compte de service : compte de service que vous avez créé dans Créer un compte de service dans Google Cloud.

  7. Cliquez sur Mettre à jour.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud pubsub topics update avec tous les indicateurs mentionnés dans l'exemple suivant :

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être modifié.

    • KINESIS_STREAM_ARN est l'ARN pour les flux de données Kinesis que vous prévoyez d'ingérer dans Pub/Sub. Le format ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN correspond à l'ARN de la ressource consommateur enregistrée dans AWS Kinesis Data Streams. Le format ARN est le suivant : arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN est l'ARN du rôle AWS. Le format ARN du rôle est le suivant : arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT correspond au compte de service que vous avez créé dans Créer un compte de service dans Google Cloud.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le Guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Pour en savoir plus sur les ARN, consultez Noms de ressources Amazon (ARN) et Identifiants IAM.

Convertir un thème standard en thème d'importation Cloud Storage

Pour convertir un sujet standard en sujet d'importation Cloud Storage, vérifiez d'abord que vous remplissez tous les prerequisites.

Console

  1. Dans la console Google Cloud , accédez à la page Sujets.

    Accéder aux sujets

  2. Cliquez sur le thème que vous souhaitez convertir en thème d'importation Cloud Storage.

  3. Sur la page des détails de la rubrique, cliquez sur Modifier.

  4. Sélectionnez l'option Activer l'ingestion.

  5. Pour la source d'ingestion, sélectionnez Google Cloud Storage.

  6. Pour le bucket Cloud Storage, cliquez sur Parcourir.

    La page Sélectionner un bucket s'ouvre. Sélectionnez l'une des options suivantes :

    • Sélectionnez un bucket existant dans un projet approprié.

    • Cliquez sur l'icône de création et suivez les instructions à l'écran pour créer un bucket. Après avoir créé le bucket, sélectionnez-le pour le thème d'importation Cloud Storage.

  7. Lorsque vous spécifiez le bucket, Pub/Sub vérifie que le compte de service Pub/Sub dispose des autorisations appropriées sur le bucket. En cas de problème d'autorisation, un message d'erreur s'affiche.

    Si vous rencontrez des problèmes d'autorisation, cliquez sur Définir les autorisations. Pour en savoir plus, consultez Accorder des autorisations Cloud Storage au compte de service Pub/Sub.

  8. Pour Format de l'objet, sélectionnez Texte, Avro ou Avro Pub/Sub.

    Si vous sélectionnez Texte, vous pouvez éventuellement spécifier un délimiteur pour diviser les objets en messages.

    Pour en savoir plus sur ces options, consultez Format d'entrée.

  9. Facultatif. Vous pouvez spécifier une durée minimale pour créer l'objet pour votre sujet. Si cette option est définie, seuls les objets créés après la durée minimale de création d'objet sont ingérés.

    Pour en savoir plus, consultez Durée minimale pour créer l'objet.

  10. Vous devez spécifier un modèle Glob. Pour ingérer tous les objets du bucket, utilisez ** comme modèle glob. Seuls les objets correspondant au modèle donné sont ingérés.

    Pour en savoir plus, consultez Faire correspondre un modèle glob.

  11. Conservez les autres paramètres par défaut.
  12. Cliquez sur Modifier le thème.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pour éviter de perdre vos paramètres pour le thème d'importation, assurez-vous de tous les inclure chaque fois que vous mettez à jour le thème. Si vous oubliez de spécifier une valeur, Pub/Sub rétablit la valeur par défaut d'origine.

    Exécutez la commande gcloud pubsub topics update avec tous les indicateurs mentionnés dans l'exemple suivant :

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Remplacez les éléments suivants :

    • TOPIC_ID correspond à l'ID ou au nom du sujet. Ce champ ne peut pas être modifié.

    • BUCKET_NAME : spécifie le nom d'un bucket existant. Exemple :prod_bucket Le nom du bucket ne doit pas inclure l'ID du projet. Pour créer un bucket, consultez Créer des buckets.

    • INPUT_FORMAT : spécifie le format des objets ingérés. Il peut s'agir de text, avro ou pubsub_avro. Pour en savoir plus sur ces options, consultez Format d'entrée.

    • TEXT_DELIMITER : spécifie le délimiteur avec lequel diviser les objets texte en messages Pub/Sub. Il doit s'agir d'un seul caractère et il ne doit être défini que lorsque INPUT_FORMAT est text. Sa valeur par défaut est le caractère de nouvelle ligne (\n).

      Lorsque vous utilisez gcloud CLI pour spécifier le délimiteur, faites très attention à la gestion des caractères spéciaux tels que le caractère de nouvelle ligne \n. Utilisez le format '\n' pour vous assurer que le délimiteur est correctement interprété. Si vous utilisez simplement \n sans guillemets ni caractères d'échappement, le délimiteur sera "n".

    • MINIMUM_OBJECT_CREATE_TIME : spécifie la durée minimale de création d'un objet pour qu'il puisse être ingéré. Il doit être au format UTC YYYY-MM-DDThh:mm:ssZ. Par exemple, 2024-10-14T08:30:30Z.

      Toute date passée ou future comprise entre le 0001-01-01T00:00:00Z et le 9999-12-31T23:59:59Z inclus est valide.

    • MATCH_GLOB : spécifie le modèle glob à faire correspondre pour qu'un objet soit ingéré. Lorsque vous utilisez gcloud CLI, un glob de correspondance avec des caractères * doit avoir le caractère * au format échappé \*\*.txt, ou l'ensemble du glob de correspondance doit être entre guillemets "**.txt" ou '**.txt'. Pour en savoir plus sur les syntaxes acceptées pour les modèles glob, consultez la documentation Cloud Storage.