Mudar o tipo de tópico

É possível converter um tema de importação em um padrão ou vice-versa.

Converter um tópico de importação em um tópico padrão

Para converter um tema de importação em um tema padrão, limpe as configurações de ingestão. Siga as etapas abaixo:

Console

  1. No console Google Cloud , acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tópico de importação.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Desmarque a opção Ativar ingestão.

  5. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Substitua TOPIC_ID pelo ID do tópico.

Converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams

Para converter um tópico padrão em um tópico de importação do Amazon Kinesis Data Streams, primeiro verifique se você atende a todos os pré-requisitos.

Console

  1. No console Google Cloud , acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tema que você quer converter em um tema de importação.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar ingestão.

  5. Em "Origem da ingestão", selecione Amazon Kinesis Data Streams.

  6. Digite os seguintes detalhes:

    • ARN do fluxo do Kinesis: o ARN do fluxo de dados do Kinesis que você planeja ingerir no Pub/Sub. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN do consumidor do Kinesis: o ARN do recurso consumidor registrado no AWS Kinesis Data Stream. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN do papel da AWS: o ARN do papel da AWS. O formato ARN da função é o seguinte: arn:aws:iam::${Account}:role/${RoleName}.

    • Conta de serviço: a conta de serviço que você criou em Criar uma conta de serviço em Google Cloud.

  7. Clique em Atualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo a seguir:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Substitua:

    • TOPIC_ID é o ID ou o nome do tópico. Não é possível atualizar este campo.

    • KINESIS_STREAM_ARN é o ARN do Kinesis Data Streams que você planeja ingerir no Pub/Sub. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN é o ARN do recurso do consumidor registrado no AWS Kinesis Data Streams. O formato do ARN é o seguinte: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN é o ARN da função da AWS. O formato do ARN da função é o seguinte: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT é a conta de serviço que você criou em Criar uma conta de serviço em Google Cloud.

Go

Antes de testar esta amostra, siga as instruções de configuração do Go no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Antes de testar esta amostra, siga as instruções de configuração do Java no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Antes de testar esta amostra, siga as instruções de configuração do Python no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Antes de testar esta amostra, siga as instruções de configuração do C++ no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de testar esta amostra, siga as instruções de configuração do Node.js no Guia de início rápido do Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Para mais informações sobre ARNs, consulte Nomes de recursos da Amazon (ARNs) e Identificadores do IAM.

Converter um tópico padrão em um tópico de importação do Cloud Storage

Para converter um tópico padrão em um tópico de importação do Cloud Storage, primeiro verifique se você atende a todos os pré-requisitos.

Console

  1. No console Google Cloud , acesse a página Tópicos.

    Acesse Tópicos

  2. Clique no tema que você quer converter em um tema de importação do Cloud Storage.

  3. Na página de detalhes do tópico, clique em Editar.

  4. Selecione a opção Ativar ingestão.

  5. Em "Origem da ingestão", selecione Google Cloud Storage.

  6. Para o bucket do Cloud Storage, clique em Procurar.

    A página Selecionar bucket é aberta. Selecione uma das seguintes opções:

    • Selecione um bucket de qualquer projeto adequado.

    • Clique no ícone de criação e siga as instruções na tela para criar um bucket. Depois de criar o bucket, selecione-o para o tópico de importação do Cloud Storage.

  7. Quando você especifica o bucket, o Pub/Sub verifica as permissões adequadas no bucket para a conta de serviço do Pub/Sub. Se houver problemas de permissões, uma mensagem de erro relacionada a elas vai aparecer.

    Se você tiver problemas de permissão, clique em Definir permissões. Para mais informações, consulte Conceder permissões do Cloud Storage à conta de serviço do Pub/Sub.

  8. Em Formato do objeto, selecione Texto, Avro ou Avro do Pub/Sub.

    Se você selecionar Texto, poderá especificar um Delimitador para dividir objetos em mensagens.

    Para mais informações sobre essas opções, consulte Formato de entrada.

  9. Opcional. É possível especificar um tempo mínimo de criação de objeto para seu tópico. Se definido, somente os objetos criados após a hora mínima de criação de objeto serão ingeridos.

    Para mais informações, consulte Hora mínima de criação de objeto.

  10. É necessário especificar um padrão glob. Para ingerir todos os objetos no bucket, use ** como o padrão glob. Somente objetos que correspondem ao padrão especificado são ingeridos.

    Para mais informações, consulte Corresponder um padrão glob.

  11. Mantenha as outras configurações padrão.
  12. Clique em Atualizar tópico.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para não perder as configurações do tópico de importação, inclua todas elas sempre que atualizar o tópico. Se você deixar algo de fora, o Pub/Sub vai redefinir a configuração para o valor padrão original.

    Execute o comando gcloud pubsub topics update com todas as flags mencionadas no exemplo a seguir:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Substitua:

    • TOPIC_ID é o ID ou o nome do tópico. Não é possível atualizar este campo.

    • BUCKET_NAME: especifica o nome de um bucket atual. Por exemplo, prod_bucket. O nome do bucket não pode incluir o ID do projeto. Para criar um bucket, consulte Criar buckets.

    • INPUT_FORMAT: especifica o formato dos objetos ingeridos. Os valores podem ser text, avro ou pubsub_avro. Para mais informações sobre essas opções, consulte Formato de entrada.

    • TEXT_DELIMITER: especifica o delimitador com que dividir objetos de texto em mensagens do Pub/Sub. Precisa ser um único caractere e só pode ser definido quando INPUT_FORMAT for text. O padrão é o caractere de nova linha (\n).

      Ao usar a CLI gcloud para especificar o delimitador, preste atenção ao processamento de caracteres especiais, como a nova linha \n. Use o formato '\n' para garantir que o delimitador seja interpretado corretamente. Usar apenas \n sem aspas ou escape resulta em um delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: especifica o tempo mínimo em que um objeto foi criado para ser ingerido. Ele precisa estar em UTC no formato YYYY-MM-DDThh:mm:ssZ. Por exemplo, 2024-10-14T08:30:30Z.

      Qualquer data, passada ou futura, de 0001-01-01T00:00:00Z a 9999-12-31T23:59:59Z, inclusive, é válida.

    • MATCH_GLOB: especifica o padrão glob a ser correspondido para que um objeto seja ingerido. Ao usar a CLI gcloud, um glob de correspondência com caracteres * precisa ter o caractere * formatado como escape na forma \*\*.txt ou todo o glob de correspondência precisa estar entre aspas "**.txt" ou '**.txt'. Para informações sobre a sintaxe compatível com padrões glob, consulte a documentação do Cloud Storage.