Il modello Datastream a BigQuery è una pipeline in modalità flusso che legge
i dati di Datastream e li replica in BigQuery. Il modello legge i dati da Cloud Storage
utilizzando le notifiche Pub/Sub e li replica in una tabella di gestione temporanea BigQuery
partizionata in base al tempo. Dopo la replica, il modello esegue un MERGE
in BigQuery
per eseguire l'upsert di tutte le modifiche di acquisizione dei dati di modifica (CDC) in una replica della tabella di origine. Specifica
il parametro gcsPubSubSubscription
per leggere i dati dalle notifiche Pub/Sub OPPURE
fornisci il parametro inputFilePattern
per leggere direttamente i dati dai file in Cloud Storage.
Il modello gestisce la creazione e l'aggiornamento delle tabelle BigQuery gestite dalla replica. Quando è richiesto il Data Definition Language (DDL), un callback a Datastream estrae lo schema della tabella di origine e lo traduce in tipi di dati BigQuery. Le operazioni supportate includono:
- Le nuove tabelle vengono create man mano che vengono inseriti i dati.
- Le nuove colonne vengono aggiunte alle tabelle BigQuery con valori iniziali nulli.
- Le colonne eliminate vengono ignorate in BigQuery e i valori futuri sono nulli.
- Le colonne rinominate vengono aggiunte a BigQuery come nuove colonne.
- Le modifiche al tipo non vengono propagate a BigQuery.
È consigliabile eseguire questa pipeline utilizzando la modalità di streaming almeno una volta, perché il modello esegue la deduplicazione quando unisce i dati da una tabella BigQuery temporanea alla tabella BigQuery principale. Questo passaggio della pipeline indica che non ci sono ulteriori vantaggi nell'utilizzo della modalità di streaming esattamente una volta.
Requisiti della pipeline
- Un flusso Datastream pronto per la replica dei dati o che la sta già eseguendo.
- Notifiche Pub/Sub di Cloud Storage sono attive per i dati di Datastream.
- Vengono creati i set di dati di destinazione BigQuery e all'account di servizio Compute Engine è stato concesso l'accesso amministrativo.
- Per creare la tabella di replica di destinazione è necessaria una chiave primaria nella tabella di origine.
- Un database di origine MySQL o Oracle. I database PostgreSQL e SQL Server non sono supportati.
Parametri del modello
Parametri obbligatori
- inputFilePattern: la posizione del file per l'output del file Datastream in Cloud Storage, nel formato
gs://<BUCKET_NAME>/<ROOT_PATH>/
. - inputFileFormat: il formato dei file di output prodotti da Datastream. I valori consentiti sono
avro
ejson
. Il valore predefinito èavro
. - gcsPubSubSubscription: l'abbonamento Pub/Sub utilizzato da Cloud Storage per notificare a Dataflow i nuovi file disponibili per l'elaborazione, nel formato:
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
. - outputStagingDatasetTemplate: il nome del set di dati contenente le tabelle di gestione temporanea. Questo parametro supporta i modelli, ad esempio
{_metadata_dataset}_log
omy_dataset_log
. Normalmente, questo parametro è un nome di set di dati. Il valore predefinito è{_metadata_dataset}
. Nota: per le origini MySQL, il nome del database viene mappato a{_metadata_schema}
anziché a{_metadata_dataset}
. - outputDatasetTemplate: il nome del set di dati contenente le tabelle di replica. Questo parametro supporta i modelli, ad esempio
{_metadata_dataset}
omy_dataset
. Normalmente, questo parametro è un nome di set di dati. Il valore predefinito è{_metadata_dataset}
. Nota: per le origini MySQL, il nome del database viene mappato a{_metadata_schema}
anziché a{_metadata_dataset}
. - deadLetterQueueDirectory: il percorso utilizzato da Dataflow per scrivere l'output della coda dei messaggi non recapitabili. Questo percorso non deve trovarsi nello stesso percorso dell'output del file Datastream. Il valore predefinito è
empty
.
Parametri facoltativi
- streamName: il nome o il modello dello stream da interrogare per ottenere informazioni sullo schema. Il valore predefinito è: {_metadata_stream}. Il valore predefinito è solitamente sufficiente.
- rfcStartDateTime: la data e l'ora di inizio da utilizzare per recuperare i dati da Cloud Storage (https://tools.ietf.org/html/rfc3339). Valore predefinito:
1970-01-01T00:00:00.00Z
. - fileReadConcurrency: il numero di file DataStream simultanei da leggere. Il valore predefinito è
10
. - outputProjectId: l'ID del progetto Google Cloud che contiene i set di dati BigQuery in cui inserire i dati di output. Il valore predefinito di questo parametro è il progetto in cui è in esecuzione la pipeline Dataflow.
- outputStagingTableNameTemplate: il modello da utilizzare per denominare le tabelle di staging. Ad esempio,
{_metadata_table}
. Il valore predefinito è{_metadata_table}_log
. - outputTableNameTemplate: il modello da utilizzare per il nome delle tabelle di replica, ad esempio
{_metadata_table}
. Il valore predefinito è{_metadata_table}
. - ignoreFields: campi separati da virgole da ignorare in BigQuery. Valore predefinito:
_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count
. Ad esempio:_metadata_stream,_metadata_schema
. - mergeFrequencyMinutes: il numero di minuti tra le unioni per una determinata tabella. Il valore predefinito è
5
. - dlqRetryMinutes: il numero di minuti tra i nuovi tentativi di DLQ. Il valore predefinito è
10
. - dataStreamRootUrl: l'URL radice dell'API Datastream. Il valore predefinito è https://datastream.googleapis.com/.
- applyMerge: indica se disattivare le query MERGE per il job. Il valore predefinito è
true
. - mergeConcurrency: il numero di query BigQuery MERGE simultanee. Efficace solo quando applyMerge è impostato su true. Il valore predefinito è
30
. - partitionRetentionDays: il numero di giorni da utilizzare per la conservazione delle partizioni durante l'esecuzione delle unioni BigQuery. Il valore predefinito è
1
. - useStorageWriteApiAtLeastOnce: questo parametro ha effetto solo se
Use BigQuery Storage Write API
è abilitato. Setrue
, per l'API Storage Write vengono utilizzate le semantiche almeno una volta. In caso contrario, vengono utilizzate le semantiche exactly-once. Il valore predefinito èfalse
. - javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, vedi Esempi di UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). - javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la funzione definita dall'utente, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e ricarica la UDF se il file viene modificato. Questo parametro ti consente di aggiornare la UDF mentre la pipeline è in esecuzione, senza dover riavviare il job. Se il valore è
0
, il ricaricamento delle UDF è disattivato. Il valore predefinito è0
. - pythonTextTransformGcsPath: il pattern del percorso Cloud Storage per il codice Python contenente le funzioni definite dall'utente. Ad esempio,
gs://your-bucket/your-transforms/*.py
. - pythonRuntimeVersion: la versione del runtime da utilizzare per questa UDF Python.
- pythonTextTransformFunctionName: il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, cifre e trattini bassi. Ad esempio,
transform_udf1
. - runtimeRetries: il numero di tentativi di esecuzione di un runtime prima che l'operazione non vada a buon fine. Il valore predefinito è 5.
- useStorageWriteApi: se è true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è
false
. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di flussi di scrittura. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro. Il valore predefinito è 0. - storageWriteApiTriggeringFrequencySec: quando si utilizza l'API Storage Write, specifica la frequenza di attivazione, in secondi. Se
useStorageWriteApi
ètrue
euseStorageWriteApiAtLeastOnce
èfalse
, devi impostare questo parametro.
Funzione definita dall'utente
Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Specifiche della funzione
La funzione definita dall'utente ha le seguenti specifiche:
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Datastream to BigQuery template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- (Facoltativo) Per passare dall'elaborazione exactly-once alla modalità di streaming almeno una volta, seleziona Almeno una volta.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ outputStagingDatasetTemplate=BIGQUERY_DATASET,\ outputDatasetTemplate=BIGQUERY_DATASET,\ outputStagingTableNameTemplate=BIGQUERY_TABLE,\ outputTableNameTemplate=BIGQUERY_TABLE_log
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage ai dati Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: il nome del set di dati BigQuery.BIGQUERY_TABLE
: il modello di tabella BigQuery. Ad esempio,{_metadata_schema}_{_metadata_table}_log
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "outputStagingDatasetTemplate": "BIGQUERY_DATASET", "outputDatasetTemplate": "BIGQUERY_DATASET", "outputStagingTableNameTemplate": "BIGQUERY_TABLE", "outputTableNameTemplate": "BIGQUERY_TABLE_log" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage ai dati Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.BIGQUERY_DATASET
: il nome del set di dati BigQuery.BIGQUERY_TABLE
: il modello di tabella BigQuery. Ad esempio,{_metadata_schema}_{_metadata_table}_log
Passaggi successivi
- Scopri come implementare Datastream e Dataflow per Analytics.
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.