Crear y gestionar flujos de cambios

En esta página se describe cómo crear, modificar y ver flujos de cambios de Spanner para bases de datos con dialecto GoogleSQL y PostgreSQL. Para obtener más información sobre los flujos de cambios, consulta el artículo Acerca de los flujos de cambios.

Como los flujos de cambios son objetos de esquema, se crean y gestionan mediante las mismas actualizaciones de esquema basadas en DDL que se usan para cualquier otro tipo de trabajo de definición de bases de datos, como crear tablas o añadir índices.

Spanner inicia una operación de larga duración después de que envíes una instrucción DDL que cambie el esquema, incluidas las que se usan para crear, modificar o eliminar flujos de cambios. Un flujo de cambios nuevo o modificado empieza a monitorizar las columnas o tablas especificadas en su nueva configuración cuando se completa esta operación de larga duración.

Crear un flujo de cambios

Para crear un flujo de cambios, debes proporcionar su nombre y los objetos de esquema que monitoriza: toda la base de datos o una lista de tablas y columnas específicas. También puede configurar un flujo de cambios con cualquiera de los siguientes elementos:

GoogleSQL

La sintaxis DDL para crear un flujo de cambios con GoogleSQL es la siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    OPTIONS (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean,
      allow_txn_exclusion = boolean
    )
  ]

PostgreSQL

La sintaxis DDL para crear un flujo de cambios con PostgreSQL es la siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    WITH (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean,
      allow_txn_exclusion = boolean
    )
  ]

Un nuevo flujo de cambios empieza a monitorizar los objetos de esquema que tiene asignados en cuanto se completa la operación de larga duración que lo ha creado.

En los siguientes ejemplos se ilustra la creación de flujos de cambios con varias configuraciones.

Monitorizar una base de datos completa

Para crear un flujo de cambios que monitorice todos los cambios de datos que se realicen en las tablas de una base de datos, usa la palabra clave ALL:

CREATE CHANGE STREAM EverythingStream
FOR ALL;

La configuración de ALL incluye de forma implícita todas las futuras tablas y columnas de datos de la base de datos en cuanto se creen. No incluye vistas, tablas de esquema de información ni otros objetos que no sean tablas de datos normales.

Monitorizar tablas específicas

Para limitar el ámbito de un flujo de cambios a tablas específicas en lugar de a una base de datos completa, especifica una lista de una o varias tablas:

CREATE CHANGE STREAM SingerAlbumStream
FOR Singers, Albums;

Spanner actualiza automáticamente los flujos de cambios que monitorizan tablas completas para reflejar cualquier cambio de esquema que afecte a esas tablas, como las columnas añadidas o eliminadas.

Monitorizar columnas específicas

Usa la sintaxis table(column_1[, column_2, ...]) para monitorizar los cambios de una o varias columnas específicas que no sean claves en las tablas que indiques:

CREATE CHANGE STREAM NamesAndTitles
FOR Singers(FirstName, LastName), Albums(Title);

No puedes especificar columnas de clave principal aquí porque cada flujo de cambios siempre monitoriza las claves principales de cada tabla que observa. De esta forma, cada registro de cambio de datos puede identificar la fila modificada por su clave principal.

Ver tablas y columnas en un solo flujo

Puedes combinar la sintaxis de la monitorización de tablas y columnas de los dos ejemplos anteriores en un solo flujo de cambios:

CREATE CHANGE STREAM NamesAndAlbums
FOR Singers(FirstName, LastName), Albums;

Especificar un periodo de conservación más largo

Para especificar un periodo de conservación de datos de flujo de cambios que sea superior al valor predeterminado de un día, asigna a retention_period un periodo de hasta treinta días, expresado en horas (h) o días (d).

Dos ejemplos:

GoogleSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
OPTIONS ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
OPTIONS ( retention_period = '7d' );

PostgreSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
WITH ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
WITH ( retention_period = '7d' );

Especificar otro tipo de captura de valor

Para especificar un tipo de captura de valor de flujo de cambios que no sea OLD_AND_NEW_VALUES, asigne a value_capture_type el valor NEW_VALUES o NEW_ROW, como se muestra en los siguientes ejemplos:

GoogleSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_VALUES' );

Filtrar eliminaciones basadas en TTL

Puede filtrar las eliminaciones basadas en TTL del ámbito de su flujo de cambios mediante el filtro exclude_ttl_deletes.

