Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)"
Mit Sammlungen den Überblick behalten
Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.
Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.
Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.
Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein.
Vorlagenparameter
Erforderliche Parameter
inputSubscriptions: Die durch Kommas getrennte Liste der Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
changeLogDataset: Das BigQuery-Dataset, in dem die Staging-Tabellen gespeichert werden sollen, im Format <DATASET_NAME>.
replicaDataset: Der Speicherort des BigQuery-Datasets, in dem die Replikattabellen gespeichert werden sollen, im Format <DATASET_NAME>.
Optionale Parameter
inputTopics: Durch Kommas getrennte Liste von PubSub-Themen, an die CDC-Daten übertragen werden.
updateFrequencySecs: Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.
useSingleTopic: Setzen Sie diesen Wert auf true, wenn Sie den Debezium-Connector so konfiguriert haben, dass alle Tabellenaktualisierungen in einem einzigen Thema veröffentlicht werden. Die Standardeinstellung ist "false".
useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApitrue ist. Der Standardwert ist false.
numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApitrue und useStorageWriteApiAtLeastOncefalse ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0.
storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApitrue und useStorageWriteApiAtLeastOncefalse ist, müssen Sie diesen Parameter festlegen.
Führen Sie die Vorlage aus.
Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:
[[["Leicht verständlich","easyToUnderstand","thumb-up"],["Mein Problem wurde gelöst","solvedMyProblem","thumb-up"],["Sonstiges","otherUp","thumb-up"]],[["Schwer verständlich","hardToUnderstand","thumb-down"],["Informationen oder Beispielcode falsch","incorrectInformationOrSampleCode","thumb-down"],["Benötigte Informationen/Beispiele nicht gefunden","missingTheInformationSamplesINeed","thumb-down"],["Problem mit der Übersetzung","translationIssue","thumb-down"],["Sonstiges","otherDown","thumb-down"]],["Zuletzt aktualisiert: 2025-08-18 (UTC)."],[[["\u003cp\u003eThis template streams change data from a MySQL database to BigQuery using Debezium, which captures changes and publishes them to Pub/Sub, and then the pipeline reads these messages and writes them to BigQuery.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline synchronizes MySQL databases with BigQuery tables by writing changed data to a staging table and intermittently updating a BigQuery table that mirrors the MySQL database.\u003c/p\u003e\n"],["\u003cp\u003eTo run the pipeline, a Debezium connector must be deployed, and Pub/Sub messages must be serialized in Beam Row format.\u003c/p\u003e\n"],["\u003cp\u003eRequired parameters for running the template include a comma-separated list of Pub/Sub input subscriptions, a BigQuery dataset for staging tables, and a BigQuery dataset for replica tables.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline supports the optional use of the BigQuery Storage Write API, offering both at-least-once and exactly-once semantics, along with configurable options for stream numbers and triggering frequency.\u003c/p\u003e\n"]]],[],null,["# Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) template\n\nThe Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub\ntemplate is a streaming pipeline that reads Pub/Sub messages with change data from\na MySQL database and writes the records to BigQuery. A Debezium connector captures\nchanges to the MySQL database and publishes the changed data to Pub/Sub. The\ntemplate then reads the Pub/Sub messages and writes them to BigQuery.\n\nYou can use this template to sync MySQL databases and BigQuery tables. The\npipeline writes the changed data to a BigQuery staging table and intermittently\nupdates a BigQuery table replicating the MySQL database.\n\nPipeline requirements\n---------------------\n\n- The Debezium connector must be [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).\n- The Pub/Sub messages must be serialized in a [Beam Row](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/Row.html).\n\nTemplate parameters\n-------------------\n\n### Required parameters\n\n- **inputSubscriptions** : The comma-separated list of Pub/Sub input subscriptions to read from, in the format `\u003cSUBSCRIPTION_NAME\u003e,\u003cSUBSCRIPTION_NAME\u003e, ...`.\n- **changeLogDataset**: The BigQuery dataset to store the staging tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n- **replicaDataset**: The location of the BigQuery dataset to store the replica tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n\n### Optional parameters\n\n- **inputTopics**: Comma-separated list of PubSub topics to where CDC data is being pushed.\n- **updateFrequencySecs**: The interval at which the pipeline updates the BigQuery table replicating the MySQL database.\n- **useSingleTopic** : Set this to `true` if you configure your Debezium connector to publish all table updates to a single topic. Defaults to: false.\n- **useStorageWriteApi** : If true, the pipeline uses the BigQuery Storage Write API (\u003chttps://cloud.google.com/bigquery/docs/write-api\u003e). The default value is `false`. For more information, see Using the Storage Write API (\u003chttps://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api\u003e).\n- **useStorageWriteApiAtLeastOnce** : When using the Storage Write API, specifies the write semantics. To use at-least once semantics (\u003chttps://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics\u003e), set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.\n- **numStorageWriteApiStreams** : When using the Storage Write API, specifies the number of write streams. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter. Defaults to: 0.\n- **storageWriteApiTriggeringFrequencySec** : When using the Storage Write API, specifies the triggering frequency, in seconds. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter.\n\nRun the template\n----------------\n\nTo run this template, perform the following steps:\n\n1. On your local machine, clone the [DataflowTemplates repository](https://github.com/GoogleCloudPlatform/DataflowTemplates).\n2. Change to the `v2/cdc-parent` directory.\n3. Ensure that the Debezium connector is [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).\n4. Using Maven, run the Dataflow template: \n\n ```bash\n mvn exec:java -pl cdc-change-applier -Dexec.args=\"--runner=DataflowRunner \\\n --inputSubscriptions=\u003cvar translate=\"no\"\u003eSUBSCRIPTIONS\u003c/var\u003e \\\n --updateFrequencySecs=300 \\\n --changeLogDataset=\u003cvar translate=\"no\"\u003eCHANGELOG_DATASET\u003c/var\u003e \\\n --replicaDataset=\u003cvar translate=\"no\"\u003eREPLICA_DATASET\u003c/var\u003e \\\n --project=\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e \\\n --region=REGION_NAME\"\n \n ```\n\n Replace the following:\n - \u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e: the Google Cloud project ID where you want to run the Dataflow job\n - \u003cvar translate=\"no\"\u003eSUBSCRIPTIONS\u003c/var\u003e: your comma-separated list of Pub/Sub subscription names\n - \u003cvar translate=\"no\"\u003eCHANGELOG_DATASET\u003c/var\u003e: your BigQuery dataset for changelog data\n - \u003cvar translate=\"no\"\u003eREPLICA_DATASET\u003c/var\u003e: your BigQuery dataset for replica tables\n\nWhat's next\n-----------\n\n- Learn about [Dataflow templates](/dataflow/docs/concepts/dataflow-templates).\n- See the list of [Google-provided templates](/dataflow/docs/guides/templates/provided-templates).\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e"]]