创建和管理变更数据流

本页面介绍了如何为 GoogleSQL 方言数据库和 PostgreSQL 方言数据库创建、修改和查看 Spanner 变更数据流。如需详细了解变更数据流,请参阅变更数据流简介

由于变更数据流是架构对象,因此您可以通过用于任何其他类型的数据库定义工作(例如创建表或添加索引)的相同 DDL 驱动架构更新来创建和管理它们。

在您提交架构更改 DDL 语句(包括用于创建、更改或删除变更数据流的语句)后,Spanner 会开始执行长时间运行的操作。新的或已更改的变更数据流会在完成此长时间运行的操作后,开始监控其新配置指定的列或表。

创建变更流

如需创建变更数据流,您需要提供其名称以及它所监控的架构对象:整个数据库或特定表和列的列表。您可以选择使用以下任一项配置变更数据流:

GoogleSQL

使用 GoogleSQL 创建变更数据流的 DDL 语法如下所示:

CREATE CHANGE STREAM CHANGE_STREAM_NAME
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    OPTIONS (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean,
      allow_txn_exclusion = boolean
    )
  ]

PostgreSQL

使用 PostgreSQL 创建变更数据流的 DDL 语法如下所示:

CREATE CHANGE STREAM CHANGE_STREAM_NAME
  [FOR column_or_table_watching_definition[, ... ] ]
  [
    WITH (
      retention_period = timespan,
      value_capture_type = type,
      exclude_ttl_deletes = boolean,
      exclude_insert = boolean,
      exclude_update = boolean,
      exclude_delete = boolean,
      allow_txn_exclusion = boolean
    )
  ]

新的变更数据流会在创建它的长时间运行操作完成后立即开始监控其已分配的架构对象。

以下示例展示了如何使用各种配置创建变更数据流。

监控整个数据库

如需创建一个变更数据流来监控在整个数据库的表中执行的每项数据更改,请使用 ALL 关键字:

CREATE CHANGE STREAM EverythingStream
FOR ALL;

ALL 配置会隐式包含数据库的所有未来数据表和列,只要它们一创建,就会包含在内。它不包含视图、信息架构表或除常规数据表之外的其他对象。

监控特定表

如需将变更数据流的范围限制为特定表(而不是整个数据库),请指定一个或多个表的列表:

CREATE CHANGE STREAM SingerAlbumStream
FOR Singers, Albums;

Spanner 会自动更新监控整个表的变更数据流,以反映影响这些表的任何架构更改(例如添加或删除列)。

监控特定列

使用 table(column_1[, column_2, ...]) 语法可监控您命名的表中一个或多个特定非键列的更改:

CREATE CHANGE STREAM NamesAndTitles
FOR Singers(FirstName, LastName), Albums(Title);

您无法在此处指定主键列,因为每个变更数据流始终会跟踪其所监控的每个表的主键。这样,每个数据更改记录都可以通过主键来标识已更改的行。

监控单个数据流中的表和列

您可以在单个变更数据流中组合前面两个示例中的表监控和列监控语法:

CREATE CHANGE STREAM NamesAndAlbums
FOR Singers(FirstName, LastName), Albums;

指定较长的保留期限

如需指定变更数据流数据保留期限(超过默认的一天),请将 retention_period 设置为最长一周的期限(以小时 (h) 或天 (d) 表示)。

两个示例:

GoogleSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
OPTIONS ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
OPTIONS ( retention_period = '7d' );

PostgreSQL

CREATE CHANGE STREAM LongerDataRetention
FOR ALL
WITH ( retention_period = '36h' );
CREATE CHANGE STREAM MaximumDataRetention
FOR ALL
WITH ( retention_period = '7d' );

指定其他值捕获类型

如需指定除 OLD_AND_NEW_VALUES 以外的变更数据流值捕获类型,请将 value_capture_type 设置为 NEW_VALUESNEW_ROW,如以下示例所示:

GoogleSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

CREATE CHANGE STREAM NewRowChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_ROW' );
CREATE CHANGE STREAM NewValuesChangeStream
FOR ALL
WITH ( value_capture_type = 'NEW_VALUES' );

过滤基于 TTL 的删除操作

您可以使用 exclude_ttl_deletes 过滤条件从变更数据流的范围中过滤基于 TTL 的删除操作

如需详细了解此过滤条件的工作原理,请参阅基于存留时间的删除过滤条件

GoogleSQL

如需使用基于 TTL 的删除过滤条件创建变更数据流,请运行以下示例:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

以下示例创建了一个名为 NewFilterChangeStream 的变更数据流,该流会排除所有基于 TTL 的删除操作:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_ttl_deletes = true)

