Esegui il trasferimento di Change Data Capture da MySQL a BigQuery utilizzando il modello Debezium e Pub/Sub (flusso)

Il modello di acquisizione dei dati modificati da MySQL a BigQuery utilizzando Debezium e Pub/Sub è una pipeline in modalità flusso che legge i messaggi Pub/Sub con dati di modifica da un database MySQL e scrive i record in BigQuery. Un connettore Debezium acquisisce le modifiche al database MySQL e pubblica i dati modificati in Pub/Sub. Il modello legge quindi i messaggi Pub/Sub e li scrive in BigQuery.

Puoi utilizzare questo modello per sincronizzare i database MySQL e le tabelle BigQuery. La pipeline scrive i dati modificati in una tabella di staging BigQuery e intermittentemente aggiorna una tabella BigQuery che replica il database MySQL.

Requisiti della pipeline

  • Il connettore Debezium deve essere implementato.
  • I messaggi Pub/Sub devono essere serializzati in una riga Beam.

Parametri del modello

Parametri obbligatori

  • inputSubscriptions: l'elenco separato da virgole delle sottoscrizioni di input Pub/Sub da cui leggere, nel formato <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset: il set di dati BigQuery in cui archiviare le tabelle di gestione temporanea, nel formato <DATASET_NAME>.
  • replicaDataset: la posizione del set di dati BigQuery in cui archiviare le tabelle di replica, nel formato <DATASET_NAME>.

Parametri facoltativi

  • inputTopics: elenco separato da virgole di argomenti PubSub a cui vengono inviati i dati CDC.
  • updateFrequencySecs: l'intervallo a cui la pipeline aggiorna la tabella BigQuery che replica il database MySQL.
  • useSingleTopic: imposta questo valore su true se configuri il connettore Debezium per pubblicare tutti gli aggiornamenti delle tabelle in un unico argomento. Il valore predefinito è false.
  • useStorageWriteApi: se true, la pipeline utilizza l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzo dell'API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), imposta questo parametro su true. Per utilizzare la semantica esattamente una volta, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.
  • numStorageWriteApiStreams: quando utilizzi l'API Storage Write, specifica il numero di stream di scrittura. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro. Il valore predefinito è 0.
  • storageWriteApiTriggeringFrequencySec: quando utilizzi l'API Storage Write, specifica la frequenza di attivazione in secondi. Se useStorageWriteApi è true e useStorageWriteApiAtLeastOnce è false, devi impostare questo parametro.

Esegui il modello

Per eseguire questo modello, segui questi passaggi:

  1. Sulla tua macchina locale, clona il repository DataflowTemplates.
  2. Passa alla directory v2/cdc-parent.
  3. Assicurati che il connettore Debezium sia implementato.
  4. Utilizza Maven per eseguire il modello Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
    • SUBSCRIPTIONS: l'elenco separato da virgole dei nomi delle sottoscrizioni Pub/Sub
    • CHANGELOG_DATASET: il set di dati BigQuery per i dati del log delle modifiche
    • REPLICA_DATASET: il tuo set di dati BigQuery per le tabelle delle repliche

Passaggi successivi