Crea un tema de importación de Cloud Storage

Un tema de importación de Cloud Storage te permite transferir datos de Cloud Storage a Pub/Sub de forma continua. Luego, puedes transmitir los datos a cualquiera de los destinos que admite Pub/Sub. Pub/Sub detecta automáticamente los objetos nuevos que se agregan al bucket de Cloud Storage y los transfiere.

Cloud Storage es un servicio para almacenar tus objetos en Google Cloud. Un objeto es un dato inmutable que consta de un archivo de cualquier formato. Los objetos se almacenan en contenedores llamados buckets. Los buckets también pueden contener carpetas administradas, que usas para proporcionar acceso expandido a grupos de objetos con un prefijo de nombre compartido.

Para obtener más información sobre Cloud Storage, consulta la documentación de Cloud Storage.

Para obtener más información sobre los temas de importación, consulta Acerca de los temas de importación.

Antes de comenzar

Roles y permisos necesarios para administrar temas de importación de Cloud Storage

Para obtener los permisos que necesitas para crear y administrar un tema de importación de Cloud Storage, pídele a tu administrador que te otorgue el rol de IAM de editor de Pub/Sub (roles/pubsub.editor) en tu tema o proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear y administrar un tema de importación de Cloud Storage. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear y administrar un tema de importación de Cloud Storage:

  • Crea un tema de importación: pubsub.topics.create
  • Borrar un tema de importación: pubsub.topics.delete
  • Obtén un tema de importación: pubsub.topics.get
  • Enumera un tema de importación: pubsub.topics.list
  • Publicar en un tema de importación: pubsub.topics.publish
  • Actualiza un tema de importación: pubsub.topics.update
  • Obtén la política de IAM de un tema de importación: pubsub.topics.getIamPolicy
  • Configura la política de IAM para un tema de importación: pubsub.topics.setIamPolicy

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Puedes configurar el control de acceso a nivel del proyecto y a nivel de los recursos individuales.

La política de almacenamiento de mensajes cumple con la ubicación del bucket

La política de almacenamiento de mensajes del tema de Pub/Sub debe superponerse con las regiones donde se encuentra tu bucket de Cloud Storage. Esta política determina dónde puede almacenar Pub/Sub los datos de tus mensajes.

  • Para los buckets con el tipo de ubicación como región, la política debe incluir esa región específica. Por ejemplo, si tu bucket está en la región us-central1, la política de almacenamiento de mensajes también debe incluir us-central1.

  • En el caso de los buckets con el tipo de ubicación de región doble o múltiple, la política debe incluir al menos una región dentro de la ubicación de región doble o múltiple. Por ejemplo, si tu bucket está en US multi-region, la política de almacenamiento de mensajes podría incluir us-central1, us-east1 o cualquier otra región dentro de US multi-region.

    Si la política no incluye la región del bucket, fallará la creación del tema. Por ejemplo, si tu bucket está en europe-west1 y tu política de almacenamiento de mensajes solo incluye asia-east1, recibirás un error.

    Si la política de almacenamiento de mensajes incluye solo una región que se superpone con la ubicación del bucket, es posible que se vea comprometida la redundancia multirregión. Esto se debe a que, si esa única región deja de estar disponible, es posible que no se pueda acceder a tus datos. Para garantizar la redundancia total, se recomienda incluir al menos dos regiones en la política de almacenamiento de mensajes que sean parte de la ubicación multirregional o birregional del bucket.

Para obtener más información sobre las ubicaciones de bucket, consulta la documentación.

Agrega el rol de publicador de Pub/Sub a la cuenta de servicio de Pub/Sub

Debes asignar el rol de publicador de Pub/Sub a la cuenta de servicio de Pub/Sub para que Pub/Sub pueda publicar en el tema de importación de Cloud Storage.

Habilita la publicación en todos los temas de importación de Cloud Storage

