使用 Java 讀取變更串流

Java 適用的 Cloud Bigtable 用戶端程式庫提供處理資料變更記錄的低階方法。不過,在大多數情況下,我們建議您使用 Dataflow 串流處理變更,而非使用本頁所述方法,因為 Dataflow 會為您處理分割區的分割和合併作業。

事前準備

使用 Java 讀取變更串流之前,請務必先熟悉變更串流總覽。然後完成下列必要條件。

設定驗證方法

如要在本機開發環境中使用本頁的 Java 範例,請安裝並初始化 gcloud CLI,然後使用使用者憑證設定應用程式預設憑證。

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

詳情請參閱 Set up authentication for a local development environment

如要瞭解如何設定正式版環境的驗證作業,請參閱 Set up Application Default Credentials for code running on Google Cloud

啟用變更串流

您必須先在資料表上啟用變更串流,才能讀取資料表。您也可以建立新資料表,並啟用變更串流。

必要的角色

如要取得讀取 Bigtable 變更串流所需的權限,請管理員授予下列 IAM 角色。

  • Bigtable 系統管理員 (roles/bigtable.admin) 須位於包含您要串流變更的資料表所在的 Bigtable 執行個體中

將 Java 用戶端程式庫新增為依附元件

在 Maven pom.xml 檔案中新增類似下列的程式碼。將 VERSION 替換為您使用的用戶端程式庫版本。版本必須為 2.21.0 以上。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

判斷資料表的分區

如要開始發出 ReadChangeStream 要求,您必須知道資料表的分區。您可以使用 GenerateInitialChangeStreamPartitions 方法判斷這項資訊。以下範例說明如何使用這個方法取得 ByteStringRanges 串流,代表資料表中的每個分區。每個 ByteStringRange 都包含分區的開始和結束鍵。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

處理每個分區的變更

然後使用 ReadChangeStream 方法處理每個分割區的變更。以下範例說明如何從目前時間開始,開啟分區的串流。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery 接受下列引數:

  • 串流分區 (必要) - 要從中串流變更的分區
  • 下列其中一項:
    • 開始時間 - 開始處理變更的提交時間戳記
    • 接續符記 - 代表要從哪個位置繼續串流的符記
  • 結束時間 (選用) - 達到此時間時,系統會停止處理變更的修訂時間戳記。如未提供值,串流會繼續讀取。
  • 心跳時間長度 (選用) - 沒有新變化時的心跳訊息頻率 (預設為五秒)

變更串流錄製格式

傳回的變更串流記錄是下列三種回應類型之一:

  • ChangeStreamMutation:代表資料變更記錄的訊息。

  • CloseStream:表示用戶端應停止從串流讀取資料的訊息。

    • 狀態 - 顯示關閉串流的原因。下列其中一個值:
      • OK - 已達指定分區的結束時間
      • OUT_OF_RANGE - 指定的分區已不存在,表示該分區已分割或合併。每個新分割區都必須建立新的 ReadChangeStream 要求。
    • NewPartitions:提供 OUT_OF_RANGE 回應的最新分區資訊。
    • ChangeStreamContinuationTokens - 用於從相同位置繼續提出新 ReadChangeStream 要求的符記清單。每個 NewPartition 一個。
  • Heartbeat - 定期傳送的訊息,內含可用於檢查串流狀態的資訊。

    • EstimatedLowWatermark - 估算指定分區的低水位線
    • ContinuationToken:從目前位置繼續串流指定分割區的權杖。

資料變更記錄內容

如要瞭解變更串流記錄,請參閱「資料變更記錄的內容」。

處理分區異動

資料表的分區變更時,ReadChangeStream 請求會傳回 CloseStream 訊息,其中包含從新分區繼續串流所需的資訊。

如果是分割,這會包含多個新分區,以及每個分區對應的 ContinuationToken。如要從相同位置繼續串流新分割區,請為每個新分割區發出新的 ReadChangeStream 要求,並附上對應的權杖。

舉例來說,如果您正在串流分區 [A,C),而該分區一分為二,變成 [A,B)[B,C),則預期會發生下列事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

如要從傳送下列 ReadChangeStreamQuery 要求時的位置繼續串流每個分割區,請執行下列操作:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

如要合併,並從相同分割區繼續作業,您需要傳送新的 ReadChangeStream 要求,其中包含合併分割區的每個權杖。

舉例來說,如果您串流兩個分區 ([A,B)[B,C)),並將其合併至分區 [A,C),則預期會發生下列事件序列:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

如要從相同位置繼續串流傳輸分割區 [A, C),請傳送類似下列內容的 ReadChangeStreamQuery

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

後續步驟