使用 Java 读取变更数据流

Java 版 Cloud Bigtable 客户端库提供处理数据更改记录的初级方法。不过,在大多数情况下,我们建议您使用 Dataflow 流式传输更改,而不是使用本页面中介绍的方法,因为 Dataflow 会为您处理分区拆分和合并。

准备工作

在使用 Java 读取变更数据流之前,请务必熟悉变更数据流概览。然后,满足以下前提条件。

设置身份验证

如需在本地开发环境中使用本页面上的 Java 示例,请安装并初始化 gcloud CLI,然后使用您的用户凭据设置应用默认凭据。

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

如需了解详情,请参阅 Set up authentication for a local development environment

如需了解如何为生产环境设置身份验证,请参阅 Set up Application Default Credentials for code running on Google Cloud

启用变更数据流

您必须先在表上启用变更数据流,然后才能读取数据流。您还可以创建启用了变更数据流的新表

所需的角色

如需获得读取 Bigtable 变更数据流所需的权限,请让您的管理员授予您以下 IAM 角色。

  • 针对 Bigtable 实例(包含您计划从中流式传输更改的表)的 Bigtable Administrator (roles/bigtable.admin)

将 Java 客户端库添加为依赖项

将类似于以下示例的代码添加到您的 Maven pom.xml 文件中。将 VERSION 替换为您正在使用的客户端库的版本。版本必须为 2.21.0 或更高版本。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

确定表的分区

如需开始发出 ReadChangeStream 请求,您需要知道表的分区。您可以使用 GenerateInitialChangeStreamPartitions 方法确定。以下示例展示了如何使用此方法获取表示表中每个分区的 ByteStringRanges 流。每个 ByteStringRange 都包含分区的开始和结束键。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

处理每个分区的更改

然后,您可以使用 ReadChangeStream 方法处理每个分区的更改。以下示例演示了如何从当前时间开始打开分区的数据流。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery 接受以下参数:

  • 数据流分区(必需)- 从中流式传输更改的分区
  • 下列其中一项:
    • 开始时间 - 提交开始处理更改的时间戳
    • 连续令牌 - 表示要继续流式传输的位置的令牌
  • 结束时间(可选)- 提交在到达时停止处理更改的时间戳。如果您不提供此值,数据流就会继续读取。
  • 检测信号时长(可选)- 没有新更改时的检测信号消息的频率(默认为 5 秒)

变更数据流记录格式

返回的变更数据流记录是以下三种响应类型之一:

  • ChangeStreamMutation - 表示数据更改记录的消息。

  • CloseStream - 表示客户端应停止从数据流读取的消息。

    • 状态 - 表示关闭数据流的原因。以下状态之一:
      • OK - 已达到指定分区的结束时间
      • OUT_OF_RANGE - 指定分区不复存在,表示此分区上发生了拆分或合并。需要为每个新分区创建一个新的 ReadChangeStream 请求。
    • NewPartitions - 提供有关 OUT_OF_RANGE 响应的分区信息更新。
    • ChangeStreamContinuationTokens - 用于从同一位置恢复新 ReadChangeStream 请求的令牌列表。每个 NewPartition 一个。
  • Heartbeat - 包含可用于检查数据流状态的检查点的定期消息。

    • EstimatedLowWatermark - 指定分区的低水印估计值
    • ContinuationToken - 用于从当前位置继续流式传输指定分区的令牌。

数据更改记录内容

如需了解变更数据流记录,请参阅数据更改记录中包含的内容

处理分区中的更改

当表分区发生更改时,ReadChangeStream 请求会返回一条 CloseStream 消息,其中包含从新分区继续流式传输所需的信息。

发生拆分时,将包含多个新分区以及每个分区的相应 ContinuationToken。如需继续从同一位置流式传输新分区,请使用相应令牌对每个新分区发出新的 ReadChangeStream 请求。

例如,如果您要流式传输分区 [A,C) 并将其拆分为 [A,B)[B,C) 两个分区,则会出现以下事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

若要继续从同一位置流式传输每个分区,请发送以下 ReadChangeStreamQuery 请求:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

发生合并时,如需从同一分区继续,您需要发送包含合并分区中每个令牌的新 ReadChangeStream 请求。

例如,如果您要流式传输 [A,B)[B,C) 两个分区,并且它们合并到分区 [A,C) 中,则会出现以下事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

如需从同一位置继续流式传输分区 [A, C),请按如下所示发送 ReadChangeStreamQuery

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

后续步骤