Modello Datastream per MySQL o PostgreSQL (stream)

Il modello Datastream to SQL è una pipeline di streaming che legge i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica nelle tabelle di replica SQL. Specifica il parametro gcsPubSubSubscription per leggere i dati dalle notifiche Pub/Sub OPPURE fornisci il parametro inputFilePattern per leggere direttamente i dati dai file in Cloud Storage.

Il modello non supporta il linguaggio di definizione dei dati (DDL) e prevede che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni stateful di Dataflow per filtrare i dati obsoleti e garantire la coerenza dei dati non in ordine. Ad esempio, se è già stata elaborata una versione più recente di una riga, una versione in ritardo della stessa riga viene ignorata. Il Data Manipulation Language (DML) eseguito è un tentativo di replicare perfettamente i dati di origine in quelli di destinazione. Le istruzioni DML eseguite seguono le seguenti regole:

  • Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert (ad es. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se esistono chiavi primarie, le eliminazioni vengono replicate come DML di eliminazione.
  • Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento che di aggiornamento.
  • Se non esistono chiavi primarie, le eliminazioni vengono ignorate.

Se utilizzi le utilità di migrazione da Oracle a Postgres, aggiungi ROWID in SQL come chiave primaria quando non esiste.

Requisiti della pipeline

  • Un flusso Datastream pronto per la replica dei dati o che la sta già eseguendo.
  • Notifiche Pub/Sub di Cloud Storage sono attive per i dati di Datastream.
  • Un database PostgreSQL è stato inizializzato con lo schema richiesto.
  • È configurato l'accesso alla rete tra i worker di Dataflow e PostgreSQL.

Parametri del modello

Parametri obbligatori

  • inputFilePattern: la posizione dei file Datastream in Cloud Storage da replicare. In genere, questa posizione del file è il percorso principale dello stream.
  • databaseHost: l'host SQL a cui connettersi.
  • databaseUser: l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle della replica.
  • databasePassword: la password per l'utente SQL.

Parametri facoltativi

  • gcsPubSubSubscription: la sottoscrizione Pub/Sub con le notifiche dei file Datastream. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: il formato del file di output prodotto da Datastream. Ad esempio, avro o json. Il valore predefinito è avro.
  • streamName: il nome o il modello dello stream da interrogare per ottenere informazioni sullo schema. Il valore predefinito è {_metadata_stream}.
  • rfcStartDateTime: la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: URL radice dell'API Datastream. Il valore predefinito è https://datastream.googleapis.com/.
  • databaseType: il tipo di database in cui scrivere (ad esempio Postgres). Il valore predefinito è postgres.
  • databasePort: la porta del database SQL a cui connettersi. Il valore predefinito è 5432.
  • databaseName: il nome del database SQL a cui connettersi. Il valore predefinito è postgres.
  • schemaMap: una mappa di chiavi/valori utilizzata per determinare le modifiche ai nomi degli schemi (ad es. old_name:new_name,CaseError:case_error). Il valore predefinito è vuoto.
  • customConnectionString: stringa di connessione facoltativa che verrà utilizzata al posto della stringa di database predefinita.
  • numThreads: determina il parallelismo delle chiavi del passaggio Format to DML. In particolare, il valore viene passato a Reshuffle.withNumBuckets. Il valore predefinito è 100.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Cloud Datastream to SQL template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage ai dati Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la password SQL.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage ai dati Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la password SQL.

Passaggi successivi