Questo documento descrive come scrivere dati di testo da Dataflow a
Pub/Sub utilizzando il PubSubIO
connettore I/O Apache Beam.
Panoramica
Per scrivere dati in Pub/Sub, utilizza il connettore PubSubIO
. Gli elementi di input possono essere messaggi Pub/Sub o solo i dati del messaggio.
Se gli elementi di input sono messaggi Pub/Sub, puoi facoltativamente
impostare attributi o una chiave di ordinamento per ogni messaggio.
Puoi utilizzare la versione Java, Python o Go del connettore PubSubIO
, come segue:
Java
Per scrivere in un singolo argomento, chiama il metodo
PubsubIO.writeMessages
. Questo
metodo accetta una raccolta di input di oggetti PubsubMessage
. Il connettore
definisce anche metodi pratici per scrivere stringhe, messaggi Avro con codifica binaria
o messaggi protobuf con codifica binaria. Questi metodi convertono la raccolta di input in messaggi Pub/Sub.
Per scrivere in un insieme dinamico di argomenti in base ai dati di input, chiama
writeMessagesDynamic
. Specifica
l'argomento di destinazione per ogni messaggio chiamando PubsubMessage.withTopic
sul
messaggio. Ad esempio, puoi instradare i messaggi a argomenti diversi in base al valore di un determinato campo nei dati di input.
Per saperne di più, consulta la
documentazione di riferimento di PubsubIO
.
Python
Chiama il metodo pubsub.WriteToPubSub
.
Per impostazione predefinita, questo metodo accetta una raccolta di input di tipo bytes
,
che rappresenta il payload del messaggio. Se il parametro with_attributes
è
True
, il metodo accetta una raccolta di oggetti PubsubMessage
.
Per saperne di più, consulta la documentazione di riferimento del modulo pubsub
.
Vai
Per scrivere dati in Pub/Sub, chiama il metodo
pubsubio.Write
. Questo metodo accetta una raccolta di input di oggetti PubSubMessage
o sezioni di byte che contengono i payload dei messaggi.
Per saperne di più, consulta la
documentazione di riferimento del
pacchetto pubsubio
.
Per ulteriori informazioni sui messaggi Pub/Sub, consulta Formato dei messaggi nella documentazione di Pub/Sub.
Timestamp
Pub/Sub imposta un timestamp su ogni messaggio. Questo timestamp
rappresenta l'ora in cui il messaggio viene pubblicato su Pub/Sub. In uno scenario di streaming, potresti anche essere interessato al timestamp evento, ovvero l'ora in cui sono stati generati i dati del messaggio. Puoi utilizzare il timestamp dell'elemento di Apache Beam per rappresentare l'ora dell'evento. Le origini che creano un PCollection
senza limiti spesso
assegnano a ogni nuovo elemento un timestamp che corrisponde all'ora dell'evento.
Per Java e Python, il connettore I/O Pub/Sub può scrivere il timestamp di ogni elemento come attributo del messaggio Pub/Sub. I consumer di messaggi possono utilizzare questo attributo per ottenere il timestamp dell'evento.
Java
Chiama PubsubIO.Write<T>.withTimestampAttribute
e specifica il nome dell'attributo.
Python
Specifica il parametro timestamp_attribute
quando chiami WriteToPubSub
.
Consegna messaggi
Dataflow supporta l'elaborazione exactly-once dei messaggi all'interno di una pipeline. Tuttavia, il connettore I/O Pub/Sub non può garantire la consegna "exactly-once" dei messaggi tramite Pub/Sub.
Per Java e Python, puoi configurare il connettore Pub/Sub I/O per scrivere l'ID univoco di ogni elemento come attributo del messaggio. I consumatori di messaggi possono quindi utilizzare questo attributo per deduplicare i messaggi.
Java
Chiama PubsubIO.Write<T>.withIdAttribute
e specifica il nome dell'attributo.
Python
Specifica il parametro id_label
quando chiami WriteToPubSub
.
Uscita diretta
Se attivi la modalità di streaming almeno una volta nella pipeline, il connettore I/O utilizza l'output diretto. In questa modalità, il connettore non esegue il checkpoint dei messaggi, il che consente scritture più veloci. Tuttavia, i nuovi tentativi in questa modalità potrebbero causare messaggi duplicati con ID messaggio diversi, rendendo forse più difficile per i consumatori di messaggi deduplicare i messaggi.
Per le pipeline che utilizzano la modalità exactly-once, puoi attivare l'output diretto
impostando l'streaming_enable_pubsub_direct_output
opzione di servizio. L'output diretto
riduce la latenza di scrittura e consente un'elaborazione più efficiente. Prendi in considerazione questa
opzione se i consumer di messaggi possono gestire messaggi duplicati con ID messaggio
non univoci.
Esempi
Il seguente esempio crea un PCollection
di messaggi Pub/Sub
e li scrive in un argomento Pub/Sub. L'argomento è specificato come
opzione della pipeline. Ogni messaggio contiene dati di payload e un insieme di attributi.
Java
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.
Python
Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.