Il modello Datastream to SQL è una pipeline di streaming che legge
i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage
utilizzando le notifiche Pub/Sub e li replica nelle tabelle di replica SQL. Specifica
il parametro gcsPubSubSubscription
per leggere i dati dalle notifiche Pub/Sub OPPURE
fornisci il parametro inputFilePattern
per leggere direttamente i dati dai file in Cloud Storage.
Il modello non supporta il linguaggio di definizione dei dati (DDL) e prevede che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni stateful di Dataflow per filtrare i dati obsoleti e garantire la coerenza dei dati non in ordine. Ad esempio, se è già stata elaborata una versione più recente di una riga, una versione in ritardo della stessa riga viene ignorata. Il Data Manipulation Language (DML) eseguito è un tentativo di replicare perfettamente i dati di origine in quelli di destinazione. Le istruzioni DML eseguite seguono le seguenti regole:
- Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert
(ad es.
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Se esistono chiavi primarie, le eliminazioni vengono replicate come DML di eliminazione.
- Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento che di aggiornamento.
- Se non esistono chiavi primarie, le eliminazioni vengono ignorate.
Se utilizzi le utilità di migrazione da Oracle a Postgres, aggiungi ROWID
in
SQL come chiave primaria quando non esiste.
Requisiti della pipeline
- Un flusso Datastream pronto per la replica dei dati o che la sta già eseguendo.
- Notifiche Pub/Sub di Cloud Storage sono attive per i dati di Datastream.
- Un database PostgreSQL è stato inizializzato con lo schema richiesto.
- È configurato l'accesso alla rete tra i worker di Dataflow e PostgreSQL.
Parametri del modello
Parametri obbligatori
- inputFilePattern: la posizione dei file Datastream in Cloud Storage da replicare. In genere, questa posizione del file è il percorso principale dello stream.
- databaseHost: l'host SQL a cui connettersi.
- databaseUser: l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle della replica.
- databasePassword: la password per l'utente SQL.
Parametri facoltativi
- gcsPubSubSubscription: la sottoscrizione Pub/Sub con le notifiche dei file Datastream. Ad esempio,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
. - inputFileFormat: il formato del file di output prodotto da Datastream. Ad esempio,
avro
ojson
. Il valore predefinito èavro
. - streamName: il nome o il modello dello stream da interrogare per ottenere informazioni sullo schema. Il valore predefinito è
{_metadata_stream}
. - rfcStartDateTime: la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: URL radice dell'API Datastream. Il valore predefinito è https://datastream.googleapis.com/.
- databaseType: il tipo di database in cui scrivere (ad esempio Postgres). Il valore predefinito è postgres.
- databasePort: la porta del database SQL a cui connettersi. Il valore predefinito è
5432
. - databaseName: il nome del database SQL a cui connettersi. Il valore predefinito è
postgres
. - schemaMap: una mappa di chiavi/valori utilizzata per determinare le modifiche ai nomi degli schemi (ad es. old_name:new_name,CaseError:case_error). Il valore predefinito è vuoto.
- customConnectionString: stringa di connessione facoltativa che verrà utilizzata al posto della stringa di database predefinita.
- numThreads: determina il parallelismo delle chiavi del passaggio Format to DML. In particolare, il valore viene passato a Reshuffle.withNumBuckets. Il valore predefinito è 100.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione
predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Dal menu a discesa Modello di dataflow, seleziona the Cloud Datastream to SQL template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage ai dati Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: l'IP host SQL.DATABASE_USER
: il tuo utente SQL.DATABASE_PASSWORD
: la password SQL.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome univoco del job a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage ai dati Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: l'IP host SQL.DATABASE_USER
: il tuo utente SQL.DATABASE_PASSWORD
: la password SQL.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.