PostgreSQL

如需使用基于 TTL 的删除过滤条件创建变更数据流,请运行以下示例:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

以下示例创建了一个名为 NewFilterChangeStream 的变更数据流,该流会排除所有基于 TTL 的删除操作:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_ttl_deletes = true)

如需从现有变更数据流中添加或移除基于 TTL 的删除过滤条件,请参阅修改基于 TTL 的删除过滤条件。您可以通过以 DDL 形式查看变更数据流的定义来确认变更数据流过滤条件。

按表修改类型过滤

使用以下可用过滤选项从变更数据流的范围中过滤一个或多个此类表修改:

  • exclude_insert:排除所有 INSERT 表修改
  • exclude_update:排除所有 UPDATE 表修改
  • exclude_delete:排除所有 DELETE 表修改

如需详细了解这些过滤条件的运作方式,请参阅表修改类型过滤条件

GoogleSQL

如需创建包含一个或多个表修改类型过滤条件的变更数据流,请运行以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

以下示例创建了一个名为 NewFilterChangeStream 的变更数据流,该流会排除 INSERTUPDATE 表修改类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
OPTIONS (exclude_insert = true, exclude_update = true)

PostgreSQL

如需创建包含一个或多个表修改类型过滤条件的变更数据流,请运行以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称
  • MOD_TYPE_FILTER_NAME:您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

以下示例创建了一个名为 NewFilterChangeStream 的变更数据流,该流会排除 INSERTUPDATE 表修改类型:

CREATE CHANGE STREAM NewFilterChangeStream FOR ALL
WITH (exclude_insert = true, exclude_update = true)

如需在现有变更数据流中添加或从中移除表修改类型过滤条件,请参阅按表修改类型修改过滤条件。您可以将变更数据流的定义视为 DDL,以确认变更数据流存在哪些表修改类型过滤条件。

启用事务级记录排除

您可以通过在创建变更数据流时设置 allow_txn_exclusion 选项,或者通过修改现有变更数据流,让变更数据流排除指定写入事务中的记录。

如需详细了解此选项的工作原理,请参阅事务级记录排除

GoogleSQL

如需创建可排除指定写入事务中的记录的变更数据流,请运行以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
OPTIONS (allow_txn_exclusion = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

以下示例创建了一个名为 NewChangeStream 的变更数据流,该变更数据流可以排除指定写入事务中的记录:

CREATE CHANGE STREAM NewChangeStream FOR ALL
OPTIONS (allow_txn_exclusion = true)

PostgreSQL

如需创建可排除指定写入事务中的记录的变更数据流,请运行以下命令:

CREATE CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
WITH (allow_txn_exclusion = true)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

以下示例创建了一个名为 NewChangeStream 的变更数据流,该流可以排除指定写入事务中的记录:

CREATE CHANGE STREAM NewChangeStream FOR ALL
WITH (allow_txn_exclusion = true)

如需启用或停用现有变更数据流中的事务级记录排除功能,请参阅修改事务级记录排除功能。如需检查此选项的设置,请参阅以 DDL 形式查看变更数据流的定义

指定要从变更数据流中排除的写入事务

如需指定要从变更数据流中排除的写入事务,您必须将 exclude_txn_from_change_streams 参数设为 true。以下代码示例展示了如何使用客户端库指定要从变更数据流中排除的写入事务。

Go


import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/spanner"
)

// readWriteTxnExcludedFromChangeStreams executes the insert and update DMLs on
// Singers table excluded from tracking change streams with ddl option
// allow_txn_exclusion = true.
func readWriteTxnExcludedFromChangeStreams(w io.Writer, db string) error {
	// db = `projects/<project>/instances/<instance-id>/database/<database-id>`
	ctx := context.Background()
	client, err := spanner.NewClient(ctx, db)
	if err != nil {
		return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.NewClient: %w", err)
	}
	defer client.Close()

	_, err = client.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
		stmt := spanner.Statement{
			SQL: `INSERT Singers (SingerId, FirstName, LastName)
					VALUES (111, 'Virginia', 'Watson')`,
		}
		_, err := txn.Update(ctx, stmt)
		if err != nil {
			return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.Update: %w", err)
		}
		fmt.Fprintln(w, "New singer inserted.")
		stmt = spanner.Statement{
			SQL: `UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111`,
		}
		_, err = txn.Update(ctx, stmt)
		if err != nil {
			return fmt.Errorf("readWriteTxnExcludedFromChangeStreams.Update: %w", err)
		}
		fmt.Fprint(w, "Singer first name updated.")
		return nil
	}, spanner.TransactionOptions{ExcludeTxnFromChangeStreams: true})
	if err != nil {
		return err
	}
	return nil
}

