Questo documento descrive come aggiornare un job di streaming in corso. Potresti voler aggiornare il job Dataflow esistente per i seguenti motivi:
- Vuoi migliorare o perfezionare il codice della pipeline.
- Vuoi correggere i bug nel codice della pipeline.
- Vuoi aggiornare la pipeline per gestire le modifiche al formato dei dati o per tenere conto della versione o di altre modifiche nell'origine dati.
- Vuoi applicare una patch a una vulnerabilità di sicurezza relativa a Container-Optimized OS per tutti i worker Dataflow.
- Vuoi scalare una pipeline Apache Beam di streaming per utilizzare un numero diverso di worker.
Puoi aggiornare i job in due modi:
- Aggiornamento del job in corso: per i job di streaming che utilizzano
Streaming Engine, puoi aggiornare le opzioni
min-num-workers
emax-num-workers
del job senza interrompere il job o modificare l'ID job. - Job di sostituzione: per eseguire il codice della pipeline aggiornato o per aggiornare le opzioni del job che gli aggiornamenti dei job in corso non supportano, avvia un nuovo job che sostituisce quello esistente. Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job, convalida il relativo grafico del job.
Quando aggiorni il job, il servizio Dataflow esegue un controllo di compatibilità tra il job attualmente in esecuzione e il potenziale job sostitutivo. Il controllo di compatibilità garantisce che elementi come le informazioni sullo stato intermedio e i dati memorizzati nel buffer possano essere trasferiti dal job precedente al job di sostituzione.
Puoi anche utilizzare l'infrastruttura di logging integrata dell'SDK Apache Beam
per registrare le informazioni quando aggiorni il job. Per ulteriori informazioni, consulta
Utilizzare i log delle pipeline.
Per identificare i problemi con il codice della pipeline, utilizza il
livello di logging DEBUG
.
- Per istruzioni sull'aggiornamento dei job di streaming che utilizzano modelli classici, consulta Aggiornare un job di streaming con modello personalizzato.
- Per istruzioni sull'aggiornamento dei job di streaming che utilizzano modelli flessibili, segui le istruzioni di gcloud CLI in questa pagina oppure consulta Aggiornare un job del modello flessibile.
Aggiornamento dell'opzione del job in volo
Per un job di streaming che utilizza Streaming Engine, puoi aggiornare le seguenti opzioni del job senza interrompere il job o modificare l'ID job:
min-num-workers
: il numero minimo di istanze Compute Engine.max-num-workers
: il numero massimo di istanze Compute Engine.worker-utilization-hint
: l'utilizzo CPU target, nell'intervallo [0,1, 0,9]
Per altri aggiornamenti dei job, devi sostituire il job attuale con quello aggiornato. Per maggiori informazioni, vedi Avviare un job di sostituzione.
Eseguire un aggiornamento in volo
Per eseguire un aggiornamento dell'opzione di job in volo, segui questi passaggi.
gcloud
Utilizza il comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID della regione del job
- MINIMUM_WORKERS: il numero minimo di istanze Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze Compute Engine
- TARGET_UTILIZATION: un valore nell'intervallo [0,1, 0,9]
- JOB_ID: l'ID del job da aggiornare
Puoi anche aggiornare --min-num-workers
, --max-num-workers
e
worker-utilization-hint
singolarmente.
REST
Utilizza il
metodo projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS, "worker_utilization_hint": TARGET_UTILIZATION } }
Sostituisci quanto segue:
- MASK: un elenco separato da virgole di parametri da aggiornare, tra i
seguenti:
runtime_updatable_params.max_num_workers
runtime_updatable_params.min_num_workers
runtime_updatable_params.worker_utilization_hint
- PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
- REGION: l'ID della regione del job
- JOB_ID: l'ID del job da aggiornare
- MINIMUM_WORKERS: il numero minimo di istanze Compute Engine
- MAXIMUM_WORKERS: il numero massimo di istanze Compute Engine
- TARGET_UTILIZATION: un valore nell'intervallo [0,1, 0,9]
Puoi anche aggiornare min_num_workers
, max_num_workers
e worker_utilization_hint
singolarmente.
Specifica i parametri da aggiornare nel parametro di query updateMask
e includi i valori aggiornati nel campo runtimeUpdatableParams
del corpo della richiesta. Nell'esempio riportato di seguito viene aggiornato min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Un job deve essere in stato di esecuzione per essere idoneo agli aggiornamenti in volo. Si verifica un errore se il job non è stato avviato o è già stato annullato. Allo stesso modo, se avvii un job di sostituzione, attendi che inizi l'esecuzione prima di inviare eventuali aggiornamenti in corso al nuovo job.
Dopo aver inviato una richiesta di aggiornamento, ti consigliamo di attendere il completamento della richiesta prima di inviare un altro aggiornamento. Visualizza i log dei job per vedere quando la richiesta viene completata.
Convalida di un job di sostituzione
Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job, convalida il relativo grafico del job. In Dataflow, un grafico del job è una rappresentazione grafica di una pipeline. Con la convalida del grafico del job, riduci il rischio che la pipeline riscontri errori o che non funzioni dopo l'aggiornamento. Inoltre, puoi convalidare gli aggiornamenti senza dover interrompere il job originale, in modo che non subisca tempi di inattività.
Per convalidare il grafico dei job, segui i passaggi per
avviare un job di sostituzione. Includi l'opzione di servizio Dataflow graph_validate_only
nel comando di aggiornamento.
Java
- Passa l'opzione
--update
. - Imposta l'opzione
--jobName
inPipelineOptions
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione di servizio
--dataflowServiceOptions=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transformNameMapping
. - Se invii un job di sostituzione che utilizza una versione successiva dell'SDK Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Python
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
inPipelineOptions
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione di servizio
--dataflow_service_options=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
. - Se invii un job di sostituzione che utilizza una versione successiva dell'SDK Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Vai
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione di servizio
--dataflow_service_options=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
.
gcloud
Per convalidare il grafico dei job per un job del modello flessibile, utilizza il comando
gcloud dataflow flex-template run
con l'opzione additional-experiments
:
- Passa l'opzione
--update
. - Imposta JOB_NAME con lo stesso nome del job che vuoi aggiornare.
- Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione
--additional-experiments=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform-name-mappings
.
Ad esempio:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
Sostituisci JOB_NAME con il nome del job che vuoi aggiornare.
REST
Utilizza il campo additionalExperiments
nell'oggetto
FlexTemplateRuntimeEnvironment
(modelli flessibili) o
RuntimeEnvironment
.
{
additionalExperiments : ["graph_validate_only"]
...
}
L'opzione di servizio graph_validate_only
convalida solo gli aggiornamenti della pipeline. Non utilizzare questa opzione durante la creazione o
l'avvio delle pipeline. Per aggiornare la pipeline,
avvia un job di sostituzione senza l'opzione di servizio
graph_validate_only
.
Quando la convalida del grafico del job ha esito positivo, lo stato del job e i log del job mostrano i seguenti stati:
- Lo stato del job è
JOB_STATE_DONE
. - Nella console Google Cloud , lo Stato del job
è
Succeeded
. Nei log dei job viene visualizzato il seguente messaggio:
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
Quando la convalida del grafico dei job non va a buon fine, lo stato del job e i log del job mostrano i seguenti stati:
- Lo stato del job è
JOB_STATE_FAILED
. - Nella console Google Cloud , lo Stato del job
è
Failed
. - Nei log dei job viene visualizzato un messaggio che descrive l'errore di incompatibilità. Il contenuto del messaggio dipende dall'errore.
Avviare un job di sostituzione
Potresti sostituire un job esistente per i seguenti motivi:
- Per eseguire il codice della pipeline aggiornato.
- Per aggiornare le opzioni dei job che non supportano gli aggiornamenti in volo.
Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job, convalida il relativo grafico del job.
Quando avvii un job di sostituzione, imposta le seguenti opzioni della pipeline per eseguire il processo di aggiornamento oltre alle normali opzioni del job:
Java
- Passa l'opzione
--update
. - Imposta l'opzione
--jobName
inPipelineOptions
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transformNameMapping
. - Se invii un job di sostituzione che utilizza una versione successiva dell'SDK Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Python
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
inPipelineOptions
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
. - Se invii un job di sostituzione che utilizza una versione successiva dell'SDK Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Vai
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
con lo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
.
gcloud
Per aggiornare un job Flex Template utilizzando gcloud CLI, utilizza il
comando gcloud dataflow flex-template run
. L'aggiornamento di altri job utilizzando gcloud CLI non è supportato.
- Passa l'opzione
--update
. - Imposta JOB_NAME con lo stesso nome del job che vuoi aggiornare.
- Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una
mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform-name-mappings
.
REST
Queste istruzioni mostrano come aggiornare i job non basati su modelli utilizzando l'API REST. Per utilizzare l'API REST per aggiornare un job modello classico, consulta Aggiornare un job di streaming modello personalizzato. Per utilizzare l'API REST per aggiornare un job del modello flessibile, consulta Aggiornare un job del modello flessibile.
Recupera la risorsa
job
per il job che vuoi sostituire utilizzando il metodoprojects.locations.jobs.get
. Includi il parametro di queryview
con il valoreJOB_VIEW_DESCRIPTION
. L'inclusione diJOB_VIEW_DESCRIPTION
limita la quantità di dati nella risposta in modo che la richiesta successiva non superi i limiti di dimensioni. Se hai bisogno di informazioni più dettagliate sul job, utilizza il valoreJOB_VIEW_ALL
.GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
Sostituisci i seguenti valori:
- PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
- REGION: la regione del job che vuoi aggiornare
- JOB_ID: l'ID del job che vuoi aggiornare
Per aggiornare il job, utilizza il metodo
projects.locations.jobs.create
. Nel corpo della richiesta, utilizza la risorsajob
che hai recuperato.POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
Sostituisci quanto segue:
- JOB_ID: lo stesso ID job dell'ID del job che vuoi aggiornare.
- JOB_NAME: lo stesso nome del job del job che vuoi aggiornare.
Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando il campo
transformNameMapping
.(Facoltativo) Per inviare la richiesta utilizzando curl (Linux, macOS o Cloud Shell), salva la richiesta in un file JSON, quindi esegui il seguente comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
Sostituisci FILE_PATH con il percorso del file JSON contenente il corpo della richiesta.
Specifica il nome del job di sostituzione
Java
Quando avvii il job di sostituzione, il valore che passi per l'opzione --jobName
deve corrispondere esattamente al nome del job che vuoi sostituire.
Python
Quando avvii il job di sostituzione, il valore che passi per l'opzione --job_name
deve corrispondere esattamente al nome del job che vuoi sostituire.
Vai
Quando avvii il job di sostituzione, il valore che passi per l'opzione --job_name
deve corrispondere esattamente al nome del job che vuoi sostituire.
gcloud
Quando avvii il job di sostituzione, il JOB_NAME deve corrispondere esattamente al nome del job che vuoi sostituire.
REST
Imposta il valore del campo replaceJobId
sullo stesso ID job del job che vuoi
aggiornare. Per trovare il valore corretto del nome del job, seleziona il job precedente nell'interfaccia di monitoraggio di Dataflow.
Poi, nel riquadro laterale Informazioni sul job, trova il campo ID job.
Per trovare il valore corretto del nome del job, seleziona il job precedente nell'interfaccia di monitoraggio di Dataflow. Poi, nel riquadro laterale Informazioni sul job, trova il campo Nome job:

