Cambia el tipo de tema

Puedes convertir un tema de importación en uno estándar o viceversa.

Cómo convertir un tema de importación en un tema estándar

Para convertir un tema de importación en un tema estándar, borra la configuración de transferencia. Sigue los siguientes pasos:

Console

  1. En la consola de Google Cloud , ve a la página Temas.

    Ir a temas

  2. Haz clic en el tema de importación.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Desmarca la opción Habilitar la transferencia.

  5. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    Reemplaza TOPIC_ID por el ID del tema.

Cómo convertir un tema estándar en un tema de importación de Amazon Kinesis Data Streams

Para convertir un tema estándar en un tema de importación de Amazon Kinesis Data Streams, primero verifica que cumplas con todos los requisitos previos.

Console

  1. En la consola de Google Cloud , ve a la página Temas.

    Ir a temas

  2. Haz clic en el tema que deseas convertir en un tema de importación.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar la transferencia.

  5. En la fuente de transferencia, selecciona Amazon Kinesis Data Streams.

  6. Ingresa los siguientes detalles:

    • ARN de transmisión de Kinesis: Es el ARN de la transmisión de datos de Kinesis que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • ARN del consumidor de Kinesis: Es el ARN del recurso del consumidor que está registrado en el flujo de datos de AWS Kinesis. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • ARN de rol de AWS: Es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.

    • Cuenta de servicio: La cuenta de servicio que creaste en Crea una cuenta de servicio en Google Cloud.

  7. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema. Este campo no se puede actualizar.

    • KINESIS_STREAM_ARN es el ARN de Kinesis Data Streams que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN es el ARN del recurso de consumidor que está registrado en AWS Kinesis Data Streams. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}.

    • KINESIS_ROLE_ARN es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.

    • PUBSUB_SERVICE_ACCOUNT es la cuenta de servicio que creaste en Crea una cuenta de servicio en Google Cloud.

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido de Pub/Sub sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Para obtener más información sobre los ARN, consulta Amazon Resource Names (ARN) y Identificadores de IAM.

Convierte un tema estándar en un tema de importación de Cloud Storage

Para convertir un tema estándar en un tema de importación de Cloud Storage, primero verifica que cumplas con todos los requisitos previos.

Console

  1. En la consola de Google Cloud , ve a la página Temas.

    Ir a temas

  2. Haz clic en el tema que deseas convertir en un tema de importación de Cloud Storage.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Selecciona la opción Habilitar la transferencia.

  5. En la fuente de transferencia, selecciona Google Cloud Storage.

  6. En el bucket de Cloud Storage, haz clic en Explorar.

    Se abrirá la página Seleccionar bucket. Selecciona una de las siguientes opciones:

    • Selecciona un bucket existente de cualquier proyecto adecuado.

    • Haz clic en el ícono de crear y sigue las instrucciones en pantalla para crear un bucket nuevo. Después de crear el bucket, selecciónalo para el tema de importación de Cloud Storage.

  7. Cuando especificas el bucket, Pub/Sub verifica los permisos adecuados en el bucket para la cuenta de servicio de Pub/Sub. Si hay problemas de permisos, verás un mensaje de error relacionado con los permisos.

    Si tienes problemas con los permisos, haz clic en Establecer permisos. Para obtener más información, consulta Otorga permisos de Cloud Storage a la cuenta de servicio de Pub/Sub.

  8. En Formato del objeto, selecciona Texto, Avro o Avro de Pub/Sub.

    Si seleccionas Texto, puedes especificar de forma opcional un Delimitador con el que dividir los objetos en mensajes.

    Para obtener más información sobre estas opciones, consulta Formato de entrada.

  9. Opcional. Puedes especificar un tiempo mínimo de creación del objeto para tu tema. Si se configura, solo se ingerirán los objetos creados después del tiempo mínimo de creación del objeto.

    Para obtener más información, consulta Tiempo mínimo de creación del objeto.

  10. Debes especificar un patrón glob. Para transferir todos los objetos del bucket, usa ** como patrón glob. Solo se transfieren los objetos que coinciden con el patrón determinado.

    Para obtener más información, consulta Cómo hacer coincidir un patrón de comodín.

  11. Mantén la configuración predeterminada.
  12. Haz clic en Actualizar tema.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para no perder la configuración del tema de importación, asegúrate de incluirla toda cada vez que actualices el tema. Si omites algo, Pub/Sub restablece el parámetro de configuración a su valor predeterminado original.

    Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema. Este campo no se puede actualizar.

    • BUCKET_NAME: Especifica el nombre de un bucket existente. Por ejemplo, prod_bucket El nombre del bucket no debe incluir el ID del proyecto. Para crear un bucket, consulta Crea buckets.

    • INPUT_FORMAT: Especifica el formato de los objetos que se transfieren. Puede ser text, avro o pubsub_avro. Para obtener más información sobre estas opciones, consulta Formato de entrada.

    • TEXT_DELIMITER: Especifica el delimitador con el que se dividirán los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo se debe establecer cuando INPUT_FORMAT es text. El valor predeterminado es el carácter de salto de línea (\n).

      Cuando uses gcloud CLI para especificar el delimitador, presta mucha atención al manejo de caracteres especiales, como el salto de línea \n. Usa el formato '\n' para asegurarte de que el delimitador se interprete correctamente. Si solo usas \n sin comillas ni caracteres de escape, se generará un delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: Especifica la fecha y hora mínimas en las que se creó un objeto para que se pueda transferir. Debe estar en UTC y tener el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

      Cualquier fecha, pasada o futura, desde el 0001-01-01T00:00:00Z hasta el 9999-12-31T23:59:59Z inclusive es válida.

    • MATCH_GLOB: Especifica el patrón glob que debe coincidir para que se transfiera un objeto. Cuando usas gcloud CLI, un comodín de coincidencia con caracteres * debe tener el carácter * con formato de escape en la forma \*\*.txt o todo el comodín de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida en los patrones glob, consulta la documentación de Cloud Storage.