Java

static void readWriteTxnExcludedFromChangeStreams(DatabaseClient client) {
  // Exclude the transaction from allowed tracking change streams with alloww_txn_exclusion=true.
  // This exclusion will be applied to all the individual operations inside this transaction.
  client
      .readWriteTransaction(Options.excludeTxnFromChangeStreams())
      .run(
          transaction -> {
            transaction.executeUpdate(
                Statement.of(
                    "INSERT Singers (SingerId, FirstName, LastName)\n"
                        + "VALUES (1341, 'Virginia', 'Watson')"));
            System.out.println("New singer inserted.");

            transaction.executeUpdate(
                Statement.of("UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111"));
            System.out.println("Singer first name updated.");

            return null;
          });
}

修改变更数据流

如需修改变更数据流的配置,请使用 ALTER CHANGE STREAM DDL 语句。它使用与 CREATE CHANGE STREAM 类似的语法。您可以更改数据流监控的列,或更改其数据保留期限的长度。您也可以完全暂停对其进行监控,同时保留其数据更改记录。

修改变更数据流监控的内容

以下示例将整个 Songs 表添加到之前配置的 NamesAndAlbums 变更数据流中:

ALTER CHANGE STREAM NamesAndAlbums
SET FOR Singers(FirstName, LastName), Albums, Songs;

在更新数据库架构中变更数据流的定义的长时间运行操作完成后,Spanner 会将命名变更数据流的行为替换为新配置。

修改变更数据流的数据保留期限

如需修改变更数据流保留其内部记录的时长,请在 ALTER CHANGE STREAM DDL 语句中设置 retention_period

以下示例会将数据保留期限调整为之前创建的 NamesAndAlbums 变更数据流:

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( retention_period = '36h' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( retention_period = '36h' );

修改变更数据流的值捕获类型

如需修改变更数据流的值捕获类型,请在 ALTER CHANGE STREAM DDL 语句中设置 value_capture_type 子句。

此示例将值捕获类型调整为 NEW_VALUES

GoogleSQL

ALTER CHANGE STREAM NamesAndAlbums
SET OPTIONS ( value_capture_type = 'NEW_VALUES' );

PostgreSQL

ALTER CHANGE STREAM NamesAndAlbums
SET ( value_capture_type = 'NEW_VALUES' );

修改基于 TTL 的删除过滤条件

如需修改变更数据流的基于 TTL 的删除过滤条件,请在 ALTER CHANGE STREAM DDL 语句中设置 exclude_ttl_deletes 过滤条件。您可以使用此方法将过滤条件添加到现有变更数据流中或从中移除过滤条件。

如需详细了解这些过滤条件的工作原理,请参阅基于存留时间的删除过滤条件

将基于 TTL 的删除过滤条件添加到现有变更数据流

GoogleSQL

如需将基于 TTL 的删除过滤条件添加到现有变更数据流,请运行以下命令以将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件已添加到名为 NewFilterChangeStream 的现有变更数据流中,该流会排除所有基于 TTL 的删除操作:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = true)

这会从变更数据流中排除所有未来基于 TTL 的删除操作。

PostgreSQL

如需将基于 TTL 的删除过滤条件添加到现有变更数据流,请运行以下命令以将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称

在以下示例中,exclude_ttl_deletes 过滤条件已添加到名为 NewFilterChangeStream 的现有变更数据流中,该流会排除所有基于 TTL 的删除操作:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = true)

这会从变更数据流中排除所有未来基于 TTL 的删除操作。

从现有变更数据流中移除基于 TTL 的删除过滤条件

GoogleSQL

如需从现有变更数据流移除基于 TTL 的删除过滤条件,请运行以下命令以将过滤条件设置为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

在以下示例中,从名为 NewFilterChangeStream 的现有变更数据流中移除了 exclude_ttl_deletes 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (exclude_ttl_deletes = false)

这包括对变更数据流进行的所有未来基于 TTL 的删除操作。

您还可以将过滤条件设置为 null,以移除基于 TTL 的删除过滤条件。

PostgreSQL

如需从现有变更数据流移除基于 TTL 的删除过滤条件,请运行以下命令以将过滤条件设置为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (exclude_ttl_deletes = false)

替换以下内容:

  • CHANGE_STREAM_NAME:新变更数据流的名称

在以下示例中,从名为 NewFilterChangeStream 的现有变更数据流中移除了 exclude_ttl_deletes 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET (exclude_ttl_deletes = false)