Para obtener más información sobre cómo funciona este filtro, consulta Filtro de eliminación basado en el tiempo de vida.

GoogleSQL

Para crear un flujo de cambios con el filtro de eliminaciones basadas en TTL, ejecuta el siguiente ejemplo:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (exclude_ttl_deletes = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu nuevo flujo de cambios

En el siguiente ejemplo se crea un flujo de cambios llamado NewFilterChangeStream que excluye todas las eliminaciones basadas en TTL:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_ttl_deletes = true)

PostgreSQL

Para crear un flujo de cambios con el filtro de eliminaciones basadas en TTL, ejecuta el siguiente ejemplo:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (exclude_ttl_deletes = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre del nuevo flujo de cambios

En el siguiente ejemplo se crea un flujo de cambios llamado NewFilterChangeStream que excluye todas las eliminaciones basadas en TTL:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_ttl_deletes = true)

Para añadir o quitar el filtro de eliminaciones basadas en TTL de un flujo de cambios, consulta Modificar el filtro de eliminaciones basadas en TTL. Puedes confirmar los filtros de tu flujo de cambios viendo las definiciones del flujo de cambios como DDL.

Filtrar por tipo de modificación de tabla

Filtra una o varias de estas modificaciones de la tabla del ámbito de tu flujo de cambios con las siguientes opciones de filtro disponibles:

  • exclude_insert: excluye todas las modificaciones de la tabla INSERT
  • exclude_update: excluye todas las modificaciones de la tabla UPDATE
  • exclude_delete: excluye todas las modificaciones de la tabla DELETE

Para obtener más información sobre cómo funcionan estos filtros, consulte Filtros de tipo de modificación de tabla.

GoogleSQL

Para crear un flujo de cambios con uno o varios filtros de tipo de modificación de tabla, ejecuta lo siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (MOD_TYPE_FILTER_NAME = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu nuevo flujo de cambios
  • MOD_TYPE_FILTER_NAME: el filtro que quieras añadir: exclude_insert, exclude_update o exclude_delete. Si añade más de un filtro a la vez, sepárelos con comas.

En el siguiente ejemplo se crea un flujo de cambios llamado NewFilterChangeStream que excluye los tipos de modificación de tabla INSERT y UPDATE:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_insert = true, exclude_update = true)

PostgreSQL

Para crear un flujo de cambios con uno o varios filtros de tipo de modificación de tabla, ejecuta lo siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (MOD_TYPE_FILTER_NAME = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu nuevo flujo de cambios
  • MOD_TYPE_FILTER_NAME: el filtro que quieras añadir: exclude_insert, exclude_update o exclude_delete. Si añade más de un filtro a la vez, sepárelos con comas.

En el siguiente ejemplo se crea un flujo de cambios llamado NewFilterChangeStream que excluye los tipos de modificación de tabla INSERT y UPDATE:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_insert = true, exclude_update = true)

Para añadir o quitar un filtro de tipo de modificación de tabla de un flujo de cambios, consulta Modificar el filtro por tipo de modificación de tabla. Para confirmar qué filtros de tipo de modificación de tabla hay en tu flujo de cambios, puedes ver las definiciones del flujo de cambios como DDL.

Habilitar la exclusión de registros a nivel de transacción

Puede habilitar sus flujos de cambios para excluir registros de transacciones de escritura especificadas. Para ello, defina la opción allow_txn_exclusion al crear un flujo de cambios o modifique un flujo de cambios que ya tenga.

Para obtener más información sobre cómo funciona esta opción, consulta Exclusión de registros a nivel de transacción.

GoogleSQL

