Modello di flussi di modifiche di Spanner a BigQuery

Il modello Spanner modifiche in tempo reale to BigQuery è una pipeline di streaming che trasmette i record delle modifiche dei dati Spanner e li scrive in tabelle BigQuery utilizzando Dataflow Runner V2.

Tutte le colonne monitorate del flusso di modifiche sono incluse in ogni riga della tabella BigQuery, indipendentemente dal fatto che vengano modificate da una transazione Spanner. Le colonne non osservate non sono incluse nella riga BigQuery. Tutte le modifiche a Spanner precedenti al watermark Dataflow vengono applicate correttamente alle tabelle BigQuery o vengono archiviate nella coda dei messaggi non recapitabili per il nuovo tentativo. Le righe BigQuery vengono inserite in modo non ordinato rispetto all'ordinamento originale dei timestamp di commit di Spanner.

Se le tabelle BigQuery necessarie non esistono, la pipeline le crea. In caso contrario, vengono utilizzate le tabelle BigQuery esistenti. Lo schema delle tabelle BigQuery esistenti deve contenere le colonne monitorate corrispondenti delle tabelle Spanner e qualsiasi colonna di metadati aggiuntiva che non viene ignorata esplicitamente dall'opzione ignoreFields. Consulta la descrizione dei campi dei metadati nell'elenco seguente. Ogni nuova riga BigQuery include tutte le colonne monitorate dallo stream delle modifiche dalla riga corrispondente nella tabella Spanner al timestamp del record di modifica.

I seguenti campi di metadati vengono aggiunti alle tabelle BigQuery. Per maggiori dettagli su questi campi, consulta Record di modifica dei dati in "Partizioni, record e query dei flussi di modifiche".

  • _metadata_spanner_mod_type: il tipo di modifica (inserimento, aggiornamento o eliminazione) della transazione Spanner. Estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_table_name: il nome della tabella Spanner. Questo campo non è il nome della tabella dei metadati del connettore.
  • _metadata_spanner_commit_timestamp: il timestamp di commit di Spanner, ovvero l'ora in cui viene eseguito il commit di una modifica. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_server_transaction_id: una stringa univoca a livello globale che rappresenta la transazione Spanner in cui è stata eseguita la modifica. Utilizza questo valore solo nel contesto dell'elaborazione dei record dello stream di modifiche. Non è correlato all'ID transazione nell'API di Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_record_sequence: il numero di sequenza del record all'interno della transazione Spanner. I numeri di sequenza sono garantiti per essere univoci e in aumento monotono, ma non necessariamente contigui, all'interno di una transazione. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: indica se il record è l'ultimo record per una transazione Spanner nella partizione corrente. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_number_of_records_in_transaction: il numero di record di modifica dei dati che fanno parte della transazione Spanner in tutte le partizioni del flusso di modifiche. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_spanner_number_of_partitions_in_transaction: Il numero di partizioni che restituiscono record di modifica dei dati per la transazione Spanner. Questo valore viene estratto dal record di modifica dei dati del flusso di modifiche.
  • _metadata_big_query_commit_timestamp: il timestamp di commit quando la riga viene inserita in BigQuery. Se useStorageWriteApi è true, questa colonna non viene creata automaticamente nella tabella del log delle modifiche dalla pipeline. In questo caso, devi aggiungere manualmente questa colonna nella tabella del log delle modifiche e impostare CURRENT_TIMESTAMP come valore predefinito, se necessario.

Quando utilizzi questo modello, tieni presente i seguenti dettagli:

  • Puoi utilizzare questo modello per propagare nuove colonne nelle tabelle esistenti o nuove tabelle da Spanner a BigQuery. Per saperne di più, consulta Gestire l'aggiunta di tabelle o colonne di monitoraggio.
  • Per i tipi di acquisizione dei valori OLD_AND_NEW_VALUES e NEW_VALUES, quando il record di modifica dei dati contiene una modifica UPDATE, il modello deve eseguire una lettura obsoleta di Spanner all'ora del commit del record di modifica dei dati per recuperare le colonne invariate ma monitorate. Assicurati di configurare correttamente il parametro "version_retention_period" del database per la lettura obsoleta. Per il tipo di acquisizione del valore NEW_ROW, il modello è più efficiente, perché il record di modifica dei dati acquisisce l'intera nuova riga, incluse le colonne che non vengono aggiornate nelle richieste UPDATE, e il modello non deve eseguire una lettura obsoleta.
  • Per ridurre al minimo la latenza di rete e i costi di trasporto di rete, esegui il job Dataflow dalla stessa regione dell'istanza Spanner o delle tabelle BigQuery. Se utilizzi origini, sink, posizioni dei file di staging o posizioni dei file temporanei che si trovano al di fuori della regione del job, i tuoi dati potrebbero essere inviati tra regioni diverse. Per ulteriori informazioni, consulta Regioni Dataflow.
  • Questo modello supporta tutti i tipi di dati Spanner validi. Se il tipo BigQuery è più preciso del tipo Spanner, durante la trasformazione potrebbe verificarsi una perdita di precisione. In particolare:
    • Per il tipo JSON di Spanner, l'ordine dei membri di un oggetto è in ordine lessicografico, ma non esiste una garanzia simile per il tipo JSON di BigQuery.
    • Spanner supporta il tipo TIMESTAMP in nanosecondi, ma BigQuery supporta solo il tipo TIMESTAMP in microsecondi.