Elige esta opción cuando no tengas un tema de importación de Cloud Storage disponible en tu proyecto.

  1. En la consola de Google Cloud, ve a la página IAM.

    Ir a IAM

  2. Habilita la opción Incluir asignaciones de roles proporcionadas por Google.

  3. Busca la cuenta de servicio de Pub/Sub que tiene el siguiente formato:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Agregar otro rol.

  6. Busca y selecciona el rol de publicador de Pub/Sub (roles/pubsub.publisher).

  7. Haz clic en Guardar.

Habilita la publicación en un solo tema de importación de Cloud Storage

Si deseas otorgarle a Pub/Sub el permiso para publicar en un tema de importación de Cloud Storage específico que ya existe, sigue estos pasos:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema de importación de Cloud Storage.

    • PROJECT_NUMBER es el número del proyecto. Para ver el número del proyecto, consulta Identifica proyectos.

Asigna roles de Cloud Storage a la cuenta de servicio de Pub/Sub

Para crear un tema de importación de Cloud Storage, la cuenta de servicio de Pub/Sub debe tener permiso para leer desde el bucket específico de Cloud Storage. Los siguientes permisos son obligatorios:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Para asignar estos permisos a la cuenta de servicio de Pub/Sub, elige uno de los siguientes procedimientos:

  • Otorga permisos a nivel del bucket. En el bucket específico de Cloud Storage, otorga los roles de lector de objetos heredados de almacenamiento (roles/storage.legacyObjectReader) y de lector de buckets heredados de almacenamiento (roles/storage.legacyBucketReader) a la cuenta de servicio de Pub/Sub.

  • Si debes otorgar roles a nivel del proyecto, puedes otorgar el rol de administrador de almacenamiento (roles/storage.admin) en el proyecto que contiene el bucket de Cloud Storage. Otorga este rol a la cuenta de servicio de Pub/Sub.

Permisos de depósitos

Sigue los pasos que se indican a continuación para otorgar los roles de lector de objetos heredados de almacenamiento (roles/storage.legacyObjectReader) y de lector de buckets heredados de almacenamiento (roles/storage.legacyBucketReader) a la cuenta de servicio de Pub/Sub a nivel del bucket:

  1. En la consola de Google Cloud, ve a la página de Cloud Storage.

    Ir a Cloud Storage

  2. Haz clic en el bucket de Cloud Storage desde el que deseas leer los mensajes y, luego, en el tema de importación de Cloud Storage.

    Se abrirá la página Detalles del bucket.

  3. En la página Detalles del bucket, haz clic en la pestaña Permisos.

  4. En la pestaña Permisos > Ver por principales, haz clic en Otorgar acceso.

    Se abrirá la página Otorgar acceso.

  5. En la sección Agregar principales, ingresa el nombre de tu cuenta de servicio de Pub/Sub.

    El formato de la cuenta de servicio es service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por ejemplo, para un proyecto con PROJECT_NUMBER=112233445566, la cuenta de servicio tiene el formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  6. En el menú desplegable Asignar roles > Seleccionar un rol, ingresa Object Reader y selecciona el rol Lector de objetos heredados de Storage.

  7. Haz clic en Agregar otra función.

  8. En el menú desplegable Seleccionar un rol, ingresa Bucket Reader y selecciona el rol Lector de buckets heredados de almacenamiento.

  9. Haz clic en Guardar.

Permisos del proyecto

Sigue estos pasos para otorgar el rol Administrador de almacenamiento (roles/storage.admin) a nivel del proyecto:

  1. En la consola de Google Cloud, ve a la página IAM.

    Ir a IAM

  2. En la pestaña Permisos > Ver por principales, haz clic en Otorgar acceso.

    Se abrirá la página Otorgar acceso.

  3. En la sección Agregar principales, ingresa el nombre de tu cuenta de servicio de Pub/Sub.

    El formato de la cuenta de servicio es service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Por ejemplo, para un proyecto con PROJECT_NUMBER=112233445566, la cuenta de servicio tiene el formato service-112233445566@gcp-sa-pubsub.iam.gserviceaccount.com.

  4. En el menú desplegable Asignar roles > Seleccionar un rol, ingresa Storage Admin y selecciona el rol Administrador de almacenamiento.

  5. Haz clic en Guardar.