Para crear un flujo de cambios que pueda excluir registros de transacciones de escritura especificadas, ejecute lo siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (allow_txn_exclusion = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre del nuevo flujo de cambios

En el siguiente ejemplo se crea un flujo de cambios llamado NewChangeStream que puede excluir registros de transacciones de escritura especificadas:

CREATE CHANGE STREAM NewChangeStream FOR ALL
OPTIONS (allow_txn_exclusion = true)

PostgreSQL

Para crear un flujo de cambios que pueda excluir registros de transacciones de escritura especificadas, ejecute lo siguiente:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (allow_txn_exclusion = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre del nuevo flujo de cambios

En el siguiente ejemplo se crea un flujo de cambios llamado NewChangeStream que puede excluir registros de transacciones de escritura especificadas:

CREATE CHANGE STREAM NewChangeStream FOR ALL
WITH (allow_txn_exclusion = true)

Para habilitar o inhabilitar la exclusión de registros a nivel de transacción de un flujo de cambios, consulta Modificar la exclusión de registros a nivel de transacción. Para comprobar el ajuste de esta opción, consulta Ver definiciones de secuencias de cambios como DDL.

Especificar una transacción de escritura que se va a excluir de los flujos de cambios

Para especificar que una transacción de escritura se excluya de los flujos de cambios, debes definir el parámetro exclude_txn_from_change_streams como true. En los siguientes ejemplos de código se muestra cómo especificar que una transacción de escritura se excluya de los flujos de cambios mediante la biblioteca de cliente.

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

// readWriteTxnExcludedFromChangeStreams executes the insert and update DMLs on
// Singers table excluded from tracking change streams with ddl option
// allow_txn_exclusion = true.
func readWriteTxnExcludedFromChangeStreams(w io.Writer, db string) error {
	// db = `projects/<project>/instances/<instance-id>/database/<database-id>`
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.NewClient: %w", err)
	}
	defer client.Close()

	_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		stmt := spanner.Statement{
			SQL: `INSERT Singers (SingerId, FirstName, LastName)
					VALUES (111, 'Virginia', 'Watson')`,
		}
		_, err := txn.Update(ctx, stmt)
		if err != nil {
			return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.Update: %w", err)
		}
		fmt.Fprintln(w, "New singer inserted.")
		stmt = spanner.Statement{
			SQL: `UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111`,
		}
		_, err = txn.Update(ctx, stmt)
		if err != nil {
			return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.Update: %w", err)
		}
		fmt.Fprint(w, "Singer first name updated.")
		return nil
	}, spanner.TransactionOptions{ExcludeTxnFromChangeStreams: true})
	if err != nil {
		return err
	}
	return nil
}

Java

static void readWriteTxnExcludedFromChangeStreams(DatabaseClient client) {
  // Exclude the transaction from allowed tracking change streams with alloww_txn_exclusion=true.
  // This exclusion will be applied to all the individual operations inside this transaction.
  client
      .readWriteTransaction(Options.excludeTxnFromChangeStreams())
      .run(
          transaction -> {
            transaction.executeUpdate(
                Statement.of(
                    "INSERT Singers (SingerId, FirstName, LastName)\n"
                        + "VALUES (1341, 'Virginia', 'Watson')"));
            System.out.println("New singer inserted.");

            transaction.executeUpdate(
                Statement.of("UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111"));
            System.out.println("Singer first name updated.");

            return null;
          });
}

Modificar un flujo de cambios

Para modificar la configuración de un flujo de cambios, usa una instrucción ALTER CHANGE STREAMDDL. Usa una sintaxis similar a CREATE CHANGE STREAM. Puede cambiar las columnas que monitoriza un flujo o la duración de su periodo de conservación de datos. También puedes suspender por completo su monitorización y conservar los registros de cambios de datos.

Modificar lo que monitoriza un flujo de cambios

En este ejemplo, se añade toda la tabla Songs al flujo de cambios NamesAndAlbums configurado anteriormente:

ALTER CHANGE STREAM NamesAndAlbums
SET FOR Singers(FirstName, LastName), Albums, Songs;

Spanner sustituye el comportamiento del flujo de cambios con nombre por la nueva configuración una vez completada la operación de larga duración que actualiza la definición del flujo de cambios en el esquema de la base de datos.

Modificar el periodo de conservación de datos de un flujo de cambios

Para modificar el tiempo durante el que un flujo de cambios conserva sus registros internos, define retention_period en una instrucción DDL ALTER CHANGE STREAM.