这包括对变更数据流进行的所有未来基于 TTL 的删除操作。

您还可以将过滤条件设置为 null,以移除基于 TTL 的删除过滤条件。

按表修改类型修改过滤条件

如需修改变更数据流的表修改类型过滤条件,请在 ALTER CHANGE STREAM DDL 语句中设置过滤条件类型。您可以使用此方法为变更数据流添加新过滤条件,或从变更数据流中移除现有过滤条件。

向现有变更数据流添加表修改类型过滤条件

GoogleSQL

如需向现有变更数据流添加一个或多个新的表修改类型过滤条件,请运行以下命令以将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

在以下示例中,exclude_delete 过滤条件已添加到名为 NewFilterChangeStream 的现有变更数据流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = true)

PostgreSQL

如需向现有变更数据流添加一个或多个新的表修改类型过滤条件,请运行以下命令以将过滤条件设置为 true

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = true)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要添加的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次添加多个过滤条件,请用英文逗号分隔每个过滤条件。

在以下示例中,exclude_delete 过滤条件已添加到名为 NewFilterChangeStream 的现有变更数据流中:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = true)

从现有变更数据流中移除表修改类型过滤条件

GoogleSQL

如需移除变更数据流中的一个或多个现有表修改类型过滤条件,请运行以下命令以将过滤条件设置为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET OPTIONS (MOD_TYPE_FILTER_NAME = false)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要移除的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次移除多个过滤条件,请用英文逗号分隔每项过滤条件。

在以下示例中,从名为 NewFilterChangeStream 的现有变更数据流中移除了 exclude_delete 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream
SET OPTIONS (exclude_delete = false)

您还可以通过将过滤条件重置为默认值来移除表修改过滤条件。为此,请将过滤条件值设置为 null

PostgreSQL

如需移除变更数据流中的一个或多个现有表修改类型过滤条件,请运行以下命令以将过滤条件设置为 false

ALTER CHANGE STREAM CHANGE_STREAM_NAME
SET (MOD_TYPE_FILTER_NAME = false)

替换以下内容:

  • CHANGE_STREAM_NAME:替换为您现有变更数据流的名称
  • MOD_TYPE_FILTER_NAME:替换为您要移除的过滤条件:exclude_insertexclude_updateexclude_delete。如果要一次移除多个过滤条件,请用英文逗号分隔每项过滤条件。

在以下示例中,从名为 NewFilterChangeStream 的现有变更数据流中移除了 exclude_delete 过滤条件:

ALTER CHANGE STREAM NewFilterChangeStream
SET (exclude_delete = false)

您还可以通过将过滤条件重置为默认值来移除表修改过滤条件。为此,请将过滤条件值设置为 null

修改变更数据流以允许排除事务级记录

您可以修改变更数据流,使其能够排除指定写入事务中的记录。为此,请在 ALTER CHANGE STREAM DDL 语句中将 allow_txn_exclusion 选项设置为 true。如果您未设置此选项,或者将其设置为 false,则变更数据流会监控所有写入事务。

如需详细了解此选项的工作原理,请参阅事务级记录排除

为现有变更数据流启用事务级记录排除功能

GoogleSQL

