Vorlage „Spanner-Änderungsstreams für Quelldatenbank“

Streamingpipeline Liest Daten aus Spanner-Änderungsstreams und schreibt sie in eine Quelle.

Vorlagenparameter

Parameter Beschreibung
changeStreamName Der Name des Spanner-Änderungsstreams, aus dem die Pipeline liest.
instanceId Der Name der Spanner-Instanz, in der sich der Änderungsstream befindet.
databaseId Der Name der Spanner-Datenbank, die vom Änderungsstream überwacht wird.
spannerProjectId Der Name des Spanner-Projekts.
metadataInstance Die Instanz, in der die Metadaten gespeichert werden, die vom Connector verwendet werden, um die Nutzung der Daten der Change Stream API zu steuern.
metadataDatabase Die Datenbank, in der die Metadaten gespeichert werden, die vom Connector verwendet werden, um die Nutzung der Änderungsstream-API-Daten zu steuern.
sourceShardsFilePath Pfad zu einer Cloud Storage-Datei mit Verbindungsprofilinformationen für Quell-Shards.
startTimestamp Optional: Der Startzeitstempel zum Lesen von Änderungen. Die Standardeinstellung ist leer.
endTimestamp Optional: Der Endzeitstempel für das Lesen von Änderungen. Wenn kein Zeitstempel angegeben ist, wird unbegrenzt gelesen. Die Standardeinstellung ist leer.
shadowTablePrefix Optional: Das Präfix zum Benennen von Schattentabellen. Standardeinstellung: shadow_.
sessionFilePath Optional: Sitzungspfad in Cloud Storage, der Zuordnungsinformationen von HarbourBridge enthält.
filtrationMode Optional: Art der Filtration. Gibt an, wie bestimmte Datensätze anhand eines Kriteriums gelöscht werden. Die unterstützten Modi sind: none (keine Filterung) und forward_migration (Filtern von Datensätzen, die mit der Forward-Migrationspipeline geschrieben wurden). Die Standardeinstellung ist forward_migration.
shardingCustomJarPath Optional: Speicherort der benutzerdefinierten JAR-Datei in Cloud Storage, die die Anpassungslogik zum Abrufen der Shard-ID enthält. Wenn Sie diesen Parameter festlegen, müssen Sie auch den Parameter shardingCustomJarPath festlegen. Die Standardeinstellung ist leer.
shardingCustomClassName Optional: Vollständig qualifizierter Klassenname mit der benutzerdefinierten Implementierung der Shard-ID. Wenn shardingCustomJarPath angegeben ist, ist dieser Parameter erforderlich. Die Standardeinstellung ist leer.
shardingCustomParameters Optional: String mit allen benutzerdefinierten Parametern, die an die benutzerdefinierte Sharding-Klasse übergeben werden sollen. Die Standardeinstellung ist leer.
sourceDbTimezoneOffset Optional: Der Zeitzonenversatz von UTC für die Quelldatenbank. Beispielwert: +10:00. Die Standardeinstellung ist +00:00.
dlqGcsPubSubSubscription Optional: Das Pub/Sub-Abo, das in einer Cloud Storage-Benachrichtigungsrichtlinie für das DLQ-Wiederholverzeichnis verwendet wird, wenn der reguläre Modus genutzt wird. Der Name muss das Format projects/<project-id>/subscriptions/<subscription-name> haben. Wenn er festgelegt ist, werden „deadLetterQueueDirectory“ und „dlqRetryMinutes“ ignoriert.
skipDirectoryName Optional: Datensätze, die bei der Rückwärtsreplikation übersprungen werden, werden in dieses Verzeichnis geschrieben. Der Standardverzeichnisname ist „skip“.
maxShardConnections Optional: Die maximale Anzahl von Verbindungen, die ein bestimmter Shard akzeptieren kann. Die Standardeinstellung ist 10000.
deadLetterQueueDirectory Optional: Der Pfad, der beim Speichern der Fehlerwarteschlangenausgabe verwendet wird. Der Standardpfad ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs.
dlqMaxRetryCount Optional: Die maximale Anzahl der Wiederholungsversuche über die Dead-Letter Queue bei vorübergehenden Fehlern. Die Standardeinstellung ist 500.
runMode Optional: Der Typ des Ausführungsmodus. Unterstützte Werte: regular, retryDLQ. Standardeinstellung: regular. Geben Sie retryDLQ an, wenn nur Datensätze in der Warteschlange für unzustellbare Nachrichten mit schwerwiegenden Wiederholungsversuchen wiederholt werden sollen.
dlqRetryMinutes Optional: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Der Standardwert ist 10.

Führen Sie die Vorlage aus.

Console

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Spanner Change Streams to Source Database templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

gcloud-CLI

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Ersetzen Sie Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • CHANGE_STREAM_NAME: Der Name des Änderungsstreams, aus dem gelesen werden soll
  • INSTANCE_ID: Die Cloud Spanner-Instanz-ID.
  • DATABASE_ID: die Cloud Spanner-Datenbank-ID
  • SPANNER_PROJECT_ID: die Cloud Spanner-Projekt-ID.
  • METADATA_INSTANCE: die Cloud Spanner-Instanz zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • METADATA_DATABASE: die Cloud Spanner-Datenbank zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • SOURCE_SHARDS_FILE_PATH: Der Pfad zur GCS-Datei mit den Details zum Quellshard.

API

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • CHANGE_STREAM_NAME: Der Name des Änderungsstreams, aus dem gelesen werden soll
  • INSTANCE_ID: Die Cloud Spanner-Instanz-ID.
  • DATABASE_ID: die Cloud Spanner-Datenbank-ID
  • SPANNER_PROJECT_ID: die Cloud Spanner-Projekt-ID.
  • METADATA_INSTANCE: die Cloud Spanner-Instanz zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • METADATA_DATABASE: die Cloud Spanner-Datenbank zum Speichern von Metadaten beim Lesen aus Änderungsstreams
  • SOURCE_SHARDS_FILE_PATH: Der Pfad zur GCS-Datei mit den Details zum Quellshard.