In alternativa, esegui una query su un elenco di job esistenti utilizzando l'interfaccia a riga di comando Dataflow.
Inserisci il comando gcloud dataflow jobs list
nella finestra della shell o del terminale per ottenere un elenco dei job Dataflow nel tuo progetto Google Cloude trova il campo NAME
per il job che vuoi sostituire:
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
Crea una mappatura di trasformazione
Se la pipeline di sostituzione modifica i nomi delle trasformazioni rispetto a quelli della pipeline precedente, il servizio Dataflow richiede un mapping delle trasformazioni. Il mapping di trasformazione mappa le trasformazioni denominate nel codice della pipeline precedente ai nomi nel codice della pipeline di sostituzione.
Java
Passa il mapping utilizzando l'opzione della riga di comando --transformNameMapping
,
utilizzando il seguente formato generale:
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire solo le voci di mappatura in --transformNameMapping
per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e quella sostitutiva.
Quando esegui --transformNameMapping
,
potresti dover eseguire l'escape
delle virgolette in base alla shell. Ad esempio, in Bash:
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
Passa il mapping utilizzando l'opzione della riga di comando --transform_name_mapping
,
utilizzando il seguente formato generale:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire solo le voci di mappatura in --transform_name_mapping
per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e quella sostitutiva.
Quando esegui --transform_name_mapping
,
potresti dover eseguire l'escape
delle virgolette in base alla shell. Ad esempio, in Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Vai
Passa il mapping utilizzando l'opzione della riga di comando --transform_name_mapping
,
utilizzando il seguente formato generale:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire solo le voci di mappatura in --transform_name_mapping
per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e quella sostitutiva.
Quando esegui --transform_name_mapping
,
potresti dover eseguire l'escape
delle virgolette in base alla shell. Ad esempio, in Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
Trasferisci la mappatura utilizzando l'opzione --transform-name-mappings
con il seguente formato generale:
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire solo le voci di mappatura in --transform-name-mappings
per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e quella sostitutiva.
Quando esegui il comando con --transform-name-mappings
,
potresti dover eseguire l'escape delle virgolette in base alla shell. Ad
esempio, in Bash:
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
Trasferisci la mappatura utilizzando il campo transformNameMapping
con il seguente formato generale:
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
Devi fornire solo le voci di mappatura in transformNameMapping
per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e quella sostitutiva.
Determinare i nomi delle trasformazioni
Il nome della trasformazione in ogni istanza della mappa è il nome che hai fornito quando hai applicato la trasformazione nella pipeline. Ad esempio:
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Vai
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
Puoi anche ottenere i nomi delle trasformazioni per il job precedente esaminando il grafico di esecuzione del job nell'interfaccia di monitoraggio di Dataflow:

