使用 Debezium 和 Pub/Sub (串流) 範本,透過變更資料擷取將資料從 MySQL 同步處理至 BigQuery

「使用 Debezium 和 Pub/Sub 從 MySQL 擷取變更資料並寫入 BigQuery」範本是一種串流管道,可從 MySQL 資料庫讀取含有變更資料的 Pub/Sub 訊息,然後將記錄寫入 BigQuery。Debezium 連接器會擷取 MySQL 資料庫的變更,並將變更後的資料發布至 Pub/Sub。範本接著會讀取 Pub/Sub 訊息,並將其寫入 BigQuery。

這個範本可讓您同步處理 MySQL 資料庫和 BigQuery 資料表。管道會將變更的資料寫入 BigQuery 暫存資料表,並間歇性更新 BigQuery 資料表,複製 MySQL 資料庫。

管道相關規定

  • 必須部署 Debezium 連接器。
  • Pub/Sub 訊息必須以 Beam Row 序列化。

範本參數

必要參數

  • inputSubscriptions:要讀取的 Pub/Sub 輸入訂閱清單 (以半形逗號分隔),格式為 <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
  • changeLogDataset:用於儲存暫存資料表的 BigQuery 資料集,格式為 <DATASET_NAME>。
  • replicaDataset:BigQuery 資料集的位置,用於儲存副本資料表,格式為 <DATASET_NAME>。

選用參數

  • inputTopics:以半形逗號分隔的 Pub/Sub 主題清單,CDC 資料會推送至這些主題。
  • updateFrequencySecs:管道更新 BigQuery 資料表 (複製 MySQL 資料庫) 的間隔。
  • useSingleTopic:如果將 Debezium 連接器設定為將所有資料表更新發布至單一主題,請將此值設為 true。預設值為 false。
  • useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為 false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為 false。只有在 useStorageWriteApitrue 時,這項參數才會生效。預設值為 false
  • numStorageWriteApiStreams:使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。預設值為 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。

執行範本

如要執行這個範本,請按照下列步驟操作:

  1. 在本機電腦上複製 DataflowTemplates 存放區
  2. 切換至 v2/cdc-parent 目錄。
  3. 確認 Debezium 連接器已部署
  4. 使用 Maven 執行 Dataflow 範本:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • SUBSCRIPTIONS:以半形逗號分隔的 Pub/Sub 訂閱項目名稱清單
    • CHANGELOG_DATASET:用於存放變更記錄資料的 BigQuery 資料集
    • REPLICA_DATASET:用於副本資料表的 BigQuery 資料集

後續步驟