如需为现有变更数据流启用事务级记录排除,请运行以下命令:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (allow_txn_exclusion = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称

在以下示例中,allow_txn_exclusion 选项已在现有变更数据流 NewAllowedChangeStream 上启用:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET OPTIONS (allow_txn_exclusion = true)

这样,变更数据流就可以从指定的写入事务中排除记录。

PostgreSQL

如需为现有变更数据流启用事务级记录排除,请运行以下命令:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (allow_txn_exclusion = true)

替换以下内容:

  • CHANGE_STREAM_NAME:现有变更数据流的名称

在以下示例中,allow_txn_exclusion 选项已在现有变更数据流 NewAllowedChangeStream 上启用:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET (allow_txn_exclusion = true)

这样,变更数据流就可以从指定的写入事务中排除记录。

为现有变更数据流停用事务级记录排除

GoogleSQL

如需在现有变更数据流中停用事务级记录排除,请运行以下命令:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET OPTIONS (allow_txn_exclusion = false)

替换以下内容:

  • CHANGE_STREAM_NAME:变更数据流的名称

在以下示例中,已在名为 NewAllowedChangeStream 的现有变更数据流上停用 allow_txn_exclusion 选项:

ALTER CHANGE STREAM NewFilterChangeStream FOR ALL
SET OPTIONS (allow_txn_exclusion = false)

变更数据流会监控所有写入事务。

PostgreSQL

如需在现有变更数据流中停用事务级记录排除,请运行以下命令:

ALTER CHANGE STREAM CHANGE_STREAM_NAME FOR ALL
SET (allow_txn_exclusion = false)

替换以下内容:

  • CHANGE_STREAM_NAME:变更数据流的名称

在以下示例中,已在名为 NewAllowedChangeStream 的现有变更数据流上停用 allow_txn_exclusion 选项:

ALTER CHANGE STREAM NewAllowedChangeStream FOR ALL
SET (allow_txn_exclusion = false)

变更数据流会监控所有写入事务。

暂停变更数据流

如果您希望变更数据流停止其活动,但保留其内部记录(至少在其数据保留期内),则可以对其进行更改,使其不监控任何内容。

为此,请发出 ALTER CHANGE STREAM DDL 语句,将变更数据流的定义替换为特殊字词 DROP FOR ALL。例如:

ALTER CHANGE STREAM MyStream DROP FOR ALL;

该数据流会继续存在于数据库中,但不会监控任何对象,也不会生成任何进一步的数据更改记录。其现有的更改记录保持不变,并受数据流的数据保留政策约束。

如需恢复已暂停的流,请使用其先前的配置发出另一个 ALTER CHANGE STREAM 语句。

删除变更数据流

如需永久删除变更数据流,请发出包含流名称的 DROP CHANGE STREAM 语句:

DROP CHANGE STREAM NamesAndAlbums;

Spanner 会立即停止该数据流,将其从数据库的架构中移除,并删除其数据更改记录。

列出和查看变更数据流

Google Cloud 控制台提供了一个网页界面,用于列出和查看数据库的变更数据流定义。您还可以将变更数据流的结构视为等效的 DDL 语句,或者通过查询数据库的信息架构来查看变更数据流的结构。

使用 Google Cloud 控制台查看变更数据流

如需查看数据库的变更数据流列表并查看其定义,请执行以下操作:

  1. 访问 Google Cloud 控制台的 Spanner 实例页面。

    打开实例页面

  2. 前往相应实例和数据库。

  3. 点击导航菜单中的更改数据流

这会显示该数据库的所有变更数据流的列表,并汇总每个变更数据流的配置。点击某个数据流的名称,即可显示其所监控的表和列的详细信息。

以 DDL 形式查看变更数据流的定义

以 DDL 形式查看数据库的架构时,会包含对其所有变更数据流的说明,这些变更数据流会显示为 CREATE CHANGE STREAM 语句。

  • 如需在控制台中执行此操作,请点击 Google Cloud 控制台中数据库页面上的显示等效 DDL 链接。

  • 如需通过命令行执行此操作,请使用 Google Cloud CLI 的 ddl describe 命令

查询有关变更数据流的信息架构

您可以直接查询数据库的变更数据流的信息架构。以下表格包含用于定义更改数据流名称、它们所监控的表和列以及保留期限的元数据:

变更数据流最佳实践

以下是一些有关配置和管理变更数据流的最佳实践。

考虑使用单独的元数据库

变更数据流使用元数据数据库来维护内部状态。元数据数据库可以与包含变更数据流的数据库相同,也可以不同。我们建议您为元数据存储创建单独的数据库。

Spanner 变更数据流连接器需要对元数据数据库拥有读写权限。您无需使用架构来准备此数据库;连接器会负责此操作。

使用单独的元数据库可以避免允许连接器直接写入应用数据库时可能出现的复杂情况:

  • 通过使用变更数据流将元数据库与生产数据库分开,连接器只需要对生产数据库具有读取权限。

  • 通过将连接器的流量限制到单独的元数据库,生产环境变更数据流中不包含由连接器本身执行的写入操作。这对于监控整个数据库的变更数据流尤其相关。

如果没有单独的数据库用于存储元数据,我们建议您监控变更数据流连接器对其实例的 CPU 影响。

对新的变更数据流进行基准测试,并根据需要调整大小

在将新的变更数据流添加到生产实例之前,请考虑在启用了变更数据流的预演实例上对真实工作负载进行基准化测试。这样,您就可以确定是否需要向实例添加节点,以增加其计算和存储容量。

运行这些测试,直到 CPU 和存储指标稳定。理想情况下,实例的 CPU 利用率应保持在建议的最大值以下,其存储空间用量不应超过实例的存储空间上限。

使用不同的区域进行负载均衡

多区域实例配置中使用变更数据流时,请考虑在与默认主要区域不同的区域运行其处理流水线。这有助于在非领导副本之间分摊流媒体负载。不过,如果您需要优先考虑尽可能降低流式传输延迟时间,而不是负载均衡,请在领导者区域运行流式传输负载。

后续步骤