Escribir de Dataflow a Pub/Sub

En este documento se describe cómo escribir datos de texto de Dataflow en Pub/Sub mediante el PubSubIO conector de entrada/salida de Apache Beam.

Información general

Para escribir datos en Pub/Sub, usa el conector PubSubIO. Los elementos de entrada pueden ser mensajes de Pub/Sub o solo los datos del mensaje. Si los elementos de entrada son mensajes de Pub/Sub, puedes definir atributos o una clave de ordenación en cada mensaje.

Puedes usar la versión de Java, Python o Go del conector PubSubIO de la siguiente manera:

Java

Para escribir en un solo tema, llama al método PubsubIO.writeMessages. Este método toma una colección de entrada de objetos PubsubMessage. El conector también define métodos prácticos para escribir cadenas, mensajes de Avro codificados en formato binario o mensajes de protobuf codificados en formato binario. Estos métodos convierten la colección de entrada en mensajes de Pub/Sub.

Para escribir en un conjunto dinámico de temas en función de los datos de entrada, llama a writeMessagesDynamic. Especifica el tema de destino de cada mensaje llamando a PubsubMessage.withTopic en el mensaje. Por ejemplo, puedes enrutar mensajes a diferentes temas en función del valor de un campo concreto de tus datos de entrada.

Para obtener más información, consulta la documentación de referencia de PubsubIO.

Python

Llama al método pubsub.WriteToPubSub. De forma predeterminada, este método toma una colección de entrada de tipo bytes, que representa la carga útil del mensaje. Si el parámetro with_attributes es True, el método toma una colección de objetos PubsubMessage.

Para obtener más información, consulta la documentación de referencia del módulo pubsub.

Go

Para escribir datos en Pub/Sub, llama al método pubsubio.Write. Este método toma una colección de entrada de objetos PubSubMessage o de segmentos de bytes que contienen las cargas útiles de los mensajes.

Para obtener más información, consulta la documentación de referencia del paquete pubsubio.

Para obtener más información sobre los mensajes de Pub/Sub, consulta Formato de los mensajes en la documentación de Pub/Sub.

Marcas de tiempo

Pub/Sub asigna una marca de tiempo a cada mensaje. Esta marca de tiempo representa el momento en el que se publica el mensaje en Pub/Sub. En un escenario de streaming, también te puede interesar la marca de tiempo del evento, que es el momento en el que se generaron los datos del mensaje. Puedes usar la marca de tiempo del elemento de Apache Beam para representar la hora del evento. Las fuentes que crean un PCollection sin límites suelen asignar a cada elemento nuevo una marca de tiempo que corresponde a la hora del evento.

En Java y Python, el conector de E/S de Pub/Sub puede escribir la marca de tiempo de cada elemento como atributo de mensaje de Pub/Sub. Los consumidores de mensajes pueden usar este atributo para obtener la marca de tiempo del evento.

Java

Llama a PubsubIO.Write<T>.withTimestampAttribute y especifica el nombre del atributo.

Python

Especifica el parámetro timestamp_attribute cuando llames a WriteToPubSub.

Entrega de mensajes

Dataflow admite el procesamiento de mensajes una sola vez en un flujo de procesamiento. Sin embargo, el conector de E/S de Pub/Sub no puede garantizar la entrega exacta una sola vez de los mensajes a través de Pub/Sub.

En Java y Python, puedes configurar el conector de E/S de Pub/Sub para escribir el ID único de cada elemento como atributo de mensaje. Los consumidores de mensajes pueden usar este atributo para eliminar los mensajes duplicados.

Java

Llama a PubsubIO.Write<T>.withIdAttribute y especifica el nombre del atributo.

Python

Especifica el parámetro id_label cuando llames a WriteToPubSub.

Salida directa

Si habilitas el modo de streaming al menos una vez en tu canalización, el conector de E/S usará la salida directa. En este modo, el conector no crea puntos de control de los mensajes, lo que permite escribir más rápido. Sin embargo, los reintentos en este modo pueden provocar que se dupliquen mensajes con diferentes IDs de mensaje, lo que puede dificultar que los consumidores de mensajes eliminen los duplicados.

En las canalizaciones que usan el modo de entrega única, puedes habilitar la salida directa configurando la streaming_enable_pubsub_direct_output opción de servicio. La salida directa reduce la latencia de escritura y permite que el procesamiento sea más eficiente. Considera esta opción si tus consumidores de mensajes pueden gestionar mensajes duplicados con IDs de mensaje no únicos.

Ejemplos

En el siguiente ejemplo se crea un PCollection de mensajes de Pub/Sub y se escribe en un tema de Pub/Sub. El tema se especifica como una opción de canalización. Cada mensaje contiene datos de carga útil y un conjunto de atributos.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")