Plantilla de Spanner Change Streams a Source Database

Flujo de procesamiento en streaming. Lee datos de los flujos de cambios de Spanner y los escribe en una fuente.

Parámetros de plantilla

Parámetro Descripción
changeStreamName Nombre del flujo de cambios de Spanner del que lee la canalización.
instanceId Nombre de la instancia de Spanner en la que se encuentra el flujo de cambios.
databaseId Nombre de la base de datos de Spanner que monitoriza el flujo de cambios.
spannerProjectId El nombre del proyecto de Spanner.
metadataInstance La instancia para almacenar los metadatos que usa el conector para controlar el consumo de los datos de la API de flujo de cambios.
metadataDatabase La base de datos en la que se almacenan los metadatos que usa el conector para controlar el consumo de los datos de la API Change Stream.
sourceShardsFilePath Ruta a un archivo de Cloud Storage que contiene información del perfil de conexión de los fragmentos de origen.
startTimestamp Opcional: Marca de tiempo inicial para leer los cambios. El valor predeterminado es una cadena vacía.
endTimestamp Opcional: Marca de tiempo final para leer los cambios. Si no se proporciona ninguna marca de tiempo, se lee indefinidamente. El valor predeterminado es una cadena vacía.
shadowTablePrefix Opcional: prefijo usado para asignar nombres a las tablas de sombra. Valor predeterminado: shadow_.
sessionFilePath Opcional: ruta de la sesión en Cloud Storage que contiene información de asignación de HarbourBridge.
filtrationMode Opcional: modo de filtración. Especifica cómo se deben eliminar determinados registros en función de un criterio. Los modos admitidos son: none (no filtrar nada), forward_migration (filtrar los registros escritos mediante la pipeline de migración hacia adelante). El valor predeterminado es forward_migration.
shardingCustomJarPath Opcional: ubicación del archivo JAR personalizado en Cloud Storage que contiene la lógica de personalización para obtener el ID de fragmento. Si defines este parámetro, define el parámetro shardingCustomJarPath. El valor predeterminado es una cadena vacía.
shardingCustomClassName Opcional: nombre de clase completo que tiene la implementación del ID de fragmento personalizado. Si se especifica shardingCustomJarPath, este parámetro es obligatorio. El valor predeterminado es una cadena vacía.
shardingCustomParameters Opcional: cadena que contiene los parámetros personalizados que se van a transferir a la clase de partición personalizada. El valor predeterminado es una cadena vacía.
sourceDbTimezoneOffset Opcional: la diferencia horaria con respecto al UTC de la base de datos de origen. Valor de ejemplo: +10:00. El valor predeterminado es +00:00.
dlqGcsPubSubSubscription Opcional: la suscripción de Pub/Sub que se usa en una política de notificaciones de Cloud Storage para el directorio de reintentos de DLQ cuando se ejecuta en modo normal. El nombre debe tener el formato projects/<project-id>/subscriptions/<subscription-name>. Si se define, se ignoran deadLetterQueueDirectory y dlqRetryMinutes.
skipDirectoryName Opcional: Los registros omitidos de la replicación inversa se escriben en este directorio. El nombre de directorio predeterminado es skip.
maxShardConnections Opcional: número máximo de conexiones que puede aceptar un fragmento determinado. El valor predeterminado es 10000.
deadLetterQueueDirectory Opcional: la ruta que se usa al almacenar la salida de la cola de errores. La ruta predeterminada es un directorio de la ubicación temporal de la tarea de Dataflow.
dlqMaxRetryCount Opcional: número máximo de veces que se pueden reintentar los errores temporales a través de la cola de mensajes fallidos. El valor predeterminado es 500.
runMode Opcional: el tipo de modo de ejecución. Valores admitidos: regular, retryDLQ. Valor predeterminado: regular. Especifica retryDLQ es solo para registros de cola de mensajes fallidos graves de reintento.
dlqRetryMinutes Opcional: número de minutos entre reintentos de la cola de mensajes fallidos. El valor predeterminado es 10.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Spanner Change Streams to Source Database template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

CLI de gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       changeStreamName=CHANGE_STREAM_NAME,\
       instanceId=INSTANCE_ID,\
       databaseId=DATABASE_ID,\
       spannerProjectId=SPANNER_PROJECT_ID,\
       metadataInstance=METADATA_INSTANCE,\
       metadataDatabase=METADATA_DATABASE,\
       sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH,\

Haz los cambios siguientes:

  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • CHANGE_STREAM_NAME: el nombre del flujo de cambios del que se va a leer
  • INSTANCE_ID: el ID de la instancia de Cloud Spanner.
  • DATABASE_ID: el ID de la base de datos de Cloud Spanner.
  • SPANNER_PROJECT_ID: el ID de proyecto de Cloud Spanner.
  • METADATA_INSTANCE: la instancia de Cloud Spanner para almacenar metadatos al leer de los flujos de cambios
  • METADATA_DATABASE: la base de datos de Cloud Spanner para almacenar metadatos al leer de los flujos de cambios
  • SOURCE_SHARDS_FILE_PATH: la ruta al archivo de GCS que contiene los detalles del fragmento de origen

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "changeStreamName": "CHANGE_STREAM_NAME",
       "instanceId": "INSTANCE_ID",
       "databaseId": "DATABASE_ID",
       "spannerProjectId": "SPANNER_PROJECT_ID",
       "metadataInstance": "METADATA_INSTANCE",
       "metadataDatabase": "METADATA_DATABASE",
       "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
     "environment": { "maxWorkers": "10" }
  }
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • CHANGE_STREAM_NAME: el nombre del flujo de cambios del que se va a leer
  • INSTANCE_ID: el ID de la instancia de Cloud Spanner.
  • DATABASE_ID: el ID de la base de datos de Cloud Spanner.
  • SPANNER_PROJECT_ID: el ID de proyecto de Cloud Spanner.
  • METADATA_INSTANCE: la instancia de Cloud Spanner para almacenar metadatos al leer de los flujos de cambios
  • METADATA_DATABASE: la base de datos de Cloud Spanner para almacenar metadatos al leer de los flujos de cambios
  • SOURCE_SHARDS_FILE_PATH: la ruta al archivo de GCS que contiene los detalles del fragmento de origen