Questa pagina fornisce indicazioni e consigli per l'upgrade delle pipeline di streaming. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam o aggiornare il codice della pipeline. Vengono fornite diverse opzioni per adattarsi a scenari diversi.
Mentre le pipeline batch si arrestano al termine del job, le pipeline di streaming spesso vengono eseguite ininterrottamente per fornire un'elaborazione ininterrotta. Pertanto, quando esegui l'upgrade delle pipeline di streaming, devi tenere conto dei seguenti aspetti:
- Potresti dover ridurre al minimo o evitare interruzioni alla pipeline. In alcuni casi, potresti essere in grado di tollerare un'interruzione temporanea dell'elaborazione durante il deployment di una nuova versione di una pipeline. In altri casi, la tua applicazione potrebbe non essere in grado di tollerare interruzioni.
- I processi di aggiornamento della pipeline devono gestire le modifiche dello schema in modo da ridurre al minimo le interruzioni dell'elaborazione dei messaggi e degli altri sistemi collegati. Ad esempio, se lo schema dei messaggi in una pipeline di elaborazione degli eventi cambia, potrebbero essere necessarie modifiche dello schema anche nei sink di dati downstream.
Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline di streaming, a seconda della pipeline e dei requisiti di aggiornamento:
Per saperne di più sui problemi che potresti riscontrare durante un aggiornamento e su come prevenirli, consulta Convalida di un job di sostituzione e Controllo di compatibilità del job.
Best practice
- Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da qualsiasi modifica del codice della pipeline.
- Testa la pipeline dopo ogni modifica prima di apportare aggiornamenti aggiuntivi.
- Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla pipeline.
- Utilizza metodi automatizzati ove possibile, ad esempio aggiornamenti in volo o aggiornamenti automatici della pipeline parallela.
Esecuzione di aggiornamenti in volo
Puoi aggiornare alcune pipeline di streaming in corso senza interrompere il job. Questo scenario è chiamato aggiornamento del job in corso. Gli aggiornamenti dei job in volo sono disponibili solo in circostanze limitate:
- Il job deve utilizzare Streaming Engine.
- Il job deve essere in esecuzione.
- Stai modificando solo il numero di worker utilizzati dal job.
Per maggiori informazioni, consulta Impostare l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.
Per istruzioni su come eseguire un aggiornamento del job in volo, consulta Aggiornare una pipeline esistente.
Avviare un job di sostituzione
Se il job aggiornato è compatibile con quello esistente, puoi aggiornare la pipeline utilizzando l'opzione update
. Quando sostituisci un job esistente, un nuovo
job esegue il codice della pipeline aggiornato.
Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività
mentre il job esistente si arresta, viene eseguito il controllo di compatibilità e il nuovo job
inizia. Per maggiori dettagli, vedi
Effetti della sostituzione di un job.
Dataflow esegue un controllo di compatibilità per assicurarsi che il codice della pipeline aggiornato possa essere implementato in sicurezza nella pipeline in esecuzione. Alcune modifiche al codice causano l'esito negativo del controllo di compatibilità, ad esempio quando gli input laterali vengono aggiunti o rimossi da un passaggio esistente. Se il controllo di compatibilità non va a buon fine, non puoi eseguire un aggiornamento in loco del job.
Per istruzioni su come avviare un job di sostituzione, consulta Avviare un job di sostituzione.
Se l'aggiornamento della pipeline non è compatibile con il job corrente, devi interrompere e sostituire la pipeline. Se la pipeline non può tollerare tempi di inattività, esegui pipeline parallele.
Arresta e sostituisci le pipeline
Se puoi interrompere temporaneamente l'elaborazione, puoi annullare o svuotare la pipeline e poi sostituirla con quella aggiornata. L'annullamento di una pipeline fa sì che Dataflow interrompa immediatamente l'elaborazione e chiuda le risorse il più rapidamente possibile, il che può causare la perdita di alcuni dati in fase di elaborazione, noti come dati in transito. Per evitare la perdita di dati, nella maggior parte dei casi lo svuotamento è l'azione preferita. Puoi anche utilizzare gli snapshot di Dataflow per salvare lo stato di una pipeline di streaming, il che ti consente di avviare una nuova versione del job Dataflow senza perdere lo stato. Per saperne di più, consulta Utilizzare gli snapshot di Dataflow.
Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in corso di elaborazione e attiva tutti i trigger. Sebbene i dati in volo non vengano persi, lo svuotamento potrebbe causare la presenza di dati incompleti nelle finestre. In questo caso, le finestre in-process restituiscono risultati parziali o incompleti. Per saperne di più, consulta la sezione Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job in modalità flusso che contenga il codice della pipeline aggiornato, in modo da riprendere l'elaborazione.
Con questo metodo, si verifica un periodo di inattività tra il momento in cui il job di streaming esistente si interrompe e quello in cui la pipeline di sostituzione è pronta a riprendere l'elaborazione dei dati. Tuttavia, annullare o svuotare una pipeline esistente e poi avviare un nuovo job con la pipeline aggiornata è meno complicato rispetto all'esecuzione di pipeline parallele.
Per istruzioni più dettagliate, vedi Svuotare un job Dataflow. Dopo aver svuotato il job corrente, avvia un nuovo job con lo stesso nome.
Rielaborazione dei messaggi con Pub/Sub Snapshot e Seek
In alcune situazioni, dopo aver sostituito o annullato una pipeline esaurita, potrebbe essere necessario rielaborare i messaggi Pub/Sub inviati in precedenza. Ad esempio, potresti dover utilizzare una logica di business aggiornata per rielaborare i dati. Pub/Sub Seek è una funzionalità che consente di riprodurre i messaggi da uno snapshot Pub/Sub. Puoi utilizzare Pub/Sub Seek con Dataflow per rielaborare i messaggi dal momento in cui viene creato lo snapshot della sottoscrizione.
Durante lo sviluppo e il test, puoi anche utilizzare Pub/Sub Seek per riprodurre ripetutamente i messaggi noti per verificare l'output della pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione quando la sottoscrizione viene utilizzata da una pipeline. In questo caso, la ricerca può invalidare la logica del watermark di Dataflow e potrebbe influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.
Un flusso di lavoro gcloud CLI consigliato per l'utilizzo di Pub/Sub Seek con le pipeline Dataflow in una finestra del terminale è il seguente:
Per creare uno snapshot dell'abbonamento, utilizza il comando
gcloud pubsub snapshots create
:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Per svuotare o annullare la pipeline, utilizza il comando
gcloud dataflow jobs drain
o il comandogcloud dataflow jobs cancel
:gcloud dataflow jobs drain JOB_ID
o
gcloud dataflow jobs cancel JOB_ID
Per passare allo snapshot, utilizza il comando
gcloud pubsub subscriptions seek
:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Esegui il deployment di una nuova pipeline che utilizza l'abbonamento.
Esegui pipeline parallele
Se devi evitare interruzioni alla pipeline di streaming durante un aggiornamento, puoi eseguire pipeline parallele. Questo approccio ti consente di avviare un nuovo job di streaming con il codice della pipeline aggiornato ed eseguirlo in parallelo con il job esistente. Puoi utilizzare il flusso di lavoro di deployment dell'aggiornamento della pipeline parallela automatizzata di Dataflow oppure eseguire i passaggi manualmente.
Panoramica delle pipeline parallele
Quando crei la nuova pipeline, utilizza la stessa strategia di finestre che hai utilizzato per la pipeline esistente. Per il flusso di lavoro manuale, lascia che la pipeline esistente continui a essere eseguita finché la filigrana non supera il timestamp della finestra completa meno recente elaborata dalla pipeline aggiornata. Poi, svuota o annulla la pipeline esistente. Se utilizzi il flusso di lavoro automatizzato, questo lavoro viene svolto per te. La pipeline aggiornata continua a essere eseguita al suo posto e gestisce autonomamente l'elaborazione.
Il seguente diagramma illustra questa procedura.
Nel diagramma, Pipeline B è il job aggiornato che sostituisce Pipeline A. Il valore t è il timestamp della prima finestra completa elaborata dalla pipeline B. Il valore w è la filigrana per Pipeline A. Per semplicità, si presuppone una filigrana perfetta senza dati in ritardo. L'elaborazione e tempo totale di esecuzionee sono rappresentati sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse (in sequenza) di cinque minuti. I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.
Poiché l'output simultaneo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configura le due pipeline in modo che scrivano i risultati in destinazioni diverse. I sistemi downstream possono quindi utilizzare un'astrazione sulle due destinazioni sink, ad esempio una visualizzazione del database, per eseguire query sui risultati combinati. Questi sistemi possono anche utilizzare l'astrazione per deduplicare i risultati del periodo sovrapposto. Per maggiori informazioni, vedi Gestire l'output duplicato.
Limitazioni
L'utilizzo di aggiornamenti della pipeline parallela automatici o manuali presenta le seguenti limitazioni:
- Solo aggiornamenti automatici: il nuovo job parallelo deve essere un job Streaming Engine.
- I nomi dei job precedenti e nuovi devono essere diversi perché i job simultanei con lo stesso nome non sono consentiti.
- L'esecuzione di due pipeline in parallelo sullo stesso input può comportare dati duplicati, aggregazioni parziali e potenziali problemi di ordinamento quando i dati vengono inseriti nel sink. Il sistema downstream deve essere progettato per prevedere e gestire questi risultati.
- Quando leggi da una sorgente Pub/Sub, l'utilizzo della stessa sottoscrizione per più pipeline non è consigliato e può causare problemi di correttezza. Tuttavia, in alcuni casi d'uso, come le pipeline di estrazione, trasformazione e caricamento (ETL), l'utilizzo dello stesso abbonamento in due pipeline potrebbe ridurre la duplicazione. I problemi con la scalabilità automatica sono probabili ogni volta che fornisci un valore diverso da zero per la durata della sovrapposizione. Questo problema può essere mitigato utilizzando la funzionalità di aggiornamento dei job in corso. Per ulteriori informazioni, consulta Ottimizzare la scalabilità automatica per le pipeline di streaming Pub/Sub.
- Per Apache Kafka, puoi ridurre al minimo i duplicati attivando il commit dell'offset in Kafka. Per attivare il commit dell'offset in Kafka, consulta Commit in Kafka.
Aggiornamenti automatici delle pipeline parallele
Dataflow fornisce il supporto API per l'avvio di un job di sostituzione parallela. Questa API in stile dichiarativo astrae il lavoro manuale di esecuzione dei passaggi procedurali. Dichiari il job che vuoi aggiornare e un nuovo job viene eseguito in parallelo con quello precedente. Dopo l'esecuzione del nuovo job per la durata specificata, il vecchio job viene svuotato. Questa funzionalità elimina le pause di elaborazione durante gli aggiornamenti e riduce lo sforzo operativo necessario per aggiornare le pipeline incompatibili.
Questo metodo di aggiornamento è ideale per le pipeline che possono tollerare alcuni duplicati o aggregazioni parziali e non richiedono un ordinamento rigoroso durante l'inserimento dei dati. È
adatto alle pipeline ETL, nonché a quelle che utilizzano la modalità di streaming
almeno una volta e la
trasformazione Redistribute
con l'opzione Consenti duplicati impostata su true
.
Inviare una richiesta di aggiornamento della pipeline parallela automatizzata
Per utilizzare il flusso di lavoro automatizzato, avvia un nuovo job di streaming con le seguenti opzioni di servizio. Devi avviare il nuovo job con un nome diverso da quello del job precedente.
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
In alternativa, puoi specificare l'ID job del vecchio job:
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
In alternativa, puoi specificare l'ID job del vecchio job:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Vai
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
In alternativa, puoi specificare l'ID job del vecchio job:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
In alternativa, puoi specificare l'ID job del vecchio job:
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Sostituisci le seguenti variabili:
- Devi fornire
parallel_replace_job_name
oparallel_replace_job_id
per identificare il job da sostituire.OLD_JOB_NAME
: se utilizziparallel_replace_job_name
, il nome del lavoro da sostituire.OLD_JOB_ID
: Se utilizziparallel_replace_job_id
, l'ID del job da sostituire.
Devi specificare un valore
parallel_replace_job_min_parallel_pipelines_duration
.DURATION
: il periodo di tempo minimo durante il quale le due pipeline vengono eseguite in parallelo come numero intero o in virgola mobile. Al termine di questo periodo, al vecchio job viene inviato un segnale di svuotamento.La durata deve essere compresa tra 0 secondi (
0s
) e 31 giorni (744h
). Utilizzas
,m
eh
per specificare secondi, minuti e ore. Ad esempio,10m
è 10 minuti.
Quando avvii il nuovo job, Dataflow attende il provisioning di tutti i worker prima di iniziare a elaborare i dati. Per monitorare lo stato del deployment, controlla i log dei job Dataflow.
Eseguire manualmente pipeline parallele
Per scenari più complessi o quando hai bisogno di un maggiore controllo sul processo di aggiornamento, puoi eseguire manualmente pipeline parallele. Consenti alla pipeline esistente di continuare a essere eseguita finché la filigrana non supera il timestamp della finestra completa meno recente elaborata dalla pipeline aggiornata. Quindi, svuota o annulla la pipeline esistente.
Gestire l'output duplicato
L'esempio seguente descrive un approccio per gestire l'output duplicato. Le due pipeline scrivono l'output in destinazioni diverse, utilizzano sistemi downstream per eseguire query sui risultati e deduplicare i risultati del periodo sovrapposto. Questo esempio utilizza una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.
Nello stato iniziale, la pipeline di streaming esistente (pipeline A) è in esecuzione e legge i messaggi da un argomento Pub/Sub (argomento) utilizzando una sottoscrizione (sottoscrizione A). I risultati vengono scritti in una tabella BigQuery (Tabella A). I risultati vengono utilizzati tramite una vista BigQuery, che funge da facciata per mascherare le modifiche alla tabella sottostante. Questo processo è un'applicazione di un metodo di progettazione chiamato pattern facciata. Il seguente diagramma mostra lo stato iniziale.
Crea un nuovo abbonamento (Abbonamento B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (pipeline B), che legge dall'argomento Pub/Sub (argomento) utilizzando sottoscrizione B e scrive in una tabella BigQuery separata (tabella B). Il seguente diagramma illustra questo flusso.
A questo punto, Pipeline A e Pipeline B vengono eseguite in parallelo e scrivono i risultati in tabelle separate. Registri l'ora t come timestamp della prima finestra completa elaborata dalla pipeline B.
Quando il watermark della pipeline A supera il tempo t, svuota la pipeline A. Quando svuoti la pipeline, tutte le finestre aperte si chiudono e l'elaborazione dei dati in transito viene completata. Se la pipeline contiene finestre e le finestre complete sono importanti (supponendo che non ci siano dati in ritardo), prima di svuotare la pipeline A, lascia che entrambe le pipeline vengano eseguite finché non hai finestre sovrapposte complete. Arresta il job di streaming per la pipeline A dopo che tutti i dati in transito sono stati elaborati e scritti nella tabella A. Il seguente diagramma mostra questa fase.
A questo punto, è in esecuzione solo Pipeline B. Puoi eseguire query da una vista BigQuery (vista facciata), che funge da facciata per Tabella A e Tabella B. Per le righe che hanno lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo che restituisca le righe della tabella B oppure, se le righe non esistono nella tabella B, torna alla tabella A. Il seguente diagramma mostra la visualizzazione (vista facciata) che legge sia dalla Tabella A sia dalla Tabella B.
A questo punto, puoi eliminare l'abbonamento A.
Quando vengono rilevati problemi con il deployment di una nuova pipeline, la presenza di pipeline parallele può semplificare il rollback. In questo esempio, potresti voler mantenere in esecuzione Pipeline A mentre monitori Pipeline B per verificare il corretto funzionamento. Se si verificano problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.
Gestire le mutazioni dello schema
I sistemi di gestione dei dati spesso devono adattarsi alle mutazioni dello schema nel tempo, a volte a causa di modifiche ai requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni ai sistemi informativi aziendali.
Considera una pipeline che legge i messaggi contenenti payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza TableRow
e poi scrive le righe in una tabella BigQuery. Lo schema
della tabella di output è simile ai messaggi elaborati dalla pipeline.
Nel seguente diagramma, lo schema è indicato come Schema A.
Nel tempo, lo schema del messaggio potrebbe subire modifiche non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema viene chiamato Schema B. In questo caso, è necessario aggiornare la pipeline A e lo schema della tabella di output deve supportare lo schema B.
Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza tempi di inattività.
Ad esempio, puoi aggiungere nuovi campi o rilassare
le modalità delle colonne,
ad esempio modificando REQUIRED
in NULLABLE
, senza tempi di inattività.
Queste mutazioni di solito non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono i campi dello schema esistenti interrompono le query o causano altre interruzioni. Il seguente approccio consente di apportare modifiche senza
richiedere tempi di inattività.
Separa i dati scritti dalla pipeline in una tabella principale e in una o più tabelle di gestione temporanea. La tabella principale memorizza i dati storici scritti dalla pipeline. Le tabelle di staging memorizzano l'ultimo output della pipeline. Puoi definire una vista facciata BigQuery sulle tabelle principale e di staging, che consente ai consumatori di eseguire query sui dati storici e aggiornati.
Il seguente diagramma rivede il flusso della pipeline precedente per includere una tabella di staging (Staging Table A), una tabella principale e una visualizzazione facciata.
Nel flusso rivisto, la pipeline A elabora i messaggi che utilizzano lo schema A e scrive l'output nella tabella di staging A, che ha uno schema compatibile. La tabella principale contiene i dati storici scritti dalle versioni precedenti della pipeline, nonché i risultati uniti periodicamente dalla tabella di staging. I consumatori possono eseguire query sui dati aggiornati, inclusi quelli storici e in tempo reale, utilizzando la visualizzazione facciata.
Quando lo schema dei messaggi cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Eseguendo pipeline parallele, puoi assicurarti che l'elaborazione dei dati in streaming continui senza interruzioni. L'interruzione e la sostituzione delle pipeline comportano un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di tempo.
La pipeline aggiornata scrive in una tabella di gestione temporanea aggiuntiva (Staging Table B) che utilizza Schema B. Puoi utilizzare un flusso di lavoro orchestrato per creare la nuova tabella di staging prima di aggiornare la pipeline. Aggiorna la visualizzazione della facciata in modo da includere i risultati della nuova tabella di gestione temporanea, utilizzando potenzialmente un passaggio del flusso di lavoro correlato.
Il seguente diagramma mostra il flusso aggiornato che mostra la tabella di staging B con lo schema B e come la visualizzazione facciata viene aggiornata per includere i contenuti della tabella principale e di entrambe le tabelle di staging.
Come processo separato dall'aggiornamento della pipeline, puoi unire le tabelle di staging nella tabella principale, periodicamente o in base alle esigenze. Il seguente diagramma mostra come la tabella di staging A viene unita alla tabella principale.
Passaggi successivi
- Scopri la procedura dettagliata per aggiornare una pipeline esistente.