Scopri di più sui flussi di modifiche, su come creare pipeline Dataflow per i flussi di modifiche e sulle best practice.

Requisiti della pipeline

  • L'istanza Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database Spanner deve esistere prima dell'esecuzione della pipeline.
  • L'istanza dei metadati di Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il database dei metadati Spanner deve esistere prima dell'esecuzione della pipeline.
  • Lo stream di modifiche Spanner deve esistere prima dell'esecuzione della pipeline.
  • Il set di dati BigQuery deve esistere prima dell'esecuzione della pipeline.

Gestire l'aggiunta di tabelle o colonne di monitoraggio

Questa sezione descrive le best practice per la gestione dell'aggiunta di tabelle e colonne di monitoraggio Spanner durante l'esecuzione della pipeline. La versione del modello più vecchia supportata per questa funzionalità è 2024-09-19-00_RC00.

  • Prima di aggiungere una nuova colonna a un ambito di stream delle modifiche di Spanner, aggiungi prima la colonna alla tabella del log delle modifiche di BigQuery. La colonna aggiunta deve avere un tipo di dati corrispondente ed essere NULLABLE. Attendi almeno 10 minuti prima di continuare a creare la nuova colonna o tabella in Spanner. La scrittura nella nuova colonna senza attendere potrebbe comportare un record non elaborato con un codice di errore non valido nella directory della coda di messaggi non recapitati.
  • Per aggiungere una nuova tabella, devi prima aggiungerla al database Spanner. La tabella viene creata automaticamente in BigQuery quando la pipeline riceve un record per la nuova tabella.
  • Dopo aver aggiunto le nuove colonne o tabelle nel database Spanner, assicurati di modificare lo stream di modifiche per monitorare le nuove colonne o tabelle che ti interessano se non sono già monitorate in modo implicito.
  • Il modello non elimina tabelle o colonne da BigQuery. Se una colonna viene eliminata dalla tabella Spanner, i valori null vengono inseriti nelle colonne del log delle modifiche BigQuery per i record generati dopo l'eliminazione delle colonne dalla tabella Spanner, a meno che tu non elimini manualmente la colonna da BigQuery.
  • Il modello non supporta gli aggiornamenti del tipo di colonna. Anche se Spanner supporta la modifica di una colonna STRING in una colonna BYTES o di una colonna BYTES in una colonna STRING, non puoi modificare il tipo di dati di una colonna esistente o utilizzare lo stesso nome di colonna con tipi di dati diversi in BigQuery. Se elimini e ricrei una colonna con lo stesso nome ma un tipo diverso in Spanner, i dati potrebbero essere scritti nella colonna BigQuery esistente, ma il tipo rimane invariato.
  • Questo modello non supporta gli aggiornamenti della modalità a colonne. Le colonne dei metadati replicati in BigQuery sono impostate sulla modalità REQUIRED. Tutte le altre colonne replicate in BigQuery sono impostate su NULLABLE, indipendentemente dal fatto che siano definite come NOT NULL nella tabella Spanner. Non puoi aggiornare le colonne NULLABLE alla modalità REQUIRED in BigQuery.
  • La modifica del tipo di acquisizione dei valori di uno stream di modifiche non è supportata per le pipeline in esecuzione.

Parametri del modello

