Plantilla de Datastream a Spanner

La plantilla de Datastream para Spanner es una canalización de transmisión que lee eventos de Datastream desde un bucket de Cloud Storage y los escribe en una base de datos de Spanner. Está diseñado para la migración de datos de fuentes de Datastream a Spanner.

Todas las tablas necesarias para la migración deben existir en la base de datos de destino de Spanner antes de la ejecución de la plantilla. Por lo tanto, la migración del esquema de una base de datos de origen a Spanner de destino se debe completar antes de la migración de datos. Los datos pueden existir en las tablas antes de la migración. Esta plantilla no propaga los cambios de esquema de Datastream a la base de datos de Spanner.

La coherencia de los datos solo está garantizada al final de la migración cuando todos los datos se escribieron en Spanner. Para almacenar información sobre el orden para cada registro escrito en Spanner, esta plantilla crea una tabla adicional (llamada tabla paralela) para cada tabla en la base de datos de Spanner. Esto se usa para garantizar la coherencia al final de la migración. Las tablas paralelas no se borran después de la migración y se pueden usar con fines de validación al final de la migración.

Cualquier error que ocurra durante la operación, como discrepancias de esquema, archivos JSON con formato incorrecto o errores resultantes de la ejecución de transformaciones, se registra en una cola de errores. La cola de errores es una carpeta de Cloud Storage que almacena todos los eventos de Datastream que encontraron errores junto con el motivo del error en formato de texto. Los errores pueden ser transitorios o permanentes, y se almacenan en las carpetas de Cloud Storage adecuadas en la cola de errores. Los errores transitorios se reintentan automáticamente, mientras que los errores permanentes no. En el caso de errores permanentes, tienes la opción de corregir los eventos de cambio y moverlos al bucket que se puede reintentar mientras se ejecuta la plantilla.

Requisitos de la canalización

  • Una transmisión de Datastream en estado En ejecución o No iniciado
  • Un bucket de Cloud Storage en el que se replican los eventos de Datastream.
  • Una base de datos de Spanner con tablas existentes. Estas tablas pueden estar vacías o contener datos.

Parámetros de la plantilla

Parámetros obligatorios

  • instanceId: La instancia de Spanner en la que se replican los cambios.
  • databaseId: La base de datos de Spanner en la que se replican los cambios.