En este ejemplo, se ajusta el periodo de retención de datos al NamesAndAlbums flujo de cambios creado anteriormente:

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( retention_period = '36h' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( retention_period = '36h' );

Modificar el tipo de captura de valores de un flujo de cambios

Para modificar el tipo de captura de valores de un flujo de cambios, define la cláusula value_capture_type en una instrucción DDL ALTER CHANGE STREAM.

En este ejemplo, se ajusta el tipo de captura de valor a NEW_VALUES.

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( value_capture_type = 'NEW_VALUES' );

Modificar el filtro de eliminaciones basadas en TTL

Para modificar el filtro eliminaciones basadas en TTL de un flujo de cambios, defina el filtro exclude_ttl_deletes en una instrucción ALTER CHANGE STREAM DDL. Puedes usarlo para añadir o quitar el filtro de tus secuencias de cambios.

Para obtener más información sobre cómo funcionan estos filtros, consulta Filtro de eliminación basada en el tiempo de vida.

Añadir el filtro de eliminaciones basadas en TTL a un flujo de cambios

GoogleSQL

Para añadir el filtro de eliminaciones basadas en TTL a un flujo de cambios, ejecuta lo siguiente para definir el filtro en true:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el siguiente ejemplo, se añade el filtro exclude_ttl_deletes a un flujo de cambios llamado NewFilterChangeStream que excluye todas las eliminaciones basadas en TTL:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

De esta forma, se excluyen del flujo de cambios todas las eliminaciones basadas en TTL futuras.

PostgreSQL

Para añadir el filtro de eliminaciones basadas en TTL a un flujo de cambios, ejecuta lo siguiente para definir el filtro en true:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el siguiente ejemplo, se añade el filtro exclude_ttl_deletes a un flujo de cambios llamado NewFilterChangeStream que excluye todas las eliminaciones basadas en TTL:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = true)

De esta forma, se excluyen del flujo de cambios todas las eliminaciones basadas en TTL futuras.

Quitar el filtro de eliminaciones basadas en TTL de un flujo de cambios

GoogleSQL

Para quitar el filtro de eliminaciones basadas en TTL de un flujo de cambios, ejecuta lo siguiente para definir el filtro en false:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu nuevo flujo de cambios

En el ejemplo siguiente, se elimina el filtro exclude_ttl_deletes de un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

Esto incluye todas las eliminaciones basadas en TTL futuras de la secuencia de cambios.

También puede definir el filtro en null para quitar el filtro de eliminaciones basadas en TTL.

PostgreSQL

Para quitar el filtro de eliminaciones basadas en TTL de un flujo de cambios, ejecuta lo siguiente para definir el filtro en false:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu nuevo flujo de cambios

En el ejemplo siguiente, se elimina el filtro exclude_ttl_deletes de un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = false)

Esto incluye todas las eliminaciones basadas en TTL futuras de la secuencia de cambios.

También puede definir el filtro en null para quitar el filtro de eliminaciones basadas en TTL.

Modificar el filtro por tipo de modificación de la tabla

Para modificar los filtros de tipo de modificación de tabla de un flujo de cambios, define el tipo de filtro en una ALTER CHANGE STREAM declaración de DDL. Puedes usarlo para añadir un filtro nuevo o quitar uno que ya tengas de tu flujo de cambios.

Añadir un filtro de tipo de modificación de tabla a un flujo de cambios

GoogleSQL

Para añadir uno o varios filtros de tipo de modificación de tabla nuevos a un flujo de cambios, ejecuta lo siguiente para definir el filtro en true:

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: sustitúyelo por el nombre de tu change stream
  • MOD_TYPE_FILTER_NAME: sustitúyelo por el filtro que quieras añadir: exclude_insert, exclude_update o exclude_delete. Si añades más de un filtro a la vez, separa cada filtro con una coma.

En el ejemplo siguiente, se añade el filtro exclude_delete a un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = true)

PostgreSQL

Para añadir uno o varios filtros de tipo de modificación de tabla nuevos a un flujo de cambios, ejecuta lo siguiente para definir el filtro en true:

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: sustitúyelo por el nombre de tu change stream
  • MOD_TYPE_FILTER_NAME: sustitúyelo por el filtro que quieras añadir: exclude_insert, exclude_update o exclude_delete. Si añades más de un filtro a la vez, separa cada filtro con una coma.

En el ejemplo siguiente, se añade el filtro exclude_delete a un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = true)

Quitar un filtro de tipo de modificación de tabla de un flujo de cambios

GoogleSQL

Para quitar uno o varios filtros de tipo de modificación de tabla en un flujo de cambios, ejecuta lo siguiente para definir el filtro en false:

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: sustitúyelo por el nombre de tu change stream
  • MOD_TYPE_FILTER_NAME: sustitúyelo por el filtro que quieras quitar: exclude_insert, exclude_update o exclude_delete. Si quitas más de un filtro a la vez, sepáralos con comas.

En el ejemplo siguiente, se elimina el filtro exclude_delete de un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = false)