Parametri obbligatori

  • spannerInstanceId: l'istanza Spanner da cui leggere modifiche in tempo reale.
  • spannerDatabase: il database Spanner da cui leggere modifiche in tempo reale.
  • spannerMetadataInstanceId: l'istanza Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerMetadataDatabase: il database Spanner da utilizzare per la tabella dei metadati del connettore di modifiche in tempo reale.
  • spannerChangeStreamName: il nome della modifica in tempo reale di Spanner da cui leggere.
  • bigQueryDataset: il set di dati BigQuery per l'output modifiche in tempo reale.

Parametri facoltativi

  • spannerProjectId: il progetto da cui leggere modifiche in tempo reale. Questo valore è anche il progetto in cui viene creata la tabella dei metadati del connettore di modifiche in tempo reale. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
  • spannerDatabaseRole: il ruolo database di Spanner da utilizzare durante l'esecuzione del modello. Questo parametro è obbligatorio solo quando l'entità IAM che esegue il modello è un utente con controllo dell'accesso dell'accesso granulare. Il ruolo del database deve disporre del privilegio SELECT sul flusso di modifiche e del privilegio EXECUTE sulla funzione di lettura del flusso di modifiche. Per saperne di più, consulta Controllo dell'accesso granulare per gli modifiche in tempo reale (https://cloud.google.com/spanner/docs/fgac-change-streams).
  • spannerMetadataTableName: il nome della tabella dei metadati del connettore Spanner modifiche in tempo reale da utilizzare. Se non viene fornita, durante il flusso della pipeline viene creata automaticamente una tabella di metadati del connettore di modifiche in tempo reale Spanner. Devi fornire questo parametro quando aggiorni una pipeline esistente. In caso contrario, non fornire questo parametro.
  • rpcPriority: la priorità della richiesta per le chiamate Spanner. Il valore deve essere uno dei seguenti: HIGH, MEDIUM o LOW. Il valore predefinito è HIGH.
  • spannerHost: l'endpoint Cloud Spanner da chiamare nel modello. Utilizzato solo per i test. Ad esempio, https://batch-spanner.googleapis.com.
  • startTimestamp: la data e l'ora di inizio (https://datatracker.ietf.org/doc/html/rfc3339), inclusa, da utilizzare per la lettura dei modifiche in tempo reale. Ex-2021-10-12T07:20:50.52Z. Il valore predefinito è il timestamp dell'inizio della pipeline, ovvero l'ora attuale.
  • endTimestamp: la data e l'ora di fine (https://datatracker.ietf.org/doc/html/rfc3339), inclusa, da utilizzare per la lettura dei flussi di modifiche.Esempio: 2021-10-12T07:20:50.52Z. Il valore predefinito è un tempo infinito nel futuro.
  • bigQueryProjectId: il progetto BigQuery. Il valore predefinito è il progetto per il job Dataflow.
  • bigQueryChangelogTableNameTemplate: il modello per il nome della tabella BigQuery che contiene il log delle modifiche. Il valore predefinito è: {_metadata_spanner_table_name}_changelog.
  • deadLetterQueueDirectory: il percorso in cui archiviare i record non elaborati. Il percorso predefinito è una directory nella posizione temporanea del job Dataflow. Il valore predefinito è solitamente sufficiente.
  • dlqRetryMinutes: il numero di minuti tra i tentativi di ripetizione della coda dei messaggi non recapitabili. Il valore predefinito è 10.
  • ignoreFields: un elenco di campi (sensibili alle maiuscole) separati da virgole da ignorare. Questi campi potrebbero essere campi di tabelle monitorate o campi di metadati aggiunti dalla pipeline. I campi ignorati non vengono inseriti in BigQuery. Quando ignori il campo _metadata_spanner_table_name, viene ignorato anche il parametro bigQueryChangelogTableNameTemplate. Il valore predefinito è vuoto.
  • disableDlqRetries: indica se disattivare o meno i tentativi per la DLQ. Il valore predefinito è false.
  • useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica exactly-once, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di flussi di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione, in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Cloud Spanner change streams to BigQuery template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

Sostituisci quanto segue:

  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: Database dei metadati di Spanner
  • SPANNER_CHANGE_STREAM: Modifica in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output degli modifiche in tempo reale

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • SPANNER_INSTANCE_ID: ID istanza Spanner
  • SPANNER_DATABASE: Database Spanner
  • SPANNER_METADATA_INSTANCE_ID: ID istanza dei metadati Spanner
  • SPANNER_METADATA_DATABASE: Database dei metadati di Spanner
  • SPANNER_CHANGE_STREAM: Modifica in tempo reale di Spanner
  • BIGQUERY_DATASET: il set di dati BigQuery per l'output degli modifiche in tempo reale

Passaggi successivi