Parámetros opcionales

  • inputFilePattern: La ubicación del archivo de Cloud Storage que contiene los archivos de Datastream para replicar. Por lo general, esta es la ruta de acceso raíz de una transmisión. Se inhabilitó la compatibilidad con esta función.
  • inputFileFormat: Es el formato del archivo de salida que produce Datastream. Por ejemplo, avro,json. La configuración predeterminada es avro.
  • sessionFilePath: Es la ruta de acceso del archivo de sesión en Cloud Storage que contiene información de asignación de HarbourBridge.
  • projectId: Es el ID del proyecto de Spanner.
  • spannerHost: Es el extremo de Cloud Spanner al que se llamará en la plantilla. Por ejemplo, https://batch-spanner.googleapis.com La configuración predeterminada es https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: La suscripción a Pub/Sub que se usa en una política de notificaciones de Cloud Storage. Para el nombre, usa el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: El nombre o la plantilla del flujo que se consultará para obtener la información del esquema y el tipo de fuente.
  • shadowTablePrefix: Es el prefijo que se usa para nombrar las tablas de paralelas. Predeterminado: shadow_.
  • shouldCreateShadowTables: Esta marca indica si las tablas paralelas se deben crear en la base de datos de Cloud Spanner. Configuración predeterminada: verdadero.
  • rfcStartDateTime: La fecha y hora de inicio que se usa para recuperar desde Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: La cantidad de archivos de DataStream simultáneos que se leerán. La configuración predeterminada es 30.
  • deadLetterQueueDirectory: Es la ruta de acceso del archivo que se usa cuando se almacena el resultado de la cola de errores. La ruta predeterminada es un directorio en la ubicación temporal del trabajo de Dataflow.
  • dlqRetryMinutes: Es la cantidad de minutos entre reintentos de la cola de mensajes no entregados. La configuración predeterminada es 10.
  • dlqMaxRetryCount: Es la cantidad máxima de veces que se pueden reintentar los errores temporales a través de DLQ. La configuración predeterminada es 500.
  • dataStreamRootUrl: URL raíz de la API de Datastream. La configuración predeterminada es https://datastream.googleapis.com/.
  • datastreamSourceType: Este es el tipo de base de datos de origen a la que se conecta Datastream. Ejemplo: mysql/oracle. Debe configurarse cuando se realizan pruebas sin Datastream en ejecución real.
  • roundJsonDecimals: Si se establece esta marca, se redondean los valores decimales en las columnas json a un número que se puede almacenar sin pérdida de precisión. La configuración predeterminada es "false".
  • runMode: Este es el tipo de modo de ejecución, ya sea normal o con reintentoDLQ. La configuración predeterminada es: regular.
  • transformationContextFilePath: Es la ruta de acceso del archivo de contexto de transformación en el almacenamiento en la nube que se usa para propagar los datos que se usan en las transformaciones que se realizan durante las migraciones. p. ej., el ID del fragmento al nombre de la base de datos para identificar la base de datos desde la que se migró una fila.
  • directoryWatchDurationInMinutes: Es la duración durante la cual la canalización debe seguir sondeando un directorio en GCS. Los archivos de salida de Datastream se organizan en una estructura de directorio que muestra la marca de tiempo del evento agrupada por minutos. Este parámetro debe ser aproximadamente igual al retraso máximo que puede ocurrir entre el evento que ocurre en la base de datos de origen y el mismo evento que Datastream escribe en GCS. Percentil 99.9 = 10 minutos. La configuración predeterminada es 10.
  • spannerPriority: La prioridad de solicitud para llamadas de Cloud Spanner. El valor debe ser uno de los siguientes: [HIGH,MEDIUM,LOW]. El valor predeterminado es HIGH.
  • dlqGcsPubSubSubscription: La suscripción a Pub/Sub que se usa en una política de notificaciones de Cloud Storage para el directorio de reintentos de DLQ cuando se ejecuta en modo normal. Para el nombre, usa el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Cuando se configuran, se ignoran deadLetterQueueDirectory y dlqRetryMinutes.
  • transformationJarPath: Es la ubicación del archivo JAR personalizado en Cloud Storage para el archivo que contiene la lógica de transformación personalizada para procesar registros en la migración directa. La configuración predeterminada es vacía.
  • transformationClassName: Es el nombre de clase completamente calificado que tiene la lógica de transformación personalizada. Es un campo obligatorio en caso de que se especifique transformJarPath. La configuración predeterminada es vacía.
  • transformationCustomParameters: Es la cadena que contiene cualquier parámetro personalizado que se pasará a la clase de transformación personalizada. La configuración predeterminada es vacía.
  • filteredEventsDirectory: Es la ruta de acceso del archivo para almacenar los eventos filtrados a través de la transformación personalizada. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
  • shardingContextFilePath: La ruta de acceso del archivo de contexto de fragmentación en Cloud Storage se usa para propagar el ID de fragmento en la base de datos de Spanner para cada fragmento de origen.Tiene el formato Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: Son las anulaciones de nombres de tablas de la fuente a Spanner. Se escriben en el siguiente formato: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. En este ejemplo, se muestra cómo asignar la tabla Singers a Vocalists y la tabla Albums a Records. Por ejemplo, [{Singers, Vocalists}, {Albums, Records}] La configuración predeterminada es vacía.
  • columnOverrides: Son las anulaciones de nombres de columna de la fuente a Spanner. Se escriben en el siguiente formato: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Ten en cuenta que SourceTableName debe permanecer igual en el par de fuente y Spanner. Para anular los nombres de las tablas, usa tableOverrides.En el ejemplo, se asigna SingerName a TalentName y AlbumName a RecordName en las tablas Singers y Albums, respectivamente. Por ejemplo, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}] La configuración predeterminada es vacía.
  • schemaOverridesFilePath: Es un archivo que especifica la tabla y las anulaciones de nombres de columna de la fuente a Spanner. La configuración predeterminada es vacía.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to Spanner template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/.
  • CLOUDSPANNER_INSTANCE: es la instancia de Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage que se usa para almacenar eventos de Datastream. Por ejemplo: gs://bucket/path/to/data/.
  • CLOUDSPANNER_INSTANCE: es la instancia de Spanner.
  • CLOUDSPANNER_DATABASE: es la base de datos de Spanner.
  • DLQ: es la ruta de acceso de Cloud Storage para el directorio de la cola de errores.

¿Qué sigue?