En esta página se describen los flujos de cambios de Spanner para bases de datos con dialecto GoogleSQL y PostgreSQL, entre los que se incluyen los siguientes:
- Modelo de partición basado en divisiones
- Formato y contenido de los registros de flujo de cambios
- La sintaxis de nivel inferior que se usa para consultar esos registros
- Ejemplo del flujo de trabajo de la consulta
Usas la API de Spanner para consultar flujos de cambios directamente. Las aplicaciones que usan Dataflow para leer datos de flujo de cambios no tienen que trabajar directamente con el modelo de datos descrito aquí.
Para obtener una guía de introducción más amplia sobre los flujos de cambios, consulta el resumen de los flujos de cambios.
Cambiar las particiones de un flujo
Cuando se produce un cambio en una tabla que está monitorizada por un flujo de cambios, Spanner escribe un registro de flujo de cambios correspondiente en la base de datos de forma síncrona en la misma transacción que el cambio de datos. Esto significa que, si la transacción se realiza correctamente, Spanner también habrá capturado y conservado el cambio. Internamente, Spanner coloca el registro del flujo de cambios y el cambio de datos en la misma ubicación para que los procese el mismo servidor y se minimice la sobrecarga de escritura.
Como parte de la DML de una división concreta, Spanner añade la escritura a la división de datos de flujo de cambios correspondiente en la misma transacción. Gracias a esta colocación, los flujos de cambios no añaden una coordinación adicional entre los recursos de servicio, lo que minimiza la sobrecarga de confirmación de las transacciones.
Spanner se escala dividiendo y combinando datos de forma dinámica en función de la carga y el tamaño de la base de datos, y distribuyendo las divisiones entre los recursos de servicio.
Para que las lecturas y escrituras de los flujos de cambios se puedan escalar, Spanner divide y combina el almacenamiento interno de los flujos de cambios junto con los datos de la base de datos, lo que evita automáticamente los puntos de acceso. Para admitir la lectura de registros de flujo de cambios casi en tiempo real a medida que se escalan las escrituras de la base de datos, la API de Spanner se ha diseñado para que se pueda consultar un flujo de cambios simultáneamente mediante particiones de flujo de cambios. Cambia el mapa de particiones de flujo de cambios para modificar las divisiones de datos de flujo de cambios que contienen los registros de flujo de cambios. Las particiones de un flujo de cambios cambian dinámicamente a lo largo del tiempo y están correlacionadas con la forma en que Spanner divide y combina dinámicamente los datos de la base de datos.
Una partición de flujo de cambios contiene registros de un intervalo de claves inmutable durante un periodo específico. Cualquier partición de flujo de cambios se puede dividir en una o varias particiones de flujo de cambios, o bien se puede combinar con otras particiones de flujo de cambios. Cuando se producen estos eventos de división o combinación, se crean particiones secundarias para registrar los cambios de sus respectivos intervalos de claves inmutables en el siguiente intervalo de tiempo. Además de los registros de cambios de datos, una consulta de flujo de cambios devuelve registros de particiones secundarias para notificar a los lectores las nuevas particiones de flujo de cambios que se deben consultar, así como registros de latido para indicar el progreso cuando no se han producido escrituras recientemente.
Cuando se consulta una partición de un flujo de cambios concreto, los registros de cambios se devuelven en orden de marca de tiempo de confirmación. Cada registro de cambios se devuelve exactamente una vez. No se garantiza el orden de los registros de cambios en las particiones de la secuencia de cambios. Los registros de cambios de una clave principal concreta solo se devuelven en una partición durante un periodo específico.
Debido al linaje de particiones principales y secundarias, para procesar los cambios de una clave concreta en orden de marca de tiempo de confirmación, los registros devueltos de las particiones secundarias solo se deben procesar después de que se hayan procesado los registros de todas las particiones principales.
Funciones de lectura de secuencias de cambios y sintaxis de consultas
GoogleSQL
Para consultar los flujos de cambios, usa la API ExecuteStreamingSql
. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nomenclatura de la función de lectura es READ_change_stream_name
.
Si hay un flujo de cambios SingersNameStream
en la base de datos, la sintaxis de consulta de GoogleSQL es la siguiente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La función de lectura acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatorio | Especifica que se deben devolver los registros con commit_timestamp mayor o igual que start_timestamp . El valor debe estar dentro del periodo de conservación del flujo de cambios, ser inferior o igual a la hora actual y ser superior o igual a la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
TIMESTAMP |
Opcional (valor predeterminado: NULL ) |
Especifica que se deben devolver los registros con un commit_timestamp inferior o igual a end_timestamp . El valor debe estar dentro del periodo de retención del flujo de cambios y ser igual o superior a start_timestamp . La consulta finaliza cuando se devuelven todos los ChangeRecords
hasta el end_timestamp o cuando terminas la
conexión. Si end_timestamp se define como NULL
o no se especifica, la consulta sigue ejecutándose hasta que se devuelvan todos los
ChangeRecords o hasta que finalices la
conexión. |
partition_token |
STRING |
Opcional (valor predeterminado: NULL ) |
Especifica qué partición del flujo de cambios se va a consultar en función del contenido de los registros de particiones secundarias. Si se incluye NULL o no se especifica, significa que el lector está consultando el flujo de cambios por primera vez y no ha obtenido ningún token de partición específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obligatorio | Determina con qué frecuencia se devuelve un latido ChangeRecord en caso de que no se hayan confirmado transacciones en esta partición.
El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos). |
read_options |
ARRAY |
Opcional (valor predeterminado: NULL ) |
Añade opciones de lectura reservadas para uso futuro. El único valor permitido es NULL . |
Te recomendamos que crees un método auxiliar para generar el texto de la consulta de la función de lectura y vincularle parámetros, tal como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Para consultar los flujos de cambios, usa la API ExecuteStreamingSql
. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nomenclatura de la función de lectura es spanner.read_json_change_stream_name
.
Si existe un flujo de cambios SingersNameStream
en la base de datos, la sintaxis de consulta de PostgreSQL es la siguiente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La función de lectura acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatorio | Especifica que se deben devolver los registros de cambios con commit_timestamp
igual o superior a start_timestamp . El valor debe estar dentro del periodo de conservación del flujo de cambios, ser inferior o igual a la hora actual y ser igual o superior a la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
timestamp with timezone |
Opcional (valor predeterminado: NULL ) |
Especifica que se deben devolver los registros de cambios con commit_timestamp
menor o igual que end_timestamp . El valor debe estar dentro del periodo de conservación del flujo de cambios y ser igual o superior a start_timestamp .
La consulta finaliza después de devolver todos los registros de cambios hasta end_timestamp o hasta que termines la conexión.
Si NULL , la consulta continúa ejecutándose hasta que se devuelvan todos los registros de cambios o hasta que finalices la conexión. |
partition_token |
text |
Opcional (valor predeterminado: NULL ) |
Especifica qué partición del flujo de cambios se va a consultar en función del contenido de los registros de particiones secundarias. Si se incluye NULL o no se especifica, significa que el lector está consultando el flujo de cambios por primera vez y no ha obtenido ningún token de partición específico para consultar. |
heartbeat_milliseconds |
bigint |
Obligatorio | Determina la frecuencia con la que se devuelve un latido ChangeRecord cuando no se han confirmado transacciones en esta partición.
El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos). |
null |
null |
Obligatorio | Reservado para uso futuro |
Te recomendamos que crees un método auxiliar para compilar el texto de la función de lectura y enlazar parámetros a él, como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Cambiar el formato de registro de los flujos de cambios
GoogleSQL
La función de lectura de flujos de cambios devuelve una sola columna ChangeRecord
de tipo ARRAY<STRUCT<...>>
. En cada fila, esta matriz siempre contiene un solo elemento.
Los elementos de la matriz tienen el siguiente tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Este STRUCT
tiene tres campos: data_change_record
, heartbeat_record
y child_partitions_record
, cada uno de tipo ARRAY<STRUCT<...>>
. En cualquier fila que devuelva la función de lectura de la secuencia de cambios, solo uno de estos tres campos contiene un valor; los otros dos están vacíos o tienen el valor NULL
. Estos campos de matriz contienen, como máximo, un elemento.
En las siguientes secciones se examina cada uno de estos tres tipos de registros.
PostgreSQL
La función de lectura de flujos de cambios devuelve una sola columna ChangeRecord
de tipo JSON
con la siguiente estructura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Este objeto puede tener tres claves: data_change_record
, heartbeat_record
y child_partitions_record
. El tipo de valor correspondiente es JSON
. En cualquier fila que devuelva la función de lectura del flujo de cambios, solo existe una de estas tres claves.
En las siguientes secciones se examina cada uno de estos tres tipos de registros.
Registros de cambios de datos
Un registro de cambios de datos contiene un conjunto de cambios en una tabla con el mismo tipo de modificación (inserción, actualización o eliminación) confirmados en la misma marca de tiempo de confirmación en una partición de flujo de cambios para la misma transacción. Se pueden devolver varios registros de cambios de datos para la misma transacción en varias particiones de flujo de cambios.
Todos los registros de cambios de datos tienen los campos commit_timestamp
, server_transaction_id
y record_sequence
, que determinan el orden en el flujo de cambios de un registro de flujo. Estos tres campos son suficientes para derivar el orden de los cambios y proporcionar coherencia externa.
Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si afectan a datos que no se solapan. El campo server_transaction_id
permite distinguir qué conjunto de cambios (posiblemente en particiones de flujo de cambios) se emitieron en la misma transacción. Si lo combinas con los campos record_sequence
y number_of_records_in_transaction
, también podrás almacenar en búfer y ordenar todos los registros de una transacción concreta.
Los campos de un registro de cambios de datos incluyen lo siguiente:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indica la marca de tiempo en la que se ha confirmado el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro en la transacción.
Los números de secuencia son únicos y aumentan de forma monótona (pero no necesariamente de forma contigua) en una transacción. Ordena los registros del mismo
server_transaction_id por record_sequence para
reconstruir el orden de los cambios en la transacción.
Spanner puede optimizar este orden para mejorar el rendimiento y es posible que no siempre coincida con el orden original que proporciones. |
server_transaction_id |
STRING |
Proporciona una cadena única global que representa la transacción en la que se ha confirmado el cambio. El valor solo se debe usar en el contexto del procesamiento de registros de flujo de cambios y no se correlaciona con el ID de transacción de la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica si es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
El valor predeterminado es |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica el nombre de la columna, el tipo de columna, si es una clave principal y la posición de la columna tal como se define en el esquema (ordinal_position ). La primera columna de una tabla del esquema tendría una posición ordinal de 1 . El tipo de columna se puede anidar en el caso de las columnas de matriz. El formato coincide con la estructura de tipo descrita en la referencia de la API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describe los cambios que se han realizado, incluidos los valores de clave principal, los valores antiguos y los valores nuevos de las columnas modificadas o monitorizadas.
La disponibilidad y el contenido de los valores antiguos y nuevos dependen del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas que no son clave. |
mod_type |
STRING |
Describe el tipo de cambio. Uno de los valores INSERT , UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica el número de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
INT64 |
Indica el número de particiones que devuelven registros de cambios de datos de esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada a esta transacción. |
is_system_transaction |
BOOL |
Indica si la transacción es una transacción del sistema. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
STRING |
Indica la marca de tiempo en la que se ha confirmado el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro en la transacción.
Los números de secuencia son únicos y aumentan de forma monótona (pero no necesariamente de forma contigua) en una transacción. Ordena los registros del mismo
server_transaction_id por record_sequence para
reconstruir el orden de los cambios en la transacción. |
server_transaction_id |
STRING |
Proporciona una cadena única global que representa la transacción en la que se ha confirmado el cambio. El valor solo debe usarse en el contexto del procesamiento de registros de flujo de cambios y no se corresponde con el ID de transacción de la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica si es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Indica el nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
El valor predeterminado es |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica el nombre de la columna, el tipo de columna, si es una clave principal y la posición de la columna tal como se define en el esquema (ordinal_position ). La primera columna de una tabla del esquema tendría una posición ordinal de 1 . El tipo de columna se puede anidar en el caso de las columnas de matriz. El formato coincide con la estructura de tipo descrita en la referencia de la API Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describe los cambios que se han realizado, incluidos los valores de clave principal, los valores antiguos y los valores nuevos de las columnas modificadas o de las que se ha hecho un seguimiento. La disponibilidad y el contenido de los valores antiguos y nuevos dependen de la value_capture_type configurada. Los campos new_values y old_values solo contienen las columnas que no son clave.
|
mod_type |
STRING |
Describe el tipo de cambio. Uno de los valores INSERT , UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica el número de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
NUMBER |
Indica el número de particiones que devuelven registros de cambios de datos de esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada a esta transacción. |
is_system_transaction |
BOOLEAN |
Indica si la transacción es una transacción del sistema. |
Ejemplo de registro de cambios de datos
A continuación, se muestra un par de registros de cambios de datos de ejemplo. Describen una sola transacción en la que se produce una transferencia entre dos cuentas. Las dos cuentas están en particiones de flujo de cambios independientes.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
capture type NEW_VALUES
. Ten en cuenta que solo se rellenan los valores nuevos.
Solo se ha modificado la columna LastUpdate
, por lo que solo se ha devuelto esa columna.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
capture type NEW_ROW
. Solo se ha modificado la columna LastUpdate
, pero se devuelven todas las columnas registradas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambios de datos es un ejemplo de un registro con el valor
capture type NEW_ROW_AND_OLD_VALUES
. Solo se ha modificado la columna LastUpdate
, pero se devuelven todas las columnas monitorizadas. Este tipo de captura de valor captura el valor nuevo y el valor antiguo de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de latidos
Cuando se devuelve un registro de latido, indica que se han devuelto todos los cambios con commit_timestamp
menor o igual que el timestamp
del registro de latido, y los registros de datos futuros de esta partición deben tener marcas de tiempo de confirmación superiores a las devueltas por el registro de latido. Los registros de latido se devuelven cuando no se han escrito cambios en los datos de una partición. Cuando se escriben cambios en los datos de la partición, se puede usar data_change_record.commit_timestamp
en lugar de heartbeat_record.timestamp
para indicar que el lector está avanzando en la lectura de la partición.
Puede usar los registros de latido devueltos en las particiones para sincronizar los lectores en todas las particiones. Una vez que todos los lectores hayan recibido un latido superior o igual a una marca de tiempo A
o hayan recibido datos o registros de partición secundaria superiores o iguales a la marca de tiempo A
, los lectores sabrán que han recibido todos los registros confirmados en esa marca de tiempo o antes y podrán empezar a procesar los registros almacenados en el búfer. Por ejemplo, pueden ordenar los registros de partición cruzada por marca de tiempo y agruparlos por server_transaction_id
.A
Un registro de latido contiene solo un campo:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
TIMESTAMP |
Indica la marca de tiempo del registro de latido. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
STRING |
Indica la marca de tiempo del registro de latido. |
Ejemplo de registro de señal de latido
Ejemplo de registro de latido que comunica que se han devuelto todos los registros con marcas de tiempo inferiores o iguales a la marca de tiempo de este registro:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de particiones secundarias
Los registros de particiones secundarias devuelven información sobre las particiones secundarias: sus tokens de partición, los tokens de sus particiones principales y el start_timestamp
que representa la marca de tiempo más antigua para la que las particiones secundarias contienen registros de cambios. Los registros cuyas marcas de tiempo de confirmación sean inmediatamente anteriores a child_partitions_record.start_timestamp
se devuelven en la partición actual. Después de devolver todos los registros de partición secundarios de esta partición, esta consulta devuelve un estado correcto, lo que indica que se han devuelto todos los registros de esta partición.
Los campos de un registro de partición secundaria incluyen lo siguiente:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
TIMESTAMP |
Indica que los registros de cambios de datos devueltos de las particiones secundarias de este registro de partición secundaria tienen una marca de tiempo de confirmación mayor o igual que start_timestamp . Cuando se consulta una partición secundaria, la consulta debe especificar el token de la partición secundaria y un start_timestamp igual o mayor que child_partitions_token.start_timestamp . Todos los registros de las particiones secundarias devueltos por una partición tienen el mismo start_timestamp y la marca de tiempo siempre está entre los valores start_timestamp y end_timestamp especificados en la consulta. |
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundarios cuando se devuelven varios registros de partición secundarios con el mismo start_timestamp en una partición concreta. El token de partición, start_timestamp
y record_sequence , identifica de forma única un registro de partición secundario.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Devuelve un conjunto de particiones secundarias y la información asociada. Esto incluye la cadena de token de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones principales. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
STRING |
Indica que los registros de cambios de datos devueltos de las particiones secundarias de este registro de partición secundaria tienen una marca de tiempo de confirmación mayor o igual que start_timestamp . Cuando se consulta una partición secundaria, la consulta debe especificar el token de la partición secundaria y un start_timestamp igual o mayor que child_partitions_token.start_timestamp . Todos los registros de las particiones secundarias devueltos por una partición tienen el mismo start_timestamp y la marca de tiempo siempre está entre los valores start_timestamp y end_timestamp especificados en la consulta.
|
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundarios cuando se devuelven varios registros de partición secundarios con el mismo start_timestamp en una partición concreta. El token de partición, start_timestamp
y record_sequence , identifica de forma única un registro de partición secundario.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Devuelve una matriz de particiones secundarias y su información asociada. Esto incluye la cadena de token de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones principales. |
Ejemplo de registro de partición secundaria
A continuación, se muestra un ejemplo de un registro de partición secundaria:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flujo de trabajo de consulta de flujo de cambios
Ejecuta consultas de flujo de cambios con la API ExecuteStreamingSql
, con una transacción de solo lectura de un solo uso y un límite de marca de tiempo fuerte. La función de lectura de cambios te permite especificar start_timestamp
y end_timestamp
para el intervalo de tiempo que te interese. Se puede acceder a todos los registros de cambios dentro del periodo de conservación mediante la marca de tiempo de solo lectura segura.
Todos los demás
TransactionOptions
no son válidos para las consultas de flujo de cambios. Además, si TransactionOptions.read_only.return_read_timestamp
se define como true
, se devuelve un valor especial de kint64max - 1
en el mensaje Transaction
que describe la transacción, en lugar de una marca de tiempo de lectura válida. Este valor especial debe descartarse y no utilizarse en ninguna consulta posterior.
Cada consulta de flujo de cambios puede devolver cualquier número de filas, cada una de las cuales contiene un registro de cambio de datos, un registro de latido o un registro de particiones secundarias. No es necesario fijar una fecha límite para la solicitud.
Ejemplo de flujo de trabajo de consulta de flujo de cambios
El flujo de trabajo de las consultas de streaming empieza con la emisión de la primera consulta de flujo de cambios especificando el partition_token
en NULL
. La consulta debe especificar la función de lectura del flujo de cambios, las marcas de tiempo de inicio y finalización de interés, y el intervalo de latido. Cuando end_timestamp
es NULL
, la consulta sigue
devolviendo cambios en los datos hasta que finaliza la partición.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Procesa los registros de datos de esta consulta hasta que se devuelvan todos los registros de partición secundaria. En el siguiente ejemplo, se devuelven dos registros de partición secundarios y tres tokens de partición. A continuación, la consulta finaliza. Los registros de partición secundarios de una consulta específica siempre comparten el mismo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": "1000012390",
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para procesar los cambios después de 2022-05-01T09:00:01Z
, crea tres consultas nuevas y ejecútalas en paralelo. Si se usan juntas, las tres consultas devuelven los cambios de datos del mismo intervalo de claves que cubre su elemento superior. Siempre debe definir start_timestamp
como start_timestamp
en el mismo registro de partición secundaria y usar el mismo end_timestamp
e intervalo de latido para procesar los registros de forma coherente en todas las consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La consulta de child_token_2
finaliza después de devolver otro registro de partición secundaria. Este registro indica que una nueva partición cubre los cambios de child_token_2
y child_token_3
a partir de 2022-05-01T09:30:15Z
. La consulta de child_token_3
devuelve el mismo registro exacto, ya que ambas son particiones principales de la nueva child_token_4
. Para asegurar un procesamiento estricto y ordenado de los registros de datos de una clave concreta, la consulta en child_token_4
debe iniciarse después de que hayan terminado todos los elementos principales. En este caso, los elementos superiores son child_token_2
y child_token_3
. Solo debes crear una consulta por token de partición secundario. El diseño del flujo de trabajo de la consulta debe asignar un elemento superior para que espere y programe la consulta en child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": "1000012389",
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": ["child_token_2", "child_token_3"],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Consulta ejemplos de cómo gestionar y analizar registros de flujo de cambios en el conector SpannerIO Dataflow de Apache Beam en GitHub.