Denominazione delle trasformazioni composite
I nomi delle trasformazioni sono gerarchici e si basano sulla gerarchia delle trasformazioni nella pipeline. Se la pipeline ha una
trasformazione composita,
le trasformazioni nidificate vengono denominate in base alla trasformazione contenitore. Ad esempio, supponiamo che la tua pipeline contenga una trasformazione composita denominata CountWidgets
, che contiene una trasformazione interna denominata Parse
. Il nome completo
della trasformazione è CountWidgets/Parse
e devi specificarlo
nella mappatura della trasformazione.
Se la nuova pipeline mappa una trasformazione composita a un nome diverso, anche tutte le trasformazioni nidificate vengono rinominate automaticamente. Devi specificare i nomi modificati per le trasformazioni interne nella mappatura delle trasformazioni.
Riorganizza la gerarchia di trasformazione
Se la pipeline di sostituzione utilizza una gerarchia di trasformazione diversa da quella della pipeline precedente, devi dichiarare esplicitamente il mapping. Potresti avere una gerarchia di trasformazione diversa perché hai eseguito il refactoring delle trasformazioni composite o perché la pipeline dipende da una trasformazione composita di una libreria modificata.
Ad esempio, la pipeline precedente ha applicato una trasformazione composita, CountWidgets
,
che conteneva una trasformazione interna denominata Parse
. La pipeline di sostituzione
refattorizza CountWidgets
e nidifica Parse
all'interno di un'altra trasformazione denominata
Scan
. Perché l'aggiornamento vada a buon fine, devi mappare in modo esplicito il nome completo
della trasformazione nella pipeline precedente (CountWidgets/Parse
) con il nome
della trasformazione nella nuova pipeline (CountWidgets/Scan/Parse
):
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini completamente una trasformazione nella pipeline di sostituzione, devi
fornire una mappatura null. Supponiamo che la pipeline di sostituzione rimuova completamente la
trasformazione CountWidgets/Parse
:
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini completamente una trasformazione nella pipeline di sostituzione, devi
fornire una mappatura null. Supponiamo che la pipeline di sostituzione rimuova completamente la
trasformazione CountWidgets/Parse
:
--transform_name_mapping={"CountWidgets/Parse":""}
Vai
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
Se elimini completamente una trasformazione nella pipeline di sostituzione, devi
fornire una mappatura null. Supponiamo che la pipeline di sostituzione rimuova completamente la
trasformazione CountWidgets/Parse
:
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini completamente una trasformazione nella pipeline di sostituzione, devi
fornire una mappatura null. Supponiamo che la pipeline di sostituzione rimuova completamente la
trasformazione CountWidgets/Parse
:
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
Se elimini completamente una trasformazione nella pipeline di sostituzione, devi
fornire una mappatura null. Supponiamo che la pipeline di sostituzione rimuova completamente la
trasformazione CountWidgets/Parse
:
"transformNameMapping": {
CountWidgets/main.Parse: null
}
Effetti della sostituzione di un job
Quando sostituisci un job esistente, un nuovo job esegue il codice della pipeline aggiornato. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività mentre il job esistente si arresta, viene eseguito il controllo di compatibilità e il nuovo job inizia.
Il job di sostituzione conserva i seguenti elementi:
- Dati sullo stato intermedio del job precedente. Le cache in memoria non vengono salvate.
- Record di dati memorizzati nel buffer o metadati attualmente "in volo" dal job precedente. Ad esempio, alcuni record nella pipeline potrebbero essere memorizzati nel buffer in attesa della risoluzione di una finestra.
- Aggiornamenti delle opzioni del job in volo che hai applicato al job precedente.
Dati sullo stato intermedio
I dati dello stato intermedio del job precedente vengono conservati. I dati dello stato non includono le cache in memoria. Se vuoi conservare i dati della cache in memoria quando aggiorni la pipeline, come soluzione alternativa, refattorizza la pipeline per convertire le cache in dati di stato o in input secondari. Per ulteriori informazioni sull'utilizzo degli input secondari, consulta Pattern di input secondari nella documentazione di Apache Beam.
Le pipeline di streaming hanno limiti di dimensioni per ValueState
e per gli input secondari.
Di conseguenza, se hai cache di grandi dimensioni che vuoi conservare, potresti dover
utilizzare una memoria esterna, come Memorystore o Bigtable.
Dati in volo
I dati "in volo" vengono comunque elaborati dalle trasformazioni nella nuova pipeline. Tuttavia, le trasformazioni aggiuntive che aggiungi nel codice della pipeline di sostituzione potrebbero o meno avere effetto, a seconda di dove vengono memorizzati i record nel buffer. In questo esempio, la pipeline esistente ha le seguenti trasformazioni:
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Vai
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
Puoi sostituire il job con il nuovo codice della pipeline nel seguente modo:
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Vai
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
Anche se aggiungi una trasformazione per filtrare le stringhe che iniziano con la
lettera "A", la trasformazione successiva (FormatStrings
) potrebbe comunque visualizzare stringhe
in transito o memorizzate nel buffer che iniziano con "A" e che sono state trasferite dal job
precedente.
Modificare la finestra
Puoi modificare le strategie di windowing
e trigger
per gli elementi PCollection
nella pipeline di sostituzione, ma fai attenzione.
La modifica delle strategie di finestra o di trigger non influisce sui dati già memorizzati nel buffer o in transito.
Ti consigliamo di apportare solo modifiche minori al windowing della pipeline, ad esempio modificando la durata delle finestre temporali fisse o scorrevoli. Apportare modifiche significative al windowing o ai trigger, ad esempio la modifica dell'algoritmo di windowing, potrebbe avere risultati imprevedibili sull'output della pipeline.
Controllo della compatibilità del job
Quando avvii il job di sostituzione, il servizio Dataflow esegue un controllo di compatibilità tra il job di sostituzione e il job precedente. Se il controllo di compatibilità viene superato, il job precedente viene interrotto. Il job di sostituzione viene quindi avviato nel servizio Dataflow mantenendo lo stesso nome. Se il controllo di compatibilità non riesce, il job precedente continua a essere eseguito sul servizio Dataflow e il job di sostituzione restituisce un errore.
Java
A causa di una limitazione, devi utilizzare l'esecuzione del blocco per visualizzare gli errori relativi ai tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:
- Utilizza pipeline.run().waitUntilFinish() nel codice della pipeline.
- Esegui il programma di pipeline di sostituzione con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di esecuzione del blocco digitando
Ctrl+C
.
In alternativa, puoi monitorare lo stato del job di sostituzione nell'interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato anche il controllo di compatibilità.
Python
A causa di una limitazione, devi utilizzare l'esecuzione del blocco per visualizzare gli errori relativi ai tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:
- Utilizza pipeline.run().wait_until_finish() nel codice della pipeline.
- Esegui il programma di pipeline di sostituzione con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di esecuzione del blocco digitando
Ctrl+C
.
In alternativa, puoi monitorare lo stato del job di sostituzione nell'interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato anche il controllo di compatibilità.
Vai
A causa di una limitazione, devi utilizzare l'esecuzione del blocco per visualizzare gli errori relativi ai tentativi di aggiornamento non riusciti nella console o nel terminale.
Nello specifico, devi specificare l'esecuzione non bloccante utilizzando i flag
--execute_async
o --async
. La soluzione alternativa
attuale prevede i seguenti passaggi:
- Esegui il programma della pipeline di sostituzione con l'opzione
--update
e senza i flag--execute_async
o--async
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di esecuzione del blocco digitando
Ctrl+C
.
gcloud
A causa di una limitazione, devi utilizzare l'esecuzione del blocco per visualizzare gli errori relativi ai tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:
- Per le pipeline Java, utilizza pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
- Esegui il programma di pipeline di sostituzione con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di esecuzione del blocco digitando
Ctrl+C
.
REST
A causa di una limitazione, devi utilizzare l'esecuzione del blocco per visualizzare gli errori relativi ai tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:
- Per le pipeline Java, utilizza pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
- Esegui il programma della pipeline di sostituzione con il campo
replaceJobId
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di esecuzione del blocco digitando
Ctrl+C
.
Il controllo di compatibilità utilizza il mapping di trasformazione fornito per garantire che Dataflow possa trasferire i dati di stato intermedi dai passaggi del job precedente al job sostitutivo. Il controllo di compatibilità garantisce inoltre
che i PCollection
nella pipeline utilizzino
gli stessi codificatori.
La modifica di un Coder
può causare il mancato superamento del controllo di compatibilità perché i dati in transito o i record memorizzati nel buffer potrebbero non essere serializzati correttamente nella pipeline di sostituzione.
Evitare interruzioni della compatibilità
Alcune differenze tra la pipeline precedente e quella sostitutiva possono causare l'esito negativo del controllo di compatibilità. Queste differenze includono:
- Modifica del grafico della pipeline senza fornire una mappatura. Quando aggiorni un job, Dataflow tenta di abbinare le trasformazioni del job precedente a quelle del job sostitutivo. Questo processo di corrispondenza aiuta Dataflow a trasferire i dati di stato intermedi per ogni passaggio. Se rinomini o rimuovi dei passaggi, devi fornire una mappatura delle trasformazioni in modo che Dataflow possa abbinare i dati di stato di conseguenza.
- Modifica degli input laterali per un passaggio. L'aggiunta di input laterali a una trasformazione nella pipeline di sostituzione o la loro rimozione causa l'esito negativo del controllo di compatibilità.
- Modifica del codificatore per un passaggio. Quando aggiorni un job, Dataflow conserva tutti i record di dati attualmente memorizzati nel buffer e li gestisce nel job di sostituzione. Ad esempio, i dati nel buffer potrebbero verificarsi durante la risoluzione del windowing. Se il job di sostituzione utilizza una codifica dei dati diversa o incompatibile, Dataflow non è in grado di serializzare o deserializzare questi record.
Rimozione di un'operazione "stateful" dalla pipeline. Se rimuovi le operazioni stateful dalla pipeline, il job di sostituzione potrebbe non superare il controllo di compatibilità. Dataflow può unire più passaggi per una maggiore efficienza. Se rimuovi un'operazione dipendente dallo stato da un passaggio unito, il controllo non va a buon fine. Le operazioni con stato includono:
- Trasformazioni che producono o utilizzano input secondari.
- Letture I/O.
- Trasformazioni che utilizzano lo stato con chiave.
- Trasformazioni con unione delle finestre.
Modifica delle variabili
DoFn
stateful. Per i job di streaming in corso, se la pipeline includeDoFn
stateful, la modifica delle variabiliDoFn
stateful potrebbe causare l'errore della pipeline.Tentativo di eseguire il job di sostituzione in una zona geografica diversa. Esegui il job di sostituzione nella stessa zona in cui hai eseguito il job precedente.
Aggiornamento degli schemi
Apache Beam consente agli PCollection
di avere schemi con campi denominati, nel qual caso
non sono necessari codificatori espliciti. Se i nomi e i tipi di campo per un determinato schema
rimangono invariati (inclusi i campi nidificati), lo schema non causa
l'esito negativo del controllo degli aggiornamenti. Tuttavia, l'aggiornamento potrebbe comunque essere bloccato se altri
segmenti della nuova pipeline sono incompatibili.
Evoluzione degli schemi
Spesso è necessario far evolvere lo schema di un PCollection
a causa dell'evoluzione dei requisiti aziendali. Il servizio Dataflow consente di apportare le seguenti modifiche a uno schema durante l'aggiornamento della pipeline:
- Aggiunta di uno o più nuovi campi a uno schema, inclusi i campi nidificati.
- Rendere facoltativo (annullabile) un tipo di campo obbligatorio (non annullabile).
La rimozione dei campi, la modifica dei nomi dei campi o la modifica dei tipi di campi non è consentita durante l'aggiornamento.
Trasferire dati aggiuntivi in un'operazione ParDo esistente
Puoi passare dati aggiuntivi (fuori banda) a un'operazione ParDo esistente utilizzando uno dei seguenti metodi, a seconda del caso d'uso:
- Serializza le informazioni come campi nella sottoclasse
DoFn
. - Qualsiasi variabile a cui fanno riferimento i metodi in un
DoFn
anonimo viene serializzata automaticamente. - Calcola i dati all'interno di
DoFn.startBundle()
. - Trasmetti i dati utilizzando
ParDo.withSideInputs
.
Per maggiori informazioni, consulta le seguenti pagine:
- Guida alla programmazione di Apache Beam: ParDo, in particolare le sezioni sulla creazione di un DoFn e degli input aggiuntivi.
- Riferimento all'SDK Apache Beam per Java: ParDo