Il modello di modifiche in tempo reale Bigtable a Pub/Sub è una pipeline in modalità flusso che trasmette i flussi di record delle modifiche dei dati di Bigtable e li pubblica in un argomento Pub/Sub utilizzando Dataflow.
Un flusso di modifiche di Bigtable ti consente di abbonarti alle mutazioni dei dati in base alla tabella. Quando ti abboni ai modifiche in tempo reale delle tabelle, si applicano i seguenti vincoli:
- Vengono restituiti solo i descrittori e le celle modificati delle operazioni di eliminazione.
- Viene restituito solo il nuovo valore di una cella modificata.
Quando i record delle modifiche dei dati vengono pubblicati in un argomento Pub/Sub, i messaggi potrebbero essere inseriti in modo non ordinato rispetto all'ordinamento originale dei timestamp di commit di Bigtable.
I record delle modifiche dei dati di Bigtable che non possono essere pubblicati negli argomenti Pub/Sub vengono temporaneamente inseriti in una directory di coda dei messaggi non recapitabili (coda dei messaggi non elaborati) in Cloud Storage. Dopo il numero massimo di tentativi non riusciti, questi record vengono inseriti a tempo indeterminato nella stessa directory della coda dei messaggi non recapitabili per la revisione da parte di persone fisiche o per l'ulteriore elaborazione da parte dell'utente.
La pipeline richiede che esista l'argomento Pub/Sub di destinazione. L'argomento di destinazione potrebbe essere configurato per convalidare i messaggi utilizzando uno schema. Quando un argomento Pub/Sub specifica uno schema, la pipeline viene avviata solo se lo schema è valido. A seconda del tipo di schema, utilizza una delle seguenti definizioni dello schema per l'argomento di destinazione:
Buffer di protocollo
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Utilizza il seguente schema Protobuf con la codifica dei messaggi JSON
:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Ogni nuovo messaggio Pub/Sub include una voce di un record di modifica dei dati restituito dallo stream di modifiche dalla riga corrispondente nella tabella Bigtable. Il modello Pub/Sub appiattisce le voci in ogni record di modifica dei dati in singole modifiche a livello di cella.
Descrizione del messaggio di output Pub/Sub
Nome campo | Descrizione |
---|---|
rowKey |
La chiave di riga della riga modificata. Arriva sotto forma di array di byte. Quando la codifica dei messaggi JSON è configurata, le chiavi di riga vengono restituite come stringhe. Quando viene specificato useBase64Rowkeys , le chiavi di riga vengono codificate in Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte della chiave di riga in una stringa. |
modType |
Il tipo di mutazione della riga. Utilizza uno dei seguenti valori: SET_CELL , DELETE_CELLS o DELETE_FAMILY . |
columnFamily |
La famiglia di colonne interessata dalla mutazione della riga. |
column |
Il qualificatore di colonna interessato dalla mutazione della riga. Per il tipo di mutazione DELETE_FAMILY , il campo della colonna non è impostato. Arriva sotto forma di array di byte. Quando la codifica dei messaggi JSON è configurata, le colonne vengono restituite come stringhe. Quando viene specificato useBase64ColumnQualifier , il campo della colonna è codificato in Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte della chiave di riga in una stringa. |
commitTimestamp |
L'ora in cui Bigtable applica la mutazione. Il tempo è misurato in microsecondi dall'epoca di Unix (1° gennaio 1970 alle ore UTC). |
timestamp |
Il valore timestamp della cella interessata dalla mutazione. Per i tipi di mutazione DELETE_CELLS e DELETE_FAMILY , il timestamp non è impostato. Il tempo è misurato in microsecondi dall'epoca di Unix (1° gennaio 1970 alle ore UTC). |
timestampFrom |
Descrive l'inizio inclusivo dell'intervallo di timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS . Per altri tipi di mutazione, timestampFrom non è impostato. Il tempo è misurato in microsecondi dall'epoca di Unix (1° gennaio 1970 alle ore UTC). |
timestampTo |
Descrive la fine esclusiva dell'intervallo di timestamp per tutte le celle eliminate dalla mutazione DELETE_CELLS . Per altri tipi di mutazione, timestampTo non è impostato. |
isGC |
Un valore booleano che indica se la mutazione è generata da un meccanismo di garbage collection di Bigtable. |
tieBreaker |
Quando due mutazioni vengono registrate contemporaneamente da cluster Bigtable diversi, alla tabella di origine viene applicata la mutazione con il valore tiebreaker più alto. Le mutazioni con valori tiebreaker inferiori vengono scartate. |
value |
Il nuovo valore impostato dalla mutazione. A meno che non sia impostata l'opzione della pipeline stripValues , il valore viene impostato per le mutazioni SET_CELL . Per altri tipi di mutazione, il valore non è impostato. Arriva sotto forma di array di byte. Quando la codifica dei messaggi JSON è configurata, i valori vengono restituiti come stringhe.
Quando viene specificato useBase64Values , il valore viene codificato in Base64. In caso contrario, viene utilizzato un set di caratteri specificato da bigtableChangeStreamCharset per decodificare i byte del valore in una stringa. |
sourceInstance |
Il nome dell'istanza Bigtable che ha registrato la mutazione. Potrebbe verificarsi quando più pipeline trasmettono in streaming le modifiche da istanze diverse allo stesso argomento Pub/Sub. |
sourceCluster |
Il nome del cluster Bigtable che ha registrato la mutazione. Potrebbe essere utilizzato quando più pipeline trasmettono in streaming le modifiche da istanze diverse allo stesso argomento Pub/Sub. |
sourceTable |
Il nome della tabella Bigtable che ha ricevuto la mutazione. Potrebbe essere utilizzato nel caso in cui un flusso di più pipeline passi da tabelle diverse allo stesso argomento Pub/Sub. |
Requisiti della pipeline
- L'istanza di origine Bigtable specificata.
- La tabella di origine Bigtable specificata. La tabella deve avere i modifiche in tempo reale abilitati.
- Il profilo dell'applicazione Bigtable specificato.
- Deve esistere l'argomento Pub/Sub specificato.
Parametri del modello
Parametri obbligatori
- pubSubTopic: il nome dell'argomento Pub/Sub di destinazione.
- bigtableChangeStreamAppProfile: l'ID profilo dell'applicazione Bigtable. Il profilo dell'applicazione deve utilizzare il routing a un singolo cluster e consentire le transazioni su riga singola.
- bigtableReadInstanceId: l'ID istanza Bigtable di origine.
- bigtableReadTableId: l'ID della tabella Bigtable di origine.
Parametri facoltativi
- messageEncoding: la codifica dei messaggi da pubblicare nell'argomento Pub/Sub. Quando lo schema dell'argomento di destinazione è configurato, la codifica dei messaggi è determinata dalle impostazioni dell'argomento. Sono supportati i seguenti valori:
BINARY
eJSON
. Il valore predefinito èJSON
. - messageFormat: la codifica dei messaggi da pubblicare nell'argomento Pub/Sub. Quando lo schema dell'argomento di destinazione è configurato, la codifica dei messaggi è determinata dalle impostazioni dell'argomento. Sono supportati i seguenti valori:
AVRO
,PROTOCOL_BUFFERS
eJSON
. Il valore predefinito èJSON
. Quando viene utilizzato il formatoJSON
, i campi rowKey, column e value del messaggio sono stringhe, il cui contenuto è determinato dalle opzioni della pipelineuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
ebigtableChangeStreamCharset
. - stripValues: se impostato su
true
, le modificheSET_CELL
vengono restituite senza nuovi valori impostati. Il valore predefinito èfalse
. Questo parametro è utile quando non è necessario che sia presente un nuovo valore, operazione nota anche come invalidazione della cache, o quando i valori sono estremamente grandi e superano i limiti di dimensione dei messaggi Pub/Sub. - dlqDirectory: la directory per la coda di messaggi non recapitabili. I record che non vengono elaborati vengono archiviati in questa directory. Il valore predefinito è una directory nella posizione temporanea del job Dataflow. Nella maggior parte dei casi, puoi utilizzare il percorso predefinito.
- dlqRetryMinutes: il numero di minuti tra i tentativi di ripetizione della coda dei messaggi non recapitabili. Il valore predefinito è
10
. - dlqMaxRetries: il numero massimo di nuovi tentativi per la coda dei messaggi non recapitabili. Il valore predefinito è
5
. - useBase64Rowkeys: utilizzato con la codifica dei messaggi JSON. Se impostato su
true
, il camporowKey
è una stringa con codifica Base64. In caso contrario,rowKey
viene prodotto utilizzandobigtableChangeStreamCharset
per decodificare i byte in una stringa. Il valore predefinito èfalse
. - pubSubProjectId: l'ID progetto Bigtable. Il valore predefinito è il progetto del job Dataflow.
- useBase64ColumnQualifiers: utilizzato con la codifica dei messaggi JSON. Se impostato su
true
, il campocolumn
è una stringa con codifica Base64. In caso contrario, la colonna viene generata utilizzandobigtableChangeStreamCharset
per decodificare i byte in una stringa. Il valore predefinito èfalse
. - useBase64Values: utilizzato con la codifica dei messaggi JSON. Se impostato su
true
, il campo del valore è una stringa con codifica Base64. In caso contrario, il valore viene prodotto utilizzandobigtableChangeStreamCharset
per decodificare i byte in una stringa. Il valore predefinito èfalse
. - disableDlqRetries: indica se disattivare o meno i tentativi per la DLQ. Il valore predefinito è false.
- bigtableChangeStreamMetadataInstanceId: l'ID istanza dei metadati delle modifiche in tempo reale di Bigtable. Il valore predefinito è vuoto.
- bigtableChangeStreamMetadataTableTableId: l'ID della tabella dei metadati del connettore Bigtable modifiche in tempo reale. Se non specificata, viene creata automaticamente una tabella di metadati del connettore di modifiche in tempo reale Bigtable durante l'esecuzione della pipeline. Il valore predefinito è vuoto.
- bigtableChangeStreamCharset: il nome del set di caratteri dei modifiche in tempo reale Bigtable. Il valore predefinito è UTF-8.
- bigtableChangeStreamStartTimestamp: il timestamp iniziale (https://tools.ietf.org/html/rfc3339), inclusivo, da utilizzare per la lettura degli modifiche in tempo reale. Ad esempio:
2022-05-05T07:59:59Z
. Il valore predefinito è il timestamp dell'ora di inizio della pipeline. - bigtableChangeStreamIgnoreColumnFamilies: un elenco separato da virgole di modifiche ai nomi famiglia di colonne da ignorare. Il valore predefinito è vuoto.
- bigtableChangeStreamIgnoreColumns: un elenco separato da virgole di modifiche ai nomi delle colonne da ignorare. Esempio: "cf1:col1,cf2:col2". Il valore predefinito è vuoto.
- bigtableChangeStreamName: un nome univoco per la pipeline client. Consente di riprendere l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza. Il valore predefinito è un nome generato automaticamente. Consulta i log dei job Dataflow per il valore utilizzato.
- bigtableChangeStreamResume: se impostato su
true
, una nuova pipeline riprende l'elaborazione dal punto in cui si è interrotta una pipeline in esecuzione in precedenza con lo stesso valorebigtableChangeStreamName
. Se la pipeline con il valorebigtableChangeStreamName
specificato non è mai stata eseguita, non viene avviata una nuova pipeline. Se impostato sufalse
, viene avviata una nuova pipeline. Se una pipeline con lo stesso valore dibigtableChangeStreamName
è già stata eseguita per l'origine specificata, non viene avviata una nuova pipeline. Il valore predefinito èfalse
. - bigtableReadChangeStreamTimeoutMs: il timeout per le richieste Bigtable ReadChangeStream in millisecondi.
- bigtableReadProjectId: l'ID progetto Bigtable. Il valore predefinito è il progetto per il job Dataflow.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Bigtable change streams to Pub/Sub template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
BIGTABLE_INSTANCE_ID
: l'ID istanza Bigtable.BIGTABLE_TABLE_ID
: l'ID della tabella Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: l'ID del profilo app Bigtable.PUBSUB_TOPIC
: il nome dell'argomento di destinazione Pub/Sub
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare l'ultima versione del modello, disponibile nella cartella principale senza data nel bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale con data nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
BIGTABLE_INSTANCE_ID
: l'ID istanza Bigtable.BIGTABLE_TABLE_ID
: l'ID della tabella Bigtable.BIGTABLE_APPLICATION_PROFILE_ID
: l'ID del profilo app Bigtable.PUBSUB_TOPIC
: il nome dell'argomento di destinazione Pub/Sub
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.