Java で変更ストリームを読み取る

Java 用 Cloud Bigtable クライアント ライブラリには、データ変更レコードを処理する低レベルのメソッドが用意されています。ただし、ほとんどの場合、Dataflow がパーティション分割を処理し、結合するため、このページで説明されている方法を使用する代わりに、Dataflow で変更をストリーミングすることをおすすめします。

始める前に

Java で変更ストリームを読む前に、変更ストリームの概要を十分に理解してください。その後、次の前提条件を完了します。

認証の設定

ローカル開発環境でこのページの Java サンプルを使用するには、gcloud CLI をインストールして初期化し、ユーザー認証情報を使用してアプリケーションのデフォルト認証情報を設定します。

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

詳細については Set up authentication for a local development environment をご覧ください。

本番環境での認証の設定については、 Set up Application Default Credentials for code running on Google Cloudをご覧ください。

変更ストリームを有効にする

変更ストリームを読み取るには、まずテーブルで変更ストリームを有効にする必要があります。変更ストリームを有効にして新しいテーブルを作成することもできます。

必要なロール

Bigtable 変更ストリームの読み取りに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • 変更のストリーミング元となるテーブルを含む Bigtable インスタンスの Bigtable 管理者(roles/bigtable.admin

Java クライアント ライブラリを依存関係として追加する

Maven pom.xml ファイルに次のようなコードを追加します。VERSION は、使用しているクライアント ライブラリのバージョンに置き換えます。バージョンは 2.21.0 以降にする必要があります。

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

テーブルのパーティションを特定する

ReadChangeStream リクエストを開始するには、テーブルのパーティションを把握する必要があります。これは、GenerateInitialChangeStreamPartitions メソッドを使用して把握できます。次の例では、このメソッドを使用して、テーブル内の各パーティションを表す ByteStringRanges のストリームを取得する方法を示しています。各 ByteStringRange には、パーティションの開始キーと終了キーが含まれています。

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

各パーティションの変更を処理する

次に、ReadChangeStream メソッドを使用して、各パーティションの変更を処理します。次の例は、現在の時刻からパーティションのストリームを開始する方法を示しています。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery は、以下の引数を受け入れます。

  • ストリーム パーティション(必須)- 変更のストリーミング元となるパーティション
  • 次のいずれかです。
    • 開始時刻 - 変更の処理を開始する時点の commit タイムスタンプ
    • 継続トークン - ストリーミングを再開する位置を示すトークン
  • 終了時刻(省略可)- 上限に達したときに変更の処理を停止する時点の commit タイムスタンプ。値を指定しない場合、ストリームの読み取りが継続します。
  • ハートビートの期間(省略可)- 新しい変更がない場合のハートビート メッセージの頻度(デフォルトは 5 秒)

変更ストリームのレコード形式

返される変更ストリーム レコードは、次のいずれかのレスポンス タイプになります。

  • ChangeStreamMutation - データ変更レコードを表すメッセージ。

  • CloseStream - クライアントがストリームからの読み取りを停止する必要があることを示すメッセージ。

    • ステータス - ストリームの終了理由を示します。次のいずれか:
      • OK - 指定されたパーティションの終了時刻に達しました。
      • OUT_OF_RANGE - 指定されたパーティションはすでに存在しません。つまり、このパーティションでスプリットまたはマージが行われました。新しいパーティションに対して、新しい ReadChangeStream リクエストを送信する必要があります。
    • NewPartitions - 更新されたパーティションの情報を提供します(OUT_OF_RANGE レスポンスが返された場合)。
    • ChangeStreamContinuationTokens - 同じ位置から新しい ReadChangeStream リクエストを再開するために使用されるトークンのリスト。NewPartition につき 1 つ。
  • Heartbeat - ストリームの状態を確認できる情報を含む定期的なメッセージ。

    • EstimatedLowWatermark - 指定されたパーティションの低ウォーターマークの見積もり。
    • ContinuationToken - 現在の位置から指定パーティションのストリーミングを再開するためのトークン。

データ変更レコードの内容

変更ストリーム レコードの詳細については、データ変更レコードの内容をご覧ください。

パーティションでの変更を処理する

テーブルのパーティションが変更されると、ReadChangeStream リクエストは、新しいパーティションからのストリーミングを再開するために必要な情報を含む CloseStream メッセージを返します。

スプリットの場合は、複数の新しいパーティションと、各パーティションの対応する ContinuationToken が含まれます。同じ位置から新しいパーティションのストリーミングを再開するには、対応するトークンを持つ新しいパーティションごとに新しい ReadChangeStream リクエストを行います。

たとえば、パーティション [A,C) をストリーミングしているときに、このパーティションが [A,B)[B,C) の 2 つのパーティションに分割された場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

同じ位置から各パーティションのストリーミングを再開するには、次の ReadChangeStreamQuery リクエストを送信します。

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

マージの場合、同じパーティションから再開するには、マージされたパーティションの各トークンを含む新しい ReadChangeStream リクエストを送信する必要があります。

たとえば、2 つのパーティション([A,B)[B,C))をストリーミングしているときに、これらのパーティションが [A,C) マージされた場合、イベント シーケンスは次のようになります。

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

同じ位置からパーティション [A, C) のストリーミングを再開するには、次のような ReadChangeStreamQuery を送信します。

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

次のステップ