También puedes quitar un filtro de modificación de tabla volviendo a definir el valor predeterminado del filtro. Para ello, asigna el valor null al filtro.

PostgreSQL

Para quitar uno o varios filtros de tipo de modificación de tabla en un flujo de cambios, ejecuta lo siguiente para definir el filtro en false:

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: sustitúyelo por el nombre de tu change stream
  • MOD_TYPE_FILTER_NAME: sustitúyelo por el filtro que quieras quitar: exclude_insert, exclude_update o exclude_delete. Si quitas más de un filtro a la vez, sepáralos con comas.

En el ejemplo siguiente, se elimina el filtro exclude_delete de un flujo de cambios llamado NewFilterChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = false)

También puedes quitar un filtro de modificación de tabla volviendo a definir el valor predeterminado del filtro. Para ello, asigna el valor null al filtro.

Modificar el flujo de cambios para permitir la exclusión de registros a nivel de transacción

Puede modificar su flujo de cambios para que excluya los registros de transacciones de escritura específicas. Para ello, define la opción allow_txn_exclusion en true en una instrucción ALTER CHANGE STREAM DDL. Si no configuras esta opción o la estableces en false, el flujo de cambios monitorizará todas las transacciones de escritura.

Para obtener más información sobre cómo funciona esta opción, consulta Exclusión de registros a nivel de transacción.

Habilitar la exclusión de registros a nivel de transacción en un flujo de cambios

GoogleSQL

