Questa pagina fornisce indicazioni sulle best practice da seguire per creare ed eseguire flussi di lavoro HPC Dataflow altamente paralleli, tra cui come utilizzare il codice esterno nelle pipeline, come eseguire la pipeline e come gestire la gestione degli errori.
Includere codice esterno nella pipeline
Un elemento di differenziazione chiave delle pipeline altamente parallele è che utilizzano codice C++ all'interno di DoFn
anziché uno dei linguaggi dell'SDK Apache Beam standard. Per le pipeline Java, per semplificare l'utilizzo delle librerie C++ nella pipeline, ti consigliamo di utilizzare le chiamate di procedure esterne. Questa sezione descrive l'approccio generale utilizzato per eseguire codice esterno (C++) nelle pipeline Java.
Una definizione di pipeline Apache Beam ha diversi componenti chiave:
PCollections
sono raccolte immutabili di elementi omogenei.PTransforms
vengono utilizzati per definire le trasformazioni di unPCollection
che genera un altroPCollection
.- La pipeline è il costrutto che ti consente, tramite codice, di dichiarare le interazioni tra
PTransforms
ePCollections
. La pipeline è rappresentata come un grafo diretto aciclico (DAG).
Quando utilizzi codice di un linguaggio diverso da uno dei linguaggi standard dell'SDK Apache Beam, inseriscilo in PTransform
, che si trova all'interno di DoFn
, e utilizza uno dei linguaggi standard dell'SDK per definire la pipeline stessa.
Ti consigliamo di utilizzare l'SDK Apache Beam per Python per definire la pipeline, poiché l'SDK Python ha una classe di utilità che semplifica l'utilizzo di altro codice. Tuttavia, puoi utilizzare gli altri SDK Apache Beam.
Puoi utilizzare il codice per eseguire esperimenti rapidi senza richiedere una compilazione completa. Per un sistema di produzione, in genere crei i tuoi binari, il che ti consente di ottimizzare il processo in base alle tue esigenze.
Il seguente diagramma illustra i due utilizzi dei dati della pipeline:
- I dati vengono utilizzati per gestire il processo.
- I dati vengono acquisiti durante l'elaborazione e uniti ai dati del conducente.
In questa pagina, i dati principali (provenienti dall'origine) sono indicati come dati principali e i dati secondari (provenienti dalla fase di elaborazione) sono indicati come dati uniti.
In un caso d'uso finanziario, i dati alla base potrebbero essere alcune centinaia di migliaia di operazioni. Ogni transazione deve essere elaborata insieme ai dati di mercato. In questo caso, i dati di mercato sono i dati di unione. In un caso d'uso multimediale, i dati di guida potrebbero essere file di immagini che richiedono l'elaborazione, ma non richiedono altre origini dati e quindi non utilizzano i dati di unione.
Considerazioni sulle dimensioni dei dati di guida
Se le dimensioni dell'elemento di dati principali rientrano nell'intervallo di pochi megabyte, gestiscilo con il normale paradigma Apache Beam di creazione di un oggetto PCollection
dall'origine e invio dell'oggetto alle trasformazioni Apache Beam per l'elaborazione.
Se le dimensioni dell'elemento di dati di guida sono superiori a 10 megabyte o in gigabyte, come è tipico per i contenuti multimediali, puoi inserire i dati di guida in Cloud Storage. Poi, nell'oggetto PCollection
iniziale, fai riferimento all'URI dello spazio di archiviazione e solo a un riferimento URI ai dati utilizzati.
Considerazioni sulle dimensioni per l'unione dei dati
Se i dati di unione sono pari o inferiori a poche centinaia di megabyte, utilizza un input secondario per inviarli alle trasformazioni Apache Beam. L'input laterale invia il pacchetto di dati a ogni worker che ne ha bisogno.
Se i dati di unione rientrano nell'intervallo di gigabyte o terabyte, utilizza Bigtable o Cloud Storage per unire i dati di unione ai dati di guida, a seconda della natura dei dati. Bigtable è ideale per gli scenari finanziari in cui si accede spesso ai dati di mercato come ricerche chiave-valore da Bigtable. Per ulteriori informazioni sulla progettazione dello schema Bigtable, inclusi i consigli per lavorare con i dati delle serie temporali, consulta la seguente documentazione di Bigtable:
Esegui il codice esterno
Puoi eseguire codice esterno in Apache Beam in molti modi.
Crea un processo chiamato da un oggetto
DoFn
all'interno di una trasformazione Dataflow.Utilizza JNI con l'SDK Java.
Crea un sottoprocesso direttamente dall'oggetto
DoFn
. Anche se questo approccio non è il più efficiente, è affidabile e semplice da implementare. A causa dei potenziali problemi relativi all'utilizzo di JNI, questa pagina mostra l'utilizzo di una chiamata di sottoprocesso.
Quando progetti il flusso di lavoro, prendi in considerazione la pipeline end-to-end completa. Eventuali inefficienze nel modo in cui viene eseguito il processo sono compensate dal fatto che lo spostamento dei dati dall'origine fino al destino viene eseguito con una singola pipeline. Se confronti questo approccio con altri, tieni conto dei tempi e dei costi end-to-end della pipeline.
Estrai i binari negli host
Quando utilizzi un linguaggio Apache Beam nativo, l'SDK Apache Beam sposta automaticamente tutto il codice necessario nei worker. Tuttavia, quando effettui una chiamata a codice esterno, devi spostare il codice manualmente.
Per spostare il codice: L'esempio mostra i passaggi per l'SDK Apache Beam Java.
- Archivia il codice esterno compilato, insieme alle informazioni sul controllo delle versioni, in Cloud Storage.
- Nel metodo
@Setup
, crea un blocco sincronizzato per verificare se il file di codice è disponibile nella risorsa locale. Anziché implementare un controllo fisico, puoi confermare la disponibilità utilizzando una variabile statica al termine del primo thread. - Se il file non è disponibile, utilizza la libreria client di Cloud Storage per recuperare il file dal bucket Cloud Storage nel worker locale. Un approccio consigliato è utilizzare la classe Apache Beam
FileSystems
per questa attività. - Dopo aver spostato il file, verifica che il bit di esecuzione sia impostato sul file di codice.
- In un sistema di produzione, controlla l'hash dei file binari per assicurarti che il file sia stato copiato correttamente.
Anche l'utilizzo della funzione Apache Beam
filesToStage
è un'opzione, ma rimuove alcuni dei vantaggi della capacità del
runner di pacchettizzare e spostare automaticamente il codice Java. Inoltre, poiché la chiamata al sottoprocesso richiede una posizione del file assoluta, devi utilizzare il codice per determinare il percorso della classe e quindi la posizione del file spostato da filesToStage
. Sconsigliamo questo approccio.
Esegui i binari esterni
Prima di poter eseguire il codice esterno, devi creare un wrapper. Scrivi questo wrapper nello stesso linguaggio del codice esterno (ad esempio C++) o come uno script shell. Il wrapper ti consente di passare handle dei file e implementare ottimizzazioni come descritto nella sezione Progettare l'elaborazione per cicli CPU ridotti di questa pagina. L'wrapper non deve essere sofisticato. Il seguente snippet mostra un'ossatura di un wrapper in C++.
int main(int argc, char* argv[])
{
if(argc < 3){
std::cerr << "Required return file and data to process" << '\n';
return 1;
}
std::string returnFile = argv[1];
std::string word = argv[2];
std::ofstream myfile;
myfile.open (returnFile);
myfile << word;
myfile.close();
return 0;
}
Questo codice legge due parametri dall'elenco degli argomenti. Il primo parametro è la posizione del file di ritorno in cui vengono inviati i dati. Il secondo parametro è costituito dai dati che il codice restituisce all'utente. Nelle implementazioni reali, questo codice farebbe molto di più che stampare "Hello, world".
Dopo aver scritto il codice wrapper, esegui il codice esterno nel seguente modo:
- Trasmetti i dati ai file binari del codice esterno.
- Esegui i file binari, rileva eventuali errori e registra gli errori e i risultati.
- Gestisci le informazioni di logging.
- Acquisisci i dati dall'elaborazione completata.
Trasmettere i dati ai file binari
Per avviare la procedura di esecuzione della libreria, trasmetti i dati al codice C++. In questo passaggio puoi sfruttare l'integrazione di Dataflow con altri strumenti Google Cloud. Uno strumento come Bigtable può gestire set di dati molto grandi e gestire l'accesso a bassa latenza e ad alta concorrenza, il che consente a migliaia di core di accedere contemporaneamente al set di dati. Inoltre, Bigtable può pre-elaborare i dati, consentendo la loro definizione, l'arricchimento e il filtraggio. Tutto questo lavoro può essere svolto nelle trasformazioni di Apache Beam prima di eseguire il codice esterno.
Per un sistema di produzione, il percorso consigliato è utilizzare un buffer di protocollo per incapsulare i dati di input. Puoi convertire i dati di input in byte e codificarli in base64 prima di trasmetterli alla libreria esterna. I due modi per passare questi dati alla libreria esterna sono i seguenti:
- Dati di input di piccole dimensioni. Per i dati di piccole dimensioni che non superano la lunghezza massima del sistema per un argomento del comando, passa l'argomento nella posizione 2 della procedura in fase di creazione con
java.lang.ProcessBuilder
. - Dati di input di grandi dimensioni. Per dimensioni dei dati maggiori, crea un file il cui nome include un UUID per contenere i dati richiesti dalla procedura.
Esegui il codice C++, rileva gli errori e genera log
La cattura e la gestione delle informazioni sugli errori è una parte fondamentale della pipeline. Le risorse utilizzate dal runner Dataflow sono effimere ed è spesso difficile ispezionare i file di log dei worker. Devi assicurarti di acquisire e inviare tutte le informazioni utili al logging del runner Dataflow e di archiviare i dati di logging in uno o più bucket Cloud Storage.
L'approccio consigliato è reindirizzare stdout
e stderr
a file, il che consente di evitare problemi di esaurimento della memoria. Ad esempio, nel programma di esecuzione di Dataflow che chiama il codice C++, puoi includere righe come la seguente:
Java
import java.lang.ProcessBuilder.Redirect;
...
processbuilder.redirectError(Redirect.appendTo(errfile));
processbuilder.redirectOutput(Redirect.appendTo(outFile));
Python
# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
integers
| beam.Map(collatz.total_stopping_time).with_exception_handling(
use_subprocess=True))
# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
os.path.splitext(output_path)[0] + '-bad.txt')
Gestire le informazioni di logging
Molti casi d'uso richiedono l'elaborazione di milioni di elementi. L'elaborazione corretta genera log con scarso o nessun valore, pertanto devi prendere una decisione aziendale in merito alla conservazione dei dati dei log. Ad esempio, prendi in considerazione queste alternative per conservare tutti i dati dei log:
- Se le informazioni contenute nei log relativi all'elaborazione corretta degli elementi non sono importanti, non conservarle.
- Crea una logica che esegue il campionamento dei dati dei log, ad esempio il campionamento ogni 10.000 voci di log. Se l'elaborazione è omogenea, ad esempio quando molte iterazioni del codice generano dati di log essenzialmente identici, questo approccio offre un equilibrio efficace tra il mantenimento dei dati di log e l'ottimizzazione dell'elaborazione.
Per le condizioni di errore, la quantità di dati scaricati nei log potrebbe essere elevata. Una strategia efficace per gestire grandi quantità di dati dei log di errore è leggere le prime righe della voce di log e inviare solo queste righe a Cloud Logging. Puoi caricare il resto del file di log nei bucket Cloud Storage. Questo approccio ti consente di esaminare in un secondo momento le prime righe dei log degli errori e, se necessario, fare riferimento a Cloud Storage per l'intero file.
È utile anche controllare le dimensioni del file di log. Se le dimensioni del file sono pari a zero, puoi ignorarlo tranquillamente o registrare un semplice messaggio di log che indica che il file non conteneva dati.
Acquisisci i dati dall'elaborazione completata
Non è consigliabile utilizzare stdout
per passare il risultato del calcolo alla funzione DoFn
. Anche altro codice chiamato dal codice C++ e persino il tuo codice potrebbe inviare messaggi a stdout
, contaminando lo stream stdoutput
che altrimenti contiene i dati di logging. È invece consigliabile apportare una modifica al codice del wrapper C++ per consentire al codice di accettare un parametro che indichi dove creare il file che memorizza il valore. Idealmente, questo
file deve essere archiviato in modo indipendente dal linguaggio utilizzando
protocolli buffer,
che consente al codice C++ di restituire un oggetto al codice Java o Python. L'oggetto DoFn
può leggere il risultato direttamente dal file e passare le informazioni sul risultato alla propria chiamata output
.
L'esperienza ha dimostrato l'importanza di eseguire test delle unità relativi al procedura stessa. È importante implementare un test unitario che esegua il processo indipendentemente dalla pipeline Dataflow. Il debugging della biblioteca può essere eseguito in modo molto più efficiente se è autonoma e non deve eseguire l'intera pipeline.
Progetta l'elaborazione per cicli CPU brevi
La chiamata di un sottoprocesso comporta un overhead. A seconda del carico di lavoro, potresti dover svolgere un lavoro aggiuntivo per ridurre il rapporto tra il lavoro svolto e il carico amministrativo di avvio e arresto del processo.
Nel caso d'uso dei media, le dimensioni dell'elemento di dati di guida potrebbero essere molto elevate, in megabyte o gigabyte. Di conseguenza, l'elaborazione di ogni elemento di dato può richiedere molti minuti. In questo caso, il costo della chiamata al sottoprocesso è irrilevante rispetto al tempo di elaborazione complessivo. L'approccio migliore in questa situazione è fare in modo che un singolo elemento avvii il proprio processo.
Tuttavia, in altri casi d'uso, come la finanza, l'elaborazione richiede unità di tempo della CPU molto piccole (decine di millisecondi). In questo caso, il sovraccarico di chiamata del sottoprocesso è sproporzionatamente elevato. Una soluzione a questo
problema è utilizzare la trasformazione
GroupByKey
di Apache Beam per creare batch di elementi tra 50 e 100 da inserire nel
processo. Ad esempio, puoi procedere nel seguente modo:
- In una funzione
DoFn
crea una coppia chiave-valore. Se stai elaborando operazioni finanziarie, puoi utilizzare il numero di transazione come chiave. Se non hai un numero unico da utilizzare come chiave, puoi generare un checksum dai dati e utilizzare una funzione modulo per creare partizioni di 50 elementi. - Invia la chiave a una funzione
GroupByKey.create
, che restituisce una raccoltaKV<key,Iterable<data>>
contenente i 50 elementi che puoi inviare al processo.
Limita il parallelismo dei worker
Quando utilizzi un linguaggio supportato in modo nativo nel programma di esecuzione Dataflow, non devi mai pensare a cosa sta succedendo al worker. Dataflow ha molti processi che supervisionano il controllo del flusso e i thread in modalità batch o stream.
Tuttavia, se utilizzi un linguaggio esterno come C++, tieni presente che lo stai facendo qualcosa di un po' fuori dall'ordinario avviando sottoprocessi. In modalità batch, il programma di esecuzione Dataflow utilizza un rapporto ridotto tra thread di lavoro e CPU rispetto alla modalità di streaming. Ti consigliamo, soprattutto in modalità di streaming, di creare un semaforo all'interno della classe per controllare in modo più diretto il parallelismo di un singolo utente.
Ad esempio, con l'elaborazione multimediale, potresti non volere che centinaia di elementi di transcodifica vengano elaborati in parallelo da un singolo worker. In questi casi,
puoi creare una classe di utilità che fornisca i permessi alla funzione DoFn
per
il lavoro da svolgere. L'utilizzo di questa classe ti consente di assumere il controllo diretto dei thread worker all'interno della pipeline.
Utilizzare i sink di dati ad alta capacità in Google Cloud
Una volta elaborati, i dati vengono inviati a un'area di destinazione dei dati. L'area di destinazione deve essere in grado di gestire il volume di risultati creati dalla soluzione di elaborazione della griglia.
Il seguente diagramma mostra alcuni dei sink disponibili in Google Cloud quando Dataflow esegue un carico di lavoro della griglia.
Bigtable, BigQuery e Pub/Sub possono gestire flussi di dati di grandi dimensioni. Ad esempio, ogni nodo Bigtable può gestire 10.000 inserimenti al secondo di dimensioni fino a 1 KB con una facile scalabilità orizzontale. Di conseguenza, un cluster Bigtable di 100 nodi può assorbire 1.000.000 di messaggi al secondo generati dalla griglia Dataflow.
Gestire gli errori di segfault
Quando utilizzi codice C++ all'interno di una pipeline, devi decidere come gestire i segfault, perché hanno ripercussioni non locali se non vengono gestiti correttamente. Il programma di esecuzione Dataflow crea i processi in base alle esigenze in Java, Python o Go, quindi assegna il lavoro ai processi sotto forma di bundle.
Se la chiamata al codice C++ viene eseguita utilizzando strumenti strettamente accoppiati, come JNI o Cython, e il processo C++ genera errori di segfault, anche il processo di chiamata e la Java Virtual Machine (JVM) si arrestano in modo anomalo. In questo scenario, i punti dati errati non sono rilevabili. Per rilevare i punti dati errati, utilizza un accoppiamento meno stretto, che rimuove i dati errati e consente alla pipeline di continuare. Tuttavia, con un codice C++ maturo sottoposto a test completi su tutte le varianti di dati, puoi utilizzare meccanismi come Cython.
Passaggi successivi
Segui il tutorial per creare una pipeline che utilizza contenitori personalizzati con librerie C++.
Visualizza il codice di esempio per questa pagina nel repository GitHub di Apache Beam.
Scopri di più sulla creazione di pipeline con Apache Beam.