Si deseas obtener más información sobre la IAM de Cloud Storage, consulta Administración de identidades y accesos de Cloud Storage.

Propiedades de los temas de importación de Cloud Storage

Para obtener más información sobre las propiedades comunes de todos los temas, consulta Propiedades de un tema.

Nombre del bucket

Es el nombre del bucket de Cloud Storage desde el que Pub/Sub lee los datos que se publican en un tema de importación de Cloud Storage.

Formato de entrada

Cuando creas un tema de importación de Cloud Storage, puedes especificar el formato de los objetos que se transferirán como Texto, Avro o Avro de Pub/Sub.

  • Texto: Se supone que los objetos contienen datos con texto sin formato. Este formato de entrada intenta transferir todos los objetos del bucket, siempre y cuando el objeto cumpla con el tiempo mínimo de creación del objeto y coincida con los criterios del patrón glob.

    Delimitador. También puedes especificar un delimitador con el que se dividen los objetos en mensajes. Si no se establece, se usará de forma predeterminada el carácter de línea nueva (\n). El delimitador solo debe ser un carácter único.

  • Avro. Los objetos están en formato binario de Apache Avro. No se transferirá ningún objeto que no tenga un formato Apache Avro válido. Estas son las limitaciones de Avro:

    • No se admiten las versiones 1.1.0 y 1.2.0 de Avro.
    • El tamaño máximo de un bloque de Avro es de 16 MB.
  • Avro de Pub/Sub. Los objetos están en el formato binario de Apache Avro con un esquema que coincide con el de un objeto escrito en Cloud Storage con una suscripción a Cloud Storage de Pub/Sub con el formato de archivo Avro. Estos son algunos lineamientos importantes para Avro de Pub/Sub:

    • El campo de datos del registro Avro se usa para propagar el campo de datos del mensaje de Pub/Sub generado.

    • Si se especifica la opción write_metadata para la suscripción a Cloud Storage, cualquier valor en el campo de atributos se propaga como los atributos del mensaje de Pub/Sub generado.

    • Si se especifica una clave de orden en el mensaje original escrito en Cloud Storage, este campo se propaga como un atributo con el nombre original_message_ordering_key en el mensaje de Pub/Sub generado.

Tiempo mínimo de creación del objeto

De manera opcional, puedes especificar un tiempo mínimo de creación de objetos cuando creas un tema de importación de Cloud Storage. Solo se transfieren los objetos que se crearon en esta marca de tiempo o después de ella. Esta marca de tiempo se debe proporcionar en un formato como YYYY-MM-DDThh:mm:ssZ. Cualquier fecha, pasada o futura, del 0001-01-01T00:00:00Z al 9999-12-31T23:59:59Z inclusive, es válida.

Coincidir con el patrón glob

De manera opcional, puedes especificar un patrón de concordancia de glob cuando creas un tema de importación de Cloud Storage. Solo se transfieren los objetos con nombres que coinciden con este patrón. Por ejemplo, para transferir todos los objetos con el sufijo .txt, puedes especificar el patrón glob como **.txt.

Para obtener información sobre la sintaxis admitida en los patrones glob, consulta la documentación de Cloud Storage.

Crea un tema de importación de Cloud Storage

Asegúrate de haber completado los siguientes procedimientos:

Si creas el tema y la suscripción por separado, incluso si lo haces de forma rápida, puedes perder datos. Hay un período breve en el que el tema existe sin una suscripción. Si se envían datos al tema durante este tiempo, se perderán. Si primero creas el tema, creas la suscripción y, luego, lo conviertes en un tema de importación, garantizas que no se pierdan mensajes durante el proceso de importación.

