Il modello Spanner modifiche in tempo reale to BigQuery è una pipeline di streaming che trasmette i record delle modifiche dei dati Spanner e li scrive in tabelle BigQuery utilizzando Dataflow Runner V2.
Tutte le colonne monitorate del flusso di modifiche sono incluse in ogni riga della tabella BigQuery, indipendentemente dal fatto che vengano modificate da una transazione Spanner. Le colonne non osservate non sono incluse nella riga BigQuery. Tutte le modifiche a Spanner precedenti al watermark Dataflow vengono applicate correttamente alle tabelle BigQuery o vengono archiviate nella coda dei messaggi non recapitabili per il nuovo tentativo. Le righe BigQuery vengono inserite in modo non ordinato rispetto all'ordinamento originale dei timestamp di commit di Spanner.
Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. In caso contrario,
vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve
contenere le colonne monitorate corrispondenti delle tabelle Spanner e qualsiasi colonna di metadati aggiuntiva
che non viene ignorata esplicitamente dall'opzione ignoreFields
.
Consulta la descrizione dei campi dei metadati nell'elenco seguente.
Ogni nuova riga BigQuery include tutte le colonne
monitorate dallo stream delle modifiche dalla riga corrispondente nella tabella Spanner al
timestamp del record di modifica.
I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per maggiori dettagli su questi campi, consulta Record di modifica dei dati in "Partizioni, record e query dei flussi di modifiche".
_metadata_spanner_mod_type
: il tipo di modifica (inserimento, aggiornamento o eliminazione) della transazione Spanner. Estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_table_name
: il nome della tabella Spanner. Questo campo non è il nome della tabella dei metadati del connettore._metadata_spanner_commit_timestamp
: il timestamp di commit di Spanner, ovvero l'ora in cui viene eseguito il commit di una modifica. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_server_transaction_id
: una stringa univoca a livello globale che rappresenta la transazione Spanner in cui è stata eseguita la modifica. Utilizza questo valore solo nel contesto dell'elaborazione dei record dello stream di modifiche. Non è correlato all'ID transazione nell'API di Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_record_sequence
: il numero di sequenza del record all'interno della transazione Spanner. I numeri di sequenza sono garantiti per essere univoci e in aumento monotono, ma non necessariamente contigui, all'interno di una transazione. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_is_last_record_in_transaction_in_partition
: indica se il record è l'ultimo record per una transazione Spanner nella partizione corrente. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_number_of_records_in_transaction
: il numero di record di modifica dei dati che fanno parte della transazione Spanner in tutte le partizioni del flusso di modifiche. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_spanner_number_of_partitions_in_transaction
: Il numero di partizioni che restituiscono record di modifica dei dati per la transazione Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche._metadata_big_query_commit_timestamp
: il timestamp di commit quando la riga viene inserita in BigQuery. SeuseStorageWriteApi
ètrue
, questa colonna non viene creata automaticamente nella tabella del log delle modifiche dalla pipeline. In questo caso, devi aggiungere manualmente questa colonna nella tabella del log delle modifiche e impostareCURRENT_TIMESTAMP
come valore predefinito, se necessario.
Quando utilizzi questo modello, tieni presente i seguenti dettagli:
- Puoi utilizzare questo modello per propagare nuove colonne nelle tabelle esistenti o nuove tabelle da Spanner a BigQuery. Per saperne di più, consulta Gestire l'aggiunta di tabelle o colonne di monitoraggio.
- Per i tipi di acquisizione dei valori
OLD_AND_NEW_VALUES
eNEW_VALUES
, quando il record di modifica dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura obsoleta di Spanner all'ora del commit del record di modifica dei dati per recuperare le colonne invariate ma monitorate. Assicurati di configurare correttamente il parametro "version_retention_period" del database per la lettura obsoleta. Per il tipo di acquisizione del valoreNEW_ROW
, il modello è più efficiente, perché il record di modifica dei dati acquisisce l'intera nuova riga, incluse le colonne che non vengono aggiornate nelle richieste UPDATE, e il modello non deve eseguire una lettura obsoleta. - Per ridurre al minimo la latenza di rete e i costi di trasporto di rete, esegui il job Dataflow dalla stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzi origini, sink, posizioni dei file di staging o posizioni dei file temporanei che si trovano al di fuori della regione del job, i tuoi dati potrebbero essere inviati tra regioni diverse. Per ulteriori informazioni, consulta Regioni Dataflow.
- Questo modello supporta tutti i tipi di dati Spanner validi. Se il tipo BigQuery
è più preciso del tipo Spanner, durante la
trasformazione potrebbe verificarsi una perdita di precisione. In particolare:
- Per il tipo JSON di Spanner, l'ordine dei membri di un oggetto è in ordine lessicografico, ma non esiste una garanzia simile per il tipo JSON di BigQuery.
- Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo il tipo TIMESTAMP in microsecondi.
Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow per i flussi di modifiche e sulle best practice.
Requisiti della pipeline
- L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database Spanner deve esistere prima dell'esecuzione della pipeline.
- L'istanza dei metadati di Spanner deve esistere prima dell'esecuzione della pipeline.
- Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
- Lo stream di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
- Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.
Gestire l'aggiunta di tabelle o colonne di monitoraggio
Questa sezione descrive le best practice per la gestione dell'aggiunta di tabelle e colonne di monitoraggio Spanner durante l'esecuzione della pipeline. La versione del modello più vecchia supportata
per questa funzionalità è 2024-09-19-00_RC00
.
- Prima di aggiungere una nuova colonna a un ambito di stream delle modifiche di Spanner,
aggiungi prima la colonna alla tabella del log delle modifiche di BigQuery. La colonna
aggiunta deve avere un tipo di dati corrispondente ed essere
NULLABLE
. Attendi almeno 10 minuti prima di continuare a creare la nuova colonna o tabella in Spanner. La scrittura nella nuova colonna senza attendere potrebbe comportare un record non elaborato con un codice di errore non valido nella directory della coda di messaggi non recapitati. - Per aggiungere una nuova tabella, devi prima aggiungerla al database Spanner. La tabella viene creata automaticamente in BigQuery quando la pipeline riceve un record per la nuova tabella.
- Dopo aver aggiunto le nuove colonne o tabelle nel database Spanner, assicurati di modificare lo stream di modifiche per monitorare le nuove colonne o tabelle che ti interessano se non sono già monitorate in modo implicito.
- Il modello non elimina tabelle o colonne da BigQuery. Se una colonna viene eliminata dalla tabella Spanner, i valori null vengono inseriti nelle colonne del log delle modifiche BigQuery per i record generati dopo l'eliminazione delle colonne dalla tabella Spanner, a meno che tu non elimini manualmente la colonna da BigQuery.
- Il modello non supporta gli aggiornamenti del tipo di colonna. Anche se
Spanner supporta la modifica di una colonna
STRING
in una colonnaBYTES
o di una colonnaBYTES
in una colonnaSTRING
, non puoi modificare il tipo di dati di una colonna esistente o utilizzare lo stesso nome di colonna con tipi di dati diversi in BigQuery. Se elimini e ricrei una colonna con lo stesso nome ma un tipo diverso in Spanner, i dati potrebbero essere scritti nella colonna BigQuery esistente, ma il tipo rimane invariato. - Questo modello non supporta gli aggiornamenti della modalità a colonne. Le colonne dei metadati
replicati in BigQuery sono impostate sulla modalità
REQUIRED
. Tutte le altre colonne replicate in BigQuery sono impostate suNULLABLE
, indipendentemente dal fatto che siano definite comeNOT NULL
nella tabella Spanner. Non puoi aggiornare le colonneNULLABLE
alla modalitàREQUIRED
in BigQuery. - La modifica del tipo di acquisizione dei valori di uno stream di modifiche non è supportata per le pipeline in esecuzione.
Parametri del modello
Parametri obbligatori
- spannerInstanceId: l'istanza Spanner da cui leggere modifiche in tempo reale.
- spannerDatabase: il database Spanner da cui leggere modifiche in tempo reale.
- spannerMetadataInstanceId: l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
- spannerChangeStreamName: il nome della modifica in tempo reale di Spanner da cui leggere.
- bigQueryDataset: il set di dati BigQuery per l'output modifiche in tempo reale.
Parametri facoltativi
- spannerProjectId: il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- spannerDatabaseRole: il ruolo database di Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo quando l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve disporre del privilegio
SELECT
sul flusso di modifiche e del privilegioEXECUTE
sulla funzione di lettura del flusso di modifiche. Per saperne di più, consulta Controllo dell'accesso granulare per gli modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams). - spannerMetadataTableName: il nome della tabella dei metadati del connettore Spanner modifiche in tempo reale da utilizzare. Se non viene fornita, durante il flusso della pipeline viene creata automaticamente una tabella di metadati del connettore di modifiche in tempo reale Spanner. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
- rpcPriority: la priorità della richiesta per le chiamate Spanner. Il valore deve essere uno dei seguenti:
HIGH
,MEDIUM
oLOW
. Il valore predefinito èHIGH
. - spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio,
https://batch-spanner.googleapis.com
. - startTimestamp: la data e l'ora di inizio (https://datatracker.ietf.org/doc/html/rfc3339), inclusa, da utilizzare per la lettura dei modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp dell'inizio della pipeline, ovvero l'ora attuale.
- endTimestamp: la data e l'ora di fine (https://datatracker.ietf.org/doc/html/rfc3339), inclusa, da utilizzare per la lettura dei flussi di modifiche.Esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
- bigQueryProjectId: il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
- bigQueryChangelogTableNameTemplate: il modello per il nome della tabella BigQuery che contiene il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory: il percorso in cui archiviare i record non elaborati. Il percorso predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è solitamente sufficiente.
- dlqRetryMinutes: il numero di minuti tra i tentativi di ripetizione della coda dei messaggi non recapitabili. Il valore predefinito è
10
. - ignoreFields: un elenco di campi (sensibili alle maiuscole) separati da virgole da ignorare. Questi campi potrebbero essere campi di tabelle monitorate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il valore predefinito è vuoto.
- disableDlqRetries: indica se disattivare o meno i tentativi per la DLQ. Il valore predefinito è false.
- useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su
true
. Per utilizzare la semantica exactly-once, imposta il parametro sufalse
. Questo parametro si applica solo quandouseStorageWriteApi
ètrue
. Il valore predefinito èfalse
. - numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di flussi di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione, in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Cloud Spanner change streams to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Sostituisci quanto segue:
JOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: Database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: Database dei metadati di SpannerSPANNER_CHANGE_STREAM
: Modifica in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output degli modifiche in tempo reale
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
SPANNER_INSTANCE_ID
: ID istanza SpannerSPANNER_DATABASE
: Database SpannerSPANNER_METADATA_INSTANCE_ID
: ID istanza dei metadati SpannerSPANNER_METADATA_DATABASE
: Database dei metadati di SpannerSPANNER_CHANGE_STREAM
: Modifica in tempo reale di SpannerBIGQUERY_DATASET
: il set di dati BigQuery per l'output degli modifiche in tempo reale
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.