Para habilitar la exclusión de registros a nivel de transacción en un flujo de cambios, ejecuta lo siguiente:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (allow_txn_exclusion = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el ejemplo siguiente, la opción allow_txn_exclusion está habilitada en un flujo de cambios NewAllowedChangeStream:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET OPTIONS (allow_txn_exclusion = true)

De esta forma, el flujo de cambios puede excluir registros de transacciones de escritura específicas.

PostgreSQL

Para habilitar la exclusión de registros a nivel de transacción en un flujo de cambios, ejecuta lo siguiente:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (allow_txn_exclusion = true)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el ejemplo siguiente, la opción allow_txn_exclusion está habilitada en un flujo de cambios NewAllowedChangeStream:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET (allow_txn_exclusion = true)

De esta forma, el flujo de cambios puede excluir registros de transacciones de escritura específicas.

Inhabilitar la exclusión de registros a nivel de transacción en un flujo de cambios

GoogleSQL

Para inhabilitar la exclusión de registros a nivel de transacción en un flujo de cambios, ejecuta lo siguiente:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (allow_txn_exclusion = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el ejemplo siguiente, la opción allow_txn_exclusion está inhabilitada en un flujo de cambios llamado NewAllowedChangeStream:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (allow_txn_exclusion = false)

El flujo de cambios monitoriza todas las transacciones de escritura.

PostgreSQL

Para inhabilitar la exclusión de registros a nivel de transacción en un flujo de cambios, ejecuta lo siguiente:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (allow_txn_exclusion = false)

Haz los cambios siguientes:

  • CHANGE_STREAM_NAME: el nombre de tu flujo de cambios

En el ejemplo siguiente, la opción allow_txn_exclusion está inhabilitada en un flujo de cambios llamado NewAllowedChangeStream:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET (allow_txn_exclusion = false)

El flujo de cambios monitoriza todas las transacciones de escritura.

Suspender un flujo de cambios

Si quieres que un flujo de cambios detenga su actividad, pero conserve sus registros internos (al menos durante el periodo de conservación de datos), puedes modificarlo para que no monitorice nada.

Para ello, emite una instrucción DDL de tipo ALTER CHANGE STREAM que sustituya la definición del flujo de cambios por la frase especial DROP FOR ALL. Por ejemplo:

ALTER CHANGE STREAM MyStream DROP FOR ALL;

El flujo sigue existiendo en la base de datos, pero no monitoriza ningún objeto y no genera más registros de cambios de datos. Sus registros de cambios se mantienen intactos, sujetos a la política de conservación de datos del flujo.

Para reanudar una emisión suspendida, envía otra instrucción ALTER CHANGE STREAM con su configuración anterior.

Eliminar un flujo de cambios

Para eliminar de forma permanente un flujo de cambios, emite una DROP CHANGE STREAM que incluya el nombre del flujo:

DROP CHANGE STREAM NamesAndAlbums;

Spanner detiene inmediatamente el flujo, lo elimina del esquema de la base de datos y borra sus registros de cambios de datos.

Listar y ver flujos de cambios

La Google Cloud consola proporciona una interfaz web para enumerar y revisar las definiciones de los flujos de cambios de una base de datos. También puede ver la estructura de los flujos de cambios como sus instrucciones DDL equivalentes o consultando el esquema de información de la base de datos.

Ver los flujos de cambios con la consola Google Cloud

Para ver una lista de los flujos de cambios de una base de datos y consultar sus definiciones, sigue estos pasos:

  1. Ve a la página Instancias de Spanner de laGoogle Cloud consola.

    Abre la página de instancias.

  2. Vaya a la instancia y a la base de datos correspondientes.

  3. En el menú de navegación, haga clic en Cambios.

Se muestra una lista de todos los flujos de cambios de esa base de datos y se resume la configuración de cada uno. Al hacer clic en el nombre de un flujo, se muestran más detalles sobre las tablas y las columnas que monitoriza.

Ver definiciones de flujos de cambios como DDL

Ver el esquema de una base de datos como DDL incluye descripciones de todos sus streams de cambios, donde aparecen como instrucciones CREATE CHANGE STREAM.

  • Para hacerlo desde la consola, haz clic en el enlace Mostrar DDL equivalente en la página de la base de datos de la Google Cloud consola.

  • Para hacerlo desde la línea de comandos, usa el comando ddl describe de la CLI de Google Cloud.

Consultar el esquema de información sobre los flujos de cambios

Puedes consultar directamente el esquema de información de una base de datos sobre sus flujos de cambios. Las siguientes tablas contienen los metadatos que definen los nombres de los flujos de cambios, las tablas y las columnas que monitorizan, y sus periodos de conservación:

Prácticas recomendadas para los flujos de cambios

A continuación, se indican algunas prácticas recomendadas para configurar y gestionar los flujos de cambios.

Considera la posibilidad de usar una base de datos de metadatos independiente

Los flujos de cambios usan una base de datos de metadatos para mantener el estado interno. La base de datos de metadatos puede ser la misma que la base de datos que contiene los flujos de cambios o puede ser diferente. Te recomendamos que crees una base de datos independiente para almacenar los metadatos.

El conector de flujos de cambios de Spanner necesita permisos de lectura y escritura en la base de datos de metadatos. No es necesario que prepares esta base de datos con un esquema, ya que el conector se encarga de ello.

Si se usa una base de datos de metadatos independiente, se eliminan las complejidades que podrían surgir si se permite que el conector escriba directamente en la base de datos de la aplicación:

  • Al separar la base de datos de metadatos de la base de datos de producción con el flujo de cambios, el conector solo necesita permisos de lectura en la base de datos de producción.

  • Al restringir el tráfico del conector a una base de datos de metadatos independiente, las escrituras realizadas por el propio conector no se incluyen en los flujos de cambios de producción. Esto es especialmente importante en el caso de los flujos de cambios que monitorizan toda la base de datos.

Si no se usa ninguna base de datos independiente para almacenar los metadatos, le recomendamos que monitorice el impacto en la CPU del conector de flujos de cambios en sus instancias.

Comparar las nuevas secuencias de cambios y cambiar su tamaño si es necesario

Antes de añadir nuevos flujos de cambios a tu instancia de producción, te recomendamos que hagas una prueba de rendimiento de una carga de trabajo realista en una instancia de staging con los flujos de cambios habilitados. De esta forma, puedes determinar si necesitas añadir nodos a tu instancia para aumentar sus capacidades de computación y almacenamiento.

Ejecuta estas pruebas hasta que se estabilicen las métricas de CPU y almacenamiento. Lo ideal es que la utilización de la CPU de la instancia se mantenga por debajo de los máximos recomendados y que el uso del almacenamiento no supere el límite de almacenamiento de la instancia.

Usar diferentes regiones para el balanceo de carga

Cuando se usan flujos de cambios en una configuración de instancia multirregional, se recomienda ejecutar sus pipelines de procesamiento en una región distinta de la región principal predeterminada. De esta forma, la carga de streaming se distribuye entre las réplicas que no son principales. Sin embargo, si necesitas priorizar la latencia de streaming más baja posible en lugar del balanceo de carga, ejecuta la carga de streaming en la región principal.

Siguientes pasos