Para crear un tema de importación de Cloud Storage, sigue estos pasos:

Console

  1. En la consola de Google Cloud, ve a la página Temas.

    Ir a temas

  2. Haz clic en Crear tema.

    Se abrirá la página de detalles del tema.

  3. En el campo ID de tema, ingresa un ID para tu tema de importación de Cloud Storage.

    Para obtener más información sobre cómo nombrar temas, consulta los lineamientos de nombres.

  4. Selecciona Agregar una suscripción predeterminada.

  5. Selecciona Habilitar transferencia.

  6. En Fuente de transferencia, selecciona Google Cloud Storage.

  7. En el bucket de Cloud Storage, haz clic en Explorar.

    Se abrirá la página Seleccionar bucket. Selecciona una de las siguientes opciones:

    • Selecciona un bucket existente de cualquier proyecto adecuado.

    • Haz clic en el ícono de crear y sigue las instrucciones en pantalla para crear un bucket nuevo. Después de crear el bucket, selecciónalo para el tema de importación de Cloud Storage.

  8. Cuando especificas el bucket, Pub/Sub busca los permisos adecuados en el bucket para la cuenta de servicio de Pub/Sub. Si hay problemas de permisos, verás un mensaje similar al siguiente:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Si tienes problemas de permisos, haz clic en Establecer permisos. Para obtener más información, consulta Otorga permisos de Cloud Storage a la cuenta de servicio de Pub/Sub.

  9. En Formato de objeto, selecciona Texto, Avro o Avro de Pub/Sub.

    Si seleccionas Texto, puedes especificar de manera opcional un Delimitador con el que dividir los objetos en mensajes.

    Para obtener más información sobre estas opciones, consulta Formato de entrada.

  10. Opcional. Puedes especificar un tiempo mínimo de creación de objetos para tu tema. Si se establece, solo se transferirán los objetos creados después del tiempo mínimo de creación del objeto.

    Para obtener más información, consulta Tiempo mínimo de creación del objeto.

  11. Debes especificar un patrón glob. Para transferir todos los objetos del bucket, usa ** como patrón glob. Si se establece, solo se transfieren los objetos que coinciden con el patrón determinado.

    Para obtener más información, consulta Cómo hacer coincidir un patrón de glob.

  12. Mantén la configuración predeterminada.
  13. Haz clic en Crear tema.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    En el comando, solo se requieren TOPIC_ID, la marca --cloud-storage-ingestion-bucket y la marca --cloud-storage-ingestion-input-format. Las marcas restantes son opcionales y se pueden omitir.

    Reemplaza lo siguiente:

    • TOPIC_ID: Es el nombre o el ID de tu tema.

    • BUCKET_NAME: Especifica el nombre de un bucket existente. Por ejemplo, prod_bucket El nombre del bucket no debe incluir el ID del proyecto. Para crear un bucket, consulta Crea buckets.

    • INPUT_FORMAT: Especifica el formato de los objetos que se transfieren. Puede ser text, avro o pubsub_avro. Para obtener más información sobre estas opciones, consulta Formato de entrada.

    • TEXT_DELIMITER: Especifica el delimitador con el que se dividirán los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo debe establecerse cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de salto de línea (\n).

      Cuando uses gcloud CLI para especificar el delimitador, presta mucha atención a la manipulación de caracteres especiales, como la línea nueva \n. Usa el formato '\n' para asegurarte de que el delimitador se interprete correctamente. Simplemente, usar \n sin comillas o escapar los resultados genera un delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: Especifica el tiempo mínimo en el que se creó un objeto para que se transfiera. Debe estar en UTC con el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

      Cualquier fecha, pasada o futura, del 0001-01-01T00:00:00Z al 9999-12-31T23:59:59Z inclusive, es válida.

    • MATCH_GLOB: Especifica el patrón glob que debe coincidir para que se transfiera un objeto. Cuando usas gcloud CLI, un glob de coincidencia con caracteres * debe tener el carácter * con formato de escape en el formato \*\*.txt o todo el glob de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida en los patrones glob, consulta la documentación de Cloud Storage.

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Go de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	cfg := &pubsub.TopicConfig{
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceCloudStorage{
				Bucket: bucket,
				// Alternatively, can be Avro or PubSubAvro formats. See
				InputFormat: &pubsub.IngestionDataSourceCloudStorageTextFormat{
					Delimiter: ",",
				},
				MatchGlob:               matchGlob,
				MinimumObjectCreateTime: minCreateTime,
			},
		},
	}
	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido de Pub/Sub mediante bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Si tienes problemas, consulta Cómo solucionar problemas relacionados con un tema de importación de Cloud Storage.

