Modello di flussi di modifiche di Spanner al database di origine

Pipeline di streaming. Legge i dati da Spanner Change Streams e li scrive in un'origine.

Parametri del modello

Parametro Descrizione
changeStreamName Il nome della modifica in tempo reale Spanner da cui legge la pipeline.
instanceId Il nome dell'istanza Spanner in cui è presente la modifica in tempo reale.
databaseId Il nome del database Spanner monitorato dal flusso di modifiche.
spannerProjectId Il nome del progetto Spanner.
metadataInstance L'istanza in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API change stream.
metadataDatabase Il database in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API Change Stream.
sourceShardsFilePath Percorso di un file Cloud Storage contenente le informazioni del profilo di connessione per gli shard di origine.
startTimestamp (Facoltativo) Il timestamp iniziale per la lettura cambia. Il valore predefinito è vuoto.
endTimestamp (Facoltativo) Il timestamp di fine per la lettura delle modifiche. Se non viene fornito alcun timestamp, la lettura è illimitata. Il valore predefinito è vuoto.
shadowTablePrefix (Facoltativo) Il prefisso utilizzato per denominare le tabelle shadow. Predefinito: shadow_.
sessionFilePath (Facoltativo) Il percorso della sessione in Cloud Storage che contiene le informazioni di mappatura di HarbourBridge.
filtrationMode (Facoltativo) Modalità di filtraggio. Specifica come eliminare determinati record in base a un criterio. Le modalità supportate sono: none (non filtrare nulla), forward_migration (filtra i record scritti utilizzando la pipeline di migrazione in avanti). Il valore predefinito è forward_migration.
shardingCustomJarPath (Facoltativo) Posizione del file JAR personalizzato in Cloud Storage che contiene la logica di personalizzazione per il recupero dell'ID shard. Se imposti questo parametro, imposta il parametro shardingCustomJarPath. Il valore predefinito è vuoto.
shardingCustomClassName (Facoltativo) Nome completo della classe con l'implementazione dell'ID shard personalizzato. Se shardingCustomJarPath è specificato, questo parametro è obbligatorio. Il valore predefinito è vuoto.
shardingCustomParameters (Facoltativo) Stringa contenente eventuali parametri personalizzati da passare alla classe di sharding personalizzata. Il valore predefinito è vuoto.
sourceDbTimezoneOffset (Facoltativo) Lo scarto di fuso orario rispetto a UTC per il database di origine. Valore di esempio: +10:00. Valore predefinito: +00:00.
dlqGcsPubSubSubscription (Facoltativo) L'abbonamento Pub/Sub utilizzato in un criterio di notifica Cloud Storage per la directory di nuovi tentativi DLQ durante l'esecuzione in modalità normale. Il nome deve essere nel formato projects/<project-id>/subscriptions/<subscription-name>. Se impostati, deadLetterQueueDirectory e dlqRetryMinutes vengono ignorati.
skipDirectoryName (Facoltativo) I record ignorati dalla replica inversa vengono scritti in questa directory. Il nome della directory predefinito è skip.
maxShardConnections (Facoltativo) Il numero massimo di connessioni che un determinato shard può accettare. Valore predefinito: 10000.
deadLetterQueueDirectory (Facoltativo) Il percorso utilizzato per archiviare l'output della coda di errori. Il percorso predefinito è una directory nella posizione temporanea del job Dataflow.
dlqMaxRetryCount (Facoltativo) Il numero massimo di volte in cui è possibile riprovare a eseguire gli errori temporanei tramite la coda dei messaggi non recapitabili. Il valore predefinito è 500.
runMode (Facoltativo) Il tipo di modalità di esecuzione. Valori supportati: regular, retryDLQ. Predefinito: regular. Specifica retryDLQ per riprovare solo i record della coda di messaggi non recapitabili gravi.
dlqRetryMinutes (Facoltativo) Il numero di minuti tra i nuovi tentativi della coda messaggi non recapitabili. Il valore predefinito è 10.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello di dataflow, seleziona the Spanner Change Streams to Source Database template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Interfaccia a riga di comando gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Sostituisci quanto segue:

  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • CHANGE_STREAM_NAME: il nome dello stream di modifiche da cui leggere
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • SPANNER_PROJECT_ID: l'ID progetto Cloud Spanner.
  • METADATA_INSTANCE: l'istanza Cloud Spanner in cui archiviare i metadati durante la lettura dagli stream di modifiche
  • METADATA_DATABASE: il database Cloud Spanner in cui archiviare i metadati durante la lettura dagli stream di modifiche
  • SOURCE_SHARDS_FILE_PATH: il percorso del file GCS contenente i dettagli dello shard di origine

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome univoco del job a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • CHANGE_STREAM_NAME: il nome dello stream di modifiche da cui leggere
  • INSTANCE_ID: l'ID istanza Cloud Spanner.
  • DATABASE_ID: l'ID del database Cloud Spanner.
  • SPANNER_PROJECT_ID: l'ID progetto Cloud Spanner.
  • METADATA_INSTANCE: l'istanza Cloud Spanner in cui archiviare i metadati durante la lettura dagli stream di modifiche
  • METADATA_DATABASE: il database Cloud Spanner in cui archiviare i metadati durante la lettura dagli stream di modifiche
  • SOURCE_SHARDS_FILE_PATH: il percorso del file GCS contenente i dettagli dello shard di origine