Questa pagina illustra le best practice da utilizzare per lo sviluppo delle pipeline Dataflow. L'utilizzo di queste best practice offre i seguenti vantaggi:
- Migliorare l'osservabilità e le prestazioni della pipeline
- Maggiore produttività degli sviluppatori
- Migliorare la testabilità della pipeline
Gli esempi di codice Apache Beam in questa pagina utilizzano Java, ma i contenuti si applicano agli SDK Apache Beam per Java, Python e Go.
Domande da considerare
Quando progetti la pipeline, considera le seguenti domande:
- Dove sono memorizzati i dati di input della pipeline? Quanti set di dati di input hai?
- Che aspetto hanno i tuoi dati?
- Cosa vuoi fare con i tuoi dati?
- Dove devono essere inviati i dati di output della pipeline?
- Il tuo job Dataflow utilizza Assured Workloads?
Utilizzare i modelli
Per accelerare lo sviluppo della pipeline, anziché creare una pipeline scrivendo codice Apache Beam, utilizza un modello Dataflow se possibile. I modelli offrono i seguenti vantaggi:
- I modelli sono riutilizzabili.
- I modelli ti consentono di personalizzare ogni job modificando parametri specifici della pipeline.
- Chiunque disponga delle autorizzazioni può utilizzare il modello per eseguire il deployment della pipeline. Ad esempio, uno sviluppatore può creare un job da un modello e un data scientist dell'organizzazione può eseguire il deployment del modello in un secondo momento.
Puoi utilizzare un modello fornito da Google oppure crearne uno tuo. Alcuni modelli forniti da Google ti consentono di aggiungere logica personalizzata come passaggio della pipeline. Ad esempio, il modello Pub/Sub to BigQuery fornisce un parametro per eseguire una funzione JavaScript definita dall'utente (UDF) memorizzata in Cloud Storage.
Poiché i modelli forniti da Google sono open source ai sensi della Licenza Apache 2.0, puoi utilizzarli come base per nuove pipeline. I modelli sono utili anche come esempi di codice. Visualizza il codice del modello nel repository GitHub.
Assured Workloads
Assured Workloads contribuisce a applicare i requisiti di sicurezza e conformità per i clienti Google Cloud. Ad esempio, regioni UE e assistenza con controlli di sovranità contribuisce a garantire la residenza dei dati e la sovranità dei dati per i clienti con sede nell'UE. Per fornire queste funzionalità, alcune funzionalità di Dataflow sono limitate o limitate. Se utilizzi Assured Workloads con Dataflow, tutte le risorse a cui accede la pipeline devono trovarsi nel progetto o nella cartella Assured Workloads della tua organizzazione. Le risorse includono:
- Bucket Cloud Storage
- Set di dati di BigQuery
- Argomenti e iscrizioni Pub/Sub
- Set di dati Firestore
- Connettori I/O
In Dataflow, per i job in streaming creati dopo il 7 marzo 2024, tutti i dati utente vengono criptati con CMEK.
Per i job di streaming creati prima del 7 marzo 2024, le chiavi di dati utilizzate nelle operazioni basate su chiavi, come finestre, raggruppamenti e unioni, non sono protette dalla crittografia CMEK. Per attivare questa crittografia per i tuoi job, svuotalo o annullalo, quindi riavvialo. Per ulteriori informazioni, consulta Crittografia degli elementi dello stato della pipeline.
Condividere i dati tra le pipeline
Non esiste un meccanismo di comunicazione tra pipeline specifico di Dataflow per condividere dati o contesto di elaborazione tra le pipeline. Puoi utilizzare un'unità di archiviazione permanente come Cloud Storage o una cache in memoria come App Engine per condividere i dati tra le istanze della pipeline.
Pianifica job
Puoi automatizzare l'esecuzione della pipeline nei seguenti modi:
- Utilizza Cloud Scheduler.
- Utilizza l'operatore Dataflow di Apache Airflow, uno dei numerosi operatori Google Cloud in un flusso di lavoro Cloud Composer.
- Esegui processi di job personalizzati (cron) su Compute Engine.
Best practice per la scrittura del codice della pipeline
Le sezioni seguenti forniscono le best practice da utilizzare quando crei pipeline scrivendo codice Apache Beam.
Strutturare il codice Apache Beam
Per creare pipeline, è comune utilizzare la trasformazione Apache Beam di elaborazione parallela generica
ParDo
.
Quando applichi una trasformazione ParDo
, fornisci il codice sotto forma di oggetto
DoFn
. DoFn
è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita.
Puoi considerare il codice DoFn
come entità piccole e indipendenti: possono essere eseguite potenzialmente molte istanze su macchine diverse, ciascuna senza conoscenza delle altre. Pertanto, consigliamo di creare funzioni pure, che sono ideali per la natura parallela e distribuita degli elementi DoFn
.
Le funzioni pure hanno le seguenti caratteristiche:
- Le funzioni pure non dipendono da uno stato nascosto o esterno.
- Non hanno effetti collaterali osservabili.
- Sono deterministici.
Il modello di funzione pura non è rigidamente rigido. Quando il codice non dipende da elementi che non sono garantiti dal servizio Dataflow, le informazioni sullo stato o i dati di inizializzazione esterni possono essere validi per DoFn
e altri oggetti funzione.
Quando strutturi le trasformazioni ParDo
e crei gli elementi DoFn
,
considera le seguenti linee guida:
- Quando utilizzi l'elaborazione exactly-once, il servizio Dataflow garantisce che ogni elemento dell'input
PCollection
venga elaborato da un'istanzaDoFn
esattamente una volta. - Il servizio Dataflow non garantisce il numero di volte in cui viene invocato un
DoFn
. - Il servizio Dataflow non garantisce esattamente in che modo vengono raggruppati gli elementi distribuiti. Non garantisce quali, se presenti, elementi vengono elaborati insieme.
- Il servizio Dataflow non garantisce il numero esatto di istanze
DoFn
create nel corso di una pipeline. - Il servizio Dataflow è tollerante agli errori e potrebbe riprovare il codice più volte se i worker riscontrano problemi.
- Il servizio Dataflow potrebbe creare copie di backup del codice. Potrebbero verificarsi problemi con gli effetti collaterali manuali, ad esempio se il codice si basa su o crea file temporanei con nomi non univoci.
- Il servizio Dataflow esegue la serializzazione dell'elaborazione degli elementi per
DoFn
istanza. Il codice non deve essere rigorosamente thread-safe, ma qualsiasi stato condiviso tra più istanze diDoFn
deve essere thread-safe.
Creare librerie di trasformazioni riutilizzabili
Il modello di programmazione Apache Beam ti consente di riutilizzare le trasformazioni. Creando una libreria condivisa di trasformazioni comuni, puoi migliorare la riutilizzabilità, la testabilità e la proprietà del codice da parte di team diversi.
Considera i seguenti due esempi di codice Java, che leggono entrambi gli eventi di pagamento. Supponendo che entrambe le pipeline eseguano la stessa elaborazione, possono utilizzare le stesse trasformazioni tramite una libreria condivisa per i restanti passaggi di elaborazione.
Il primo esempio proviene da un'origine Pub/Sub illimitata:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
Il secondo esempio proviene da un'origine dati di database relazionale delimitata:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
Il modo in cui implementi le best practice per la riutilizzabilità del codice varia in base al linguaggio di programmazione e allo strumento di compilazione. Ad esempio, se utilizzi Maven, puoi separare il codice di trasformazione in un proprio modulo. Puoi quindi includere il modulo come sottomodulo in progetti multi-modulo più grandi per pipeline diverse, come mostrato nel seguente esempio di codice:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Per ulteriori informazioni, consulta le seguenti pagine della documentazione di Apache Beam:
- Requisiti per la scrittura del codice utente per le trasformazioni Apache Beam
- Guida di stile
PTransform
: una guida di stile per gli autori di nuove collezioniPTransform
riutilizzabili
Utilizzare le code dei messaggi non recapitabili per la gestione degli errori
A volte la pipeline non riesce a elaborare gli elementi. I problemi relativi ai dati sono una causa comune. Ad esempio, un elemento che contiene JSON con formato non corretto può causare errori di analisi.
Sebbene sia possibile intercettare le eccezioni all'interno del metodo
DoFn.ProcessElement
, registrare l'errore e eliminare l'elemento, questo approccio comporta la perdita dei dati
e ne impedisce l'ispezione successiva per la gestione manuale o la risoluzione dei problemi.
Utilizza invece un pattern chiamato coda di messaggi non recapitati (coda di messaggi non elaborati).
Cattura le eccezioni nel metodo DoFn.ProcessElement
e registra
gli errori. Anziché eliminare l'elemento non riuscito,
utilizza le uscite di ramificazione per scrivere gli elementi non riusciti in un oggetto PCollection
separato. Questi elementi vengono poi scritti in un'area di destinazione dei dati per la successiva ispezione
e gestione con una trasformazione separata.
Il seguente esempio di codice Java mostra come implementare il pattern della coda di messaggi inutilizzati.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Utilizza Cloud Monitoring per applicare diversi criteri di monitoraggio e invio di avvisi per la coda delle email inutilizzate della pipeline. Ad esempio, puoi visualizzare il numero e le dimensioni degli elementi elaborati dalla trasformazione delle caselle postali morto e configurare l'attivazione degli avvisi se vengono soddisfatte determinate condizioni di soglia.
Gestire le mutazioni dello schema
Puoi gestire i dati con schemi inaspettati, ma validi, utilizzando un pattern per le lettere morte, che scrive gli elementi non riusciti in un oggetto PCollection
separato.
In alcuni casi, è opportuno gestire automaticamente gli elementi
che riflettono uno schema mutato come elementi validi. Ad esempio, se lo schema di un elemento riflette una mutazione come l'aggiunta di nuovi campi, puoi adattare lo schema dell'elemento di destinazione dei dati in modo da adattarlo alle mutazioni.
La mutazione dello schema automatico si basa sull'approccio di output con ramificazione utilizzato dal pattern dead-letter. Tuttavia, in questo caso viene attivata una trasformazione che muta lo schema di destinazione ogni volta che vengono rilevati schemi additivi. Per un esempio di questo approccio, consulta Come gestire gli schemi JSON in mutazione in una pipeline di streaming, con Square Enix sul blog di Google Cloud.
Decidi come unire i set di dati
L'unione di set di dati è un caso d'uso comune per le pipeline di dati. Puoi utilizzare input laterali o la trasformazione CoGroupByKey
per eseguire join nella pipeline.
Ognuno ha vantaggi e svantaggi.
Gli input laterali
offrono un modo flessibile per risolvere problemi comuni di elaborazione dei dati, come l'arricchimento dei dati e le ricerche con chiavi. A differenza degli oggetti PCollection
, gli input laterali sono immutabili e possono essere determinati in fase di esecuzione. Ad esempio, i valori in un input secondario potrebbero essere calcolati da un altro ramo della pipeline o determinati chiamando un servizio remoto.
Dataflow supporta gli input laterali memorizzando i dati in un'archiviazione permanente, in modo simile a un disco condiviso. Questa configurazione rende disponibile l'input laterale completo per tutti i worker.
Tuttavia, le dimensioni degli input secondari possono essere molto grandi e potrebbero non rientrare nella memoria del worker. La lettura da un input laterale di grandi dimensioni può causare problemi di prestazioni se i worker devono leggere costantemente dallo spazio di archiviazione permanente.
La trasformazione CoGroupByKey
è una trasformazione di Apache Beam di base che unisce (appiattisce) più oggetti PCollection
e raggruppa gli elementi che hanno una chiave comune. A differenza di un input secondario, che rende disponibili tutti i dati dell'input secondario per ogni worker, CoGroupByKey
esegue un'operazione di ordinamento (raggruppamento) per distribuire i dati tra i worker. CoGroupByKey
è quindi ideale quando gli oggetti PCollection
da unire sono molto grandi e non rientrano nella memoria del worker.
Segui queste linee guida per decidere se utilizzare gli input laterali o
CoGroupByKey
:
- Utilizza gli input aggiuntivi quando uno degli oggetti
PCollection
che stai unendo è sproporzionatamente più piccolo degli altri e l'oggettoPCollection
più piccolo rientra nella memoria del worker. La memorizzazione nella cache dell'input laterale interamente in memoria consente di recuperare gli elementi in modo rapido ed efficiente. - Utilizza gli input aggiuntivi quando hai un oggetto
PCollection
che deve essere unito più volte nella pipeline. Anziché utilizzare più trasformazioniCoGroupByKey
, crea un singolo input secondario che può essere riutilizzato da più trasformazioniParDo
. - Utilizza
CoGroupByKey
se devi recuperare una grande parte di un oggettoPCollection
che supera in modo significativo la memoria del worker.
Per ulteriori informazioni, consulta Risolvere gli errori di esaurimento della memoria di Dataflow.
Riduci al minimo le operazioni per elemento costose
Un'istanza DoFn
elabora batch di elementi chiamati
bundle,
ovvero unità di lavoro atomiche costituite da zero o più
elementi. I singoli elementi vengono poi elaborati dal metodo
DoFn.ProcessElement
che viene eseguito per ogni elemento. Poiché il metodo DoFn.ProcessElement
viene chiamato per ogni elemento, eventuali operazioni che richiedono molto tempo o che sono computazionalmente complesse e richiamate da questo metodo vengono eseguite per ogni singolo elemento elaborato dal metodo.
Se devi eseguire operazioni dispendiose una sola volta per un batch di elementi, includile nel metodo DoFn.Setup
o nel metodo DoFn.StartBundle
anziché nell'elemento DoFn.ProcessElement
. Ecco alcuni esempi di operazioni:
Analisi di un file di configurazione che controlla alcuni aspetti del comportamento dell'
DoFn
istanza. Richiama questa azione una sola volta, quando viene inizializzata l'istanzaDoFn
, utilizzando il metodoDoFn.Setup
.L'inizializzazione di un client di breve durata che viene riutilizzato in tutti gli elementi di un pacchetto, ad esempio quando tutti gli elementi del pacchetto vengono inviati tramite una singola connessione di rete. Richiama questa azione una volta per bundle utilizzando il metodo
DoFn.StartBundle
.
Limita le dimensioni dei batch e le chiamate simultanee ai servizi esterni
Quando chiami servizi esterni, puoi ridurre i costi per chiamata utilizzando la trasformazione
GroupIntoBatches
. Questa trasformazione crea batch di elementi di una dimensione specificata.
Il raggruppamento in batch invia gli elementi a un servizio esterno come un unico payload anziché individualmente.
In combinazione con il batching, limita il numero massimo di chiamate parallele (contemporanee) al servizio esterno scegliendo chiavi appropriate per partizionare i dati in entrata. Il numero di partizioni determina la parallellizzazione massima. Ad esempio, se a ogni elemento viene assegnata la stessa chiave, una trasformazione a valle per la chiamata del servizio esterno non viene eseguita in parallelo.
Valuta uno dei seguenti approcci per generare chiavi per gli elementi:
- Scegli un attributo del set di dati da utilizzare come chiavi di dati, ad esempio gli ID utente.
- Genera chiavi di dati per suddividere gli elementi in modo casuale in un numero fisso di partizioni, dove il numero di possibili valori chiave determina il numero di partizioni. Devi creare partizioni sufficienti per il parallelismo.
Ogni partizione deve contenere elementi sufficienti per rendere utile la trasformazione
GroupIntoBatches
.
Il seguente esempio di codice Java mostra come suddividere in modo casuale gli elementi in dieci partitizioni:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Identificare i problemi di prestazioni causati da passaggi fusi
Dataflow crea un grafo di passaggi che rappresenta la pipeline in base alle trasformazioni e ai dati utilizzati per costruirla. Questo grafico è chiamato grafico di esecuzione della pipeline.
Quando esegui il deployment della pipeline, Dataflow
potrebbe modificare il grafico di esecuzione della pipeline. Ad esempio, Dataflow potrebbe unire alcune operazioni, un processo noto come ottimizzazione della fusione, per evitare l'impatto sulle prestazioni e sui costi della scrittura di ogni oggettoPCollection
intermedio nella pipeline.
In alcuni casi, Dataflow potrebbe determinare erroneamente il modo ottimale per unire le operazioni nella pipeline, il che può limitare la capacità del job di utilizzare tutti i worker disponibili. In questi casi, puoi impedire l'unione delle operazioni.
Considera il seguente esempio di codice Apache Beam. Una trasformazione
GenerateSequence
crea un piccolo oggetto PCollection
delimitato, che viene poi elaborato ulteriormente da due trasformazioni ParDo
a valle.
La trasformazione Find Primes Less-than-N
potrebbe essere computazionalmente costosa ed è probabile che funzioni lentamente per numeri elevati. Al contrario, la trasformazioneIncrement Number
probabilmente viene completata rapidamente.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
Il seguente diagramma mostra una rappresentazione grafica della pipeline nell'interfaccia di monitoraggio di Dataflow.
L'interfaccia di monitoraggio di Dataflow mostra che si verifica la stessa bassa velocità di elaborazione per entrambe le trasformazioni, in particolare 13 elementi al secondo. Potresti aspettarti che la trasformazione Increment Number
elabori rapidamente gli elementi, ma sembra essere legata alla stessa velocità di elaborazione di Find Primes Less-than-N
.
Il motivo è che Dataflow ha unito i passaggi in un unico
livello, impedendone l'esecuzione in modo indipendente. Puoi utilizzare il comando
gcloud dataflow jobs describe
per trovare ulteriori informazioni:
gcloud dataflow jobs describe --full job-id --format json
Nell'output risultante, i passaggi fusi sono descritti nell'oggetto
ExecutionStageSummary
nell'array
ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
In questo caso, poiché la trasformazione Find Primes Less-than-N
è il passaggio lento, interrompere la fusione prima di questo passaggio è una strategia appropriata. Un metodo per scomporre i passaggi è inserire una trasformazione GroupByKey
e un'operazione di scomposizione prima del passaggio, come mostrato nell'esempio di codice Java riportato di seguito.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Puoi anche combinare questi passaggi di separazione in una trasformazione composita riutilizzabile.
Dopo aver annullato l'unione dei passaggi, quando esegui la pipeline, Increment Number
viene completata in pochi secondi e la trasformazione Find Primes Less-than-N
, molto più lunga, viene eseguita in una fase separata.
Questo esempio applica un'operazione di raggruppamento e sgruppamento ai passaggi di separazione.
Puoi utilizzare altri approcci per altre circostanze. In questo caso, la gestione dell'output duplicato non è un problema, dato l'output consecutivo della trasformazione GenerateSequence
.
Gli oggetti KV
con chiavi duplicate vengono deduplicati in una singola chiave nella trasformazione Gruppa (GroupByKey
) e nella trasformazione Unisci (Keys
). Per conservare i duplicati dopo le operazioni di raggruppamento e sgruppamento,
crea coppie chiave-valore seguendo questi passaggi:
- Utilizza una chiave casuale e l'input originale come valore.
- Raggruppa utilizzando la chiave casuale.
- Emette i valori per ogni chiave come output.
Puoi anche utilizzare una trasformazione
Reshuffle
per impedire la fusione delle trasformazioni circostanti. Tuttavia, gli effetti collaterali della trasformazioneReshuffle
non sono trasferibili su diversi runner Apache Beam.
Per ulteriori informazioni sull'ottimizzazione del parallelismo e della fusione, consulta Ciclo di vita della pipeline.
Utilizzare le metriche di Apache Beam per raccogliere informazioni sulle pipeline
Le metriche di Apache Beam sono una classe di utilità che produce metriche per la generazione di report sulle proprietà di una pipeline in esecuzione. Quando utilizzi Cloud Monitoring, le metriche Apache Beam sono disponibili come metriche personalizzate di Cloud Monitoring.
L'esempio seguente mostra le metriche Counter
di Apache Beam utilizzate in una sottoclasse DoFn
.
Il codice di esempio utilizza due contatori. Un contatore monitora gli errori di analisi del JSON
(malformedCounter
), mentre l'altro monitora se il messaggio JSON è valido, ma contiene un payload vuoto (emptyCounter
). In Cloud Monitoring,
i nomi delle metrica personalizzata sono custom.googleapis.com/dataflow/malformedJson
e
custom.googleapis.com/dataflow/emptyPayload
. Puoi utilizzare le metriche personalizzate per creare visualizzazioni e criteri di avviso in Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Scopri di più
Le pagine seguenti forniscono ulteriori informazioni su come strutturare la pipeline, su come scegliere quali trasformazioni applicare ai dati e su cosa tenere presente quando si scelgono i metodi di input e output della pipeline.
Per ulteriori informazioni sulla creazione del codice utente, consulta i requisiti per le funzioni fornite dall'utente.