Edita un tema de importación de Cloud Storage

Puedes editar un tema de importación de Cloud Storage para actualizar sus propiedades.

Por ejemplo, para reiniciar la transferencia, puedes cambiar el bucket o actualizar el tiempo mínimo de creación de objetos.

Para editar un tema de importación de Cloud Storage, sigue estos pasos:

Console

  1. En la consola de Google Cloud, ve a la página Temas.

    Ir a temas

  2. Haz clic en el tema de importación de Cloud Storage.

  3. En la página de detalles del tema, haz clic en Editar.

  4. Actualiza los campos que deseas cambiar.

  5. Haz clic en Actualizar.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Para evitar perder la configuración del tema de importación, asegúrate de incluirlos todos cada vez que lo actualices. Si omites algo, Pub/Sub restablecerá la configuración a su valor predeterminado original.

    Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en la siguiente muestra:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Reemplaza lo siguiente:

    • TOPIC_ID es el ID o el nombre del tema. No se puede actualizar este campo.

    • BUCKET_NAME: Especifica el nombre de un bucket existente. Por ejemplo, prod_bucket El nombre del bucket no debe incluir el ID del proyecto. Para crear un bucket, consulta Crea buckets.

    • INPUT_FORMAT: Especifica el formato de los objetos que se transfieren. Puede ser text, avro o pubsub_avro. Consulta Formato de entrada para obtener más información sobre estas opciones.

    • TEXT_DELIMITER: Especifica el delimitador con el que se dividirán los objetos de texto en mensajes de Pub/Sub. Debe ser un solo carácter y solo debe establecerse cuando INPUT_FORMAT sea text. El valor predeterminado es el carácter de salto de línea (\n).

      Cuando uses gcloud CLI para especificar el delimitador, presta mucha atención a la manipulación de caracteres especiales, como el carácter de línea nueva \n. Usa el formato '\n' para asegurarte de que el delimitador se interprete correctamente. Simplemente, usar \n sin comillas o sin escapar los resultados genera un delimitador de "n".

    • MINIMUM_OBJECT_CREATE_TIME: Especifica el tiempo mínimo en el que se creó un objeto para que se transfiera. Debe estar en UTC con el formato YYYY-MM-DDThh:mm:ssZ. Por ejemplo, 2024-10-14T08:30:30Z.

      Cualquier fecha, pasada o futura, del 0001-01-01T00:00:00Z al 9999-12-31T23:59:59Z inclusive, es válida.

    • MATCH_GLOB: Especifica el patrón glob que debe coincidir para que se transfiera un objeto. Cuando usas gcloud CLI, un glob de coincidencia con caracteres * debe tener el carácter * con formato de escape en el formato \*\*.txt o todo el glob de coincidencia debe estar entre comillas "**.txt" o '**.txt'. Para obtener información sobre la sintaxis admitida en los patrones glob, consulta la documentación de Cloud Storage.

Cuotas y límites de los temas de importación de Cloud Storage

La capacidad de procesamiento del publicador para los temas de importación está limitada por la cuota de publicación del tema. Para obtener más información, consulta Cuotas y límites de Pub/Sub.

¿Qué sigue?