Questa pagina fornisce indicazioni e consigli per eseguire l'upgrade delle pipeline di streaming. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam o potresti voler aggiornare il codice della pipeline. Sono disponibili opzioni diverse per adattarsi a scenari diversi.
Mentre le pipeline batch si arrestano al termine del job, le pipeline di streaming spesso vengono eseguite in modo continuo per fornire un'elaborazione ininterrotta. Pertanto, quando esegui l'upgrade delle pipeline di streaming, devi tenere conto delle seguenti considerazioni:
- Potresti dover ridurre al minimo o evitare interruzioni della pipeline. In alcuni casi, potresti essere in grado di tollerare un'interruzione temporanea dell'elaborazione durante il deployment di una nuova versione di una pipeline. In altri casi, la tua applicazione potrebbe non essere in grado di tollerare interruzioni.
- I processi di aggiornamento della pipeline devono gestire le modifiche allo schema in modo da minimizzare l'interruzione dell'elaborazione dei messaggi e di altri sistemi collegati. Ad esempio, se lo schema dei messaggi in una pipeline di elaborazione degli eventi cambia, potrebbero essere necessarie modifiche dello schema anche nei sink di dati a valle.
Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline di streaming, a seconda della pipeline e dei requisiti di aggiornamento:
Per ulteriori informazioni sui problemi che potresti riscontrare durante un update e su come evitarli, consulta Convalidare un job sostitutivo e Controllo di compatibilità dei job.
Best practice
- Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da eventuali modifiche al codice della pipeline.
- Testa la pipeline dopo ogni modifica prima di apportare aggiornamenti aggiuntivi.
- Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla pipeline.
Eseguire aggiornamenti in volo
Puoi aggiornare alcune pipeline di streaming in corso senza interrompere il job. Questo scenario è chiamato aggiornamento di un job in esecuzione. Gli aggiornamenti dei job in esecuzione sono disponibili solo in circostanze limitate:
- Il job deve utilizzare Streaming Engine.
- Il job deve essere in stato di esecuzione.
- Modificherai solo il numero di worker utilizzati dal job.
Per ulteriori informazioni, consulta Impostare l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.
Per istruzioni su come eseguire un aggiornamento di un job in esecuzione, consulta Aggiornare una pipeline esistente.
Avvia un job sostitutivo
Se il job aggiornato è compatibile con quello esistente, puoi aggiornare la pipeline utilizzando l'opzione update
. Quando sostituisci un job esistente, un nuovo job esegue il codice della pipeline aggiornato.
Il servizio Dataflow mantiene il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare un tempo di riposo
mentre il job esistente si arresta, viene eseguito il controllo di compatibilità e viene avviato il nuovo job. Per maggiori dettagli, consulta
Effetti della sostituzione di un job.
Dataflow esegue un controllo di compatibilità per assicurarsi che il codice della pipeline aggiornato possa essere disegnato in sicurezza nella pipeline in esecuzione. Alcune modifiche al codice causano il fallimento del controllo di compatibilità, ad esempio quando gli input laterali vengono aggiunti o rimossi da un passaggio esistente. Se il controllo di compatibilità non va a buon fine, non puoi eseguire un aggiornamento in situ del job.
Per istruzioni su come avviare un job sostitutivo, consulta Avvia un job sostitutivo.
Se l'aggiornamento della pipeline non è compatibile con il job corrente, devi interrompere e sostituire la pipeline. Se la pipeline non può tollerare il tempo di riposo, esegui pipeline parallele.
Arrestare e sostituire le pipeline
Se puoi interrompere temporaneamente l'elaborazione, puoi annullare o svuotare la pipeline, quindi sostituirla con quella aggiornata. L'annullamento di una pipeline fa sì che Dataflow interrompa immediatamente l'elaborazione e arresti le risorse il più rapidamente possibile, il che può causare una certa perdita di dati in fase di elaborazione, noti come dati in transito. Per evitare la perdita di dati, nella maggior parte dei casi lo svuotamento è l'azione preferita. Puoi anche utilizzare gli snapshot di Dataflow per salvare lo stato di una pipeline di inserimento flussi, consentendoti di avviare una nuova versione del job Dataflow senza perdere lo stato. Per ulteriori informazioni, consulta Utilizzare gli snapshot di Dataflow.
Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in elaborazione e attiva tutti gli attivatori. Sebbene i dati in transito non vadano persi, lo svuotamento potrebbe causare dati incomplete nelle finestre. In questo caso, le finestre in-process emettono risultati parziali o incompleti. Per saperne di più, consulta la sezione Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job in modalità flusso che contenga il codice aggiornato della pipeline, in modo da riprendere l'elaborazione.
Con questo metodo, si verifica un certo tempo di riposo tra il momento in cui si interrompe il job di streaming esistente e il momento in cui la pipeline sostitutiva è pronta a riprendere l'elaborazione dei dati. Tuttavia, annullare o svuotare una pipeline esistente e poi lanciare un nuovo job con la pipeline aggiornata è meno complicato rispetto all'esecuzione di pipeline parallele.
Per istruzioni più dettagliate, consulta Svuotare un job Dataflow. Dopo aver svuotato il job corrente, avvia un nuovo job con lo stesso nome.
Nuovo trattamento dei messaggi con snapshot e ricerca di Pub/Sub
In alcuni casi, dopo aver sostituito o annullato una pipeline svuotata, potrebbe essere necessario rielaborare i messaggi Pub/Sub inviati in precedenza. Ad esempio, potresti dover utilizzare una logica aziendale aggiornata per rielaborare i dati. La ricerca Pub/Sub è una funzionalità che consente di riprodurre i messaggi da uno snapshot Pub/Sub. Puoi utilizzare la ricerca Pub/Sub con Dataflow per rielaborare i messaggi dal momento in cui viene creato lo snapshot della sottoscrizione.
Durante lo sviluppo e i test, puoi anche utilizzare la ricerca Pub/Sub per riprodurre ripetutamente i messaggi noti al fine di verificare l'output della pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot dell'abbonamento quando l'abbonamento viene utilizzato da una pipeline. In questo caso, la ricerca può invalidare la logica della marcatura temporale di Dataflow e potrebbe influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.
Un flusso di lavoro consigliato per gcloud CLI per l'utilizzo di Pub/Sub Seek con le pipeline Dataflow in una finestra del terminale è il seguente:
Per creare uno snapshot dell'abbonamento, utilizza il comando
gcloud pubsub snapshots create
:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Per svuotare o annullare la pipeline, utilizza il comando
gcloud dataflow jobs drain
o il comandogcloud dataflow jobs cancel
:gcloud dataflow jobs drain JOB_ID
o
gcloud dataflow jobs cancel JOB_ID
Per eseguire la ricerca fino allo snapshot, utilizza il comando
gcloud pubsub subscriptions seek
:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Esegui il deployment di una nuova pipeline che utilizza l'abbonamento.
Esegui pipeline parallele
Se devi evitare interruzioni della pipeline di streaming durante un aggiornamento, esegui pipeline parallele. Crea un nuovo job di streaming con il codice della pipeline aggiornato ed esegui la nuova pipeline in parallelo con quella esistente.
Quando crei la nuova pipeline, utilizza la stessa strategia di definizione delle finestre utilizzata per la pipeline esistente. Lascia in esecuzione la pipeline esistente finché la sua filigrana non supera il timestamp della finestra completa più antica elaborata dalla pipeline aggiornata. Quindi, svuota o annulla la pipeline esistente. La pipeline aggiornata continua a funzionare al suo posto e assume autonomamente il controllo dell'elaborazione.
Il seguente diagramma illustra questa procedura.
Nel diagramma, Pipeline B è il job aggiornato che prende il posto di Pipeline A. Il valore t è il timestamp della finestra completa più antica elaborata dalla pipeline B. Il valore w è la filigrana per la pipeline A. Per semplicità, si assume una filigrana perfetta senza dati in ritardo. L'elaborazione e tempo totale di esecuzione sono rappresentati sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse (a cascata) di cinque minuti. I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.
Poiché l'output simultaneo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configura le due pipeline in modo che scrivano i risultati in destinazioni diverse. I sistemi a valle possono quindi utilizzare un'astrazione sui due flussi di destinazione, ad esempio una vista del database, per eseguire query sui risultati combinati. Questi sistemi possono anche utilizzare l'astrazione per deduplicare i risultati del periodo in sovrapposizione.
L'esempio seguente descrive l'approccio di utilizzo di una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.
Nello stato iniziale, la pipeline di streaming esistente (Pipeline A) è in esecuzione e legge i messaggi da un argomento Pub/Sub (Topic) utilizzando una sottoscrizione (Subscription A). I risultati vengonoscritti in una tabella BigQuery (Tabella A). I risultati vengono consumati tramite una vista BigQuery, che funge da facciata per mascherare le modifiche alla tabella sottostante. Questa procedura è un'applicazione di un metodo di progettazione chiamato pattern di facciata. Il seguente diagramma mostra lo stato iniziale.
Crea una nuova sottoscrizione (Abbonamento B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (Pipeline B), che legge dall'argomento Pub/Sub (Topic) utilizzando la sottoscrizione B e scrive in una tabella BigQuery separata (Tabella B). Il seguente diagramma illustra questo flusso.
A questo punto, le pipeline A e B vengono eseguite in parallelo e scrivono i risultati in tabelle separate. Registra il momento t come timestamp della finestra completa più antica elaborata dalla pipeline B.
Quando la filigrana della pipeline A supera il tempo t, svuota la pipeline A. Quando svuoti la pipeline, tutte le finestre aperte vengono chiuse e l'elaborazione dei dati in transito viene completata. Se la pipeline contiene finestre e le finestre complete sono importanti (supponendo che non ci siano dati in ritardo), prima di svuotare la pipeline A, lascia che entrambe le pipeline vengano eseguite fino a quando non hai finestre sovrapposte complete. Interrompi il job di streaming per la pipeline A dopo che tutti i dati in transito sono stati elaborati e scritti nella tabella A. Questo diagramma mostra questa fase.
A questo punto, è in esecuzione solo la pipeline B. Puoi eseguire query da una visualizzazione BigQuery (visualizzazione facciata), che funge da facciata per Tabella A e Tabella B. Per le righe con lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo da restituire le righe della tabella B oppure, se le righe non esistono nella tabella B, utilizza la tabella A. Il seguente diagramma mostra la vista (Visualizzazione facciata) che legge sia dalla Tabella A sia dalla Tabella B.
A questo punto, puoi eliminare l'abbonamento A.
Quando vengono rilevati problemi con il deployment di una nuova pipeline, avere pipeline parallele può semplificare il rollback. In questo esempio, potresti mantenere in esecuzione la pipeline A mentre monitori il corretto funzionamento della pipeline B. In caso di problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.
Limitazioni
Questo approccio presenta i seguenti limiti:
- L'esecuzione di due pipeline sullo stesso input è probabile che generi dati duplicati all'output. Il sistema a valle deve essere a conoscenza dei dati duplicati e in grado di tollerarli.
- Quando leggi da un'origine Pub/Sub, l'utilizzo della stessa sottoscrizione per più pipeline non è consigliato e può causare problemi di correttezza. Tuttavia, in alcuni casi d'uso, come le pipeline di estrazione, trasformazione e caricamento (ETL), l'utilizzo dello stesso abbonamento in due pipeline potrebbe ridurre la duplicazione. In questo scenario sono probabili problemi di scalabilità automatica, ma possono essere attenuati utilizzando la funzionalità di aggiornamento dei job in esecuzione. Per ulteriori informazioni, consulta Ottimizzare la scalabilità automatica per le pipeline di streaming Pub/Sub.
- Quando leggi da un'origine Pub/Sub, l'utilizzo di un secondo abbonamento genera duplicati, ma non causa problemi di correttezza dei dati e di scalabilità automatica.
Gestire le mutazioni dello schema
I sistemi di gestione dei dati devono spesso adattarsi alle mutazioni dello schema nel tempo, talvolta a causa di modifiche dei requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni dei sistemi informativi aziendali.
Prendiamo in considerazione una pipeline che legge i messaggi contenenti payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza
TableRow
e poi scrive le righe in una tabella BigQuery. Lo schema
della tabella di output è simile ai messaggi elaborati dalla pipeline.
Nel seguente diagramma, lo schema è denominato Schema A.
Nel tempo, lo schema del messaggio potrebbe subire mutazioni in modi non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema è indicato come Schema B. In questo caso, la pipeline A deve essere aggiornata e lo schema della tabella di output deve supportare lo schema B.
Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza downtown.
Ad esempio, puoi aggiungere nuovi campi o allentare le modalità delle colonne, ad esempio passare da REQUIRED
a NULLABLE
, senza tempi di inattività.
In genere, queste mutazioni non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono i campi dello schema esistente interrompono le query o causano altre interruzioni. Il seguente approccio consente di apportare modifiche senza richiedere il tempo di riposo.
Separa i dati scritti dalla pipeline in una tabella principale e in una o più tabelle di staging. La tabella principale memorizza i dati storici scritti dalla pipeline. Le tabelle di staging memorizzano l'output più recente della pipeline. Puoi definire una vista di facciata BigQuery sulle tabelle principali e di staging, che consente ai consumatori di eseguire query sia sui dati storici che su quelli aggiornati.
Il seguente diagramma rivede il flusso della pipeline precedente in modo da includere una tabella intermedia (Tabella intermedia A), una tabella principale e una vista di facciata.
Nel flusso rivisto, la pipeline A elabora i messaggi che utilizzano lo schema A e scrive l'output nella tabella di staging A, che ha uno schema compatibile. La tabella principale contiene i dati storici scritti dalle versioni precedenti della pipeline, nonché i risultati uniti periodicamente dalla tabella di staging. I consumatori possono eseguire query su dati aggiornati, inclusi dati storici e in tempo reale, utilizzando la vista della facciata.
Quando lo schema del messaggio passa da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Se esegui pipeline parallele, puoi assicurarti che l'elaborazione dei dati in streaming continui senza interruzioni. L'interruzione e la sostituzione delle pipeline provocano un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un determinato periodo di tempo.
La pipeline aggiornata scrive in un'altra tabella intermedia (Tabella intermedia B) che utilizza lo schema B. Puoi utilizzare un flusso di lavoro orchestrato per creare la nuova tabella di staging prima di aggiornare la pipeline. Aggiorna la vista della facciata per includere i risultati della nuova tabella di staging, eventualmente utilizzando un passaggio del flusso di lavoro correlato.
Il seguente diagramma mostra il flusso aggiornato che mostra la tabella di staging B con lo schema B e come la visualizzazione della facciata viene aggiornata per includere i contenuti della tabella principale e di entrambe le tabelle di staging.
Come procedura separata dall'aggiornamento della pipeline, puoi unire le tabelle intermediarie alla tabella principale, periodicamente o in base alle esigenze. Il seguente diagramma mostra come la tabella di staging A viene unita alla tabella principale.
Passaggi successivi
- Consulta la procedura dettagliata per aggiornare una pipeline esistente.