Leggere un flusso di modifiche con Java

La libreria client di Cloud Bigtable per Java fornisce metodi di basso livello per l'elaborazione dei record delle modifiche dei dati. Tuttavia, nella maggior parte dei casi, ti consigliamo di eseguire lo streaming delle modifiche con Dataflow anziché utilizzare i metodi descritti in questa pagina, perché Dataflow gestisce le suddivisioni e le unioni delle partizioni per te.

Prima di iniziare

Prima di leggere un stream di variazioni con Java, assicurati di conoscere la panoramica degli stream di variazioni. Poi completa i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli Java esempi in questa pagina in un ambiente di sviluppo locale, installa e inizializza l'interfaccia alla gcloud CLI, quindi configura le Credenziali predefinite dell'applicazione con le tue credenziali utente.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Per ulteriori informazioni, consulta Set up authentication for a local development environment.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, consulta Set up Application Default Credentials for code running on Google Cloud.

Attivare un flusso di modifiche

Per poter leggere una tabella, devi attivare un flusso di modifiche su di essa. Puoi anche creare una nuova tabella con uno stream di modifiche abilitato.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere un stream di variazioni Bigtable, chiedi all'amministratore di concederti il seguente ruolo IAM.

  • Amministratore Bigtable (roles/bigtable.admin) nell'istanza Bigtable contenente la tabella da cui vuoi eseguire lo streaming delle modifiche

Aggiungi la libreria client Java come dipendenza

Aggiungi codice simile al seguente al file Maven pom.xml. Sostituisci VERSION con la versione della libreria client in uso. La versione deve essere 2.21.0 o successiva.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determina le partizioni della tabella

Per iniziare a effettuare richieste ReadChangeStream, devi conoscere le partizioni della tabella. Questo valore può essere determinato utilizzando il metodo GenerateInitialChangeStreamPartitions. L'esempio seguente mostra come utilizzare questo metodo per ottenere uno stream di ByteStringRanges che rappresenta ogni partizione della tabella. Ogni ByteStringRange contiene la chiave di inizio e di fine di una partizione.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Elabora le modifiche per ogni partizione

Puoi quindi elaborare le modifiche per ogni partizione utilizzando il ReadChangeStream metodo. Questo è un esempio di come aprire uno stream per una partizione, a partire dall'ora corrente.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accetta i seguenti argomenti:

  • Partizione stream (obbligatoria): la partizione da cui eseguire lo streaming delle modifiche
  • Uno dei seguenti:
    • Ora di inizio: timestamp del commit da cui iniziare l'elaborazione delle modifiche
    • Token di continuazione: token che rappresentano una posizione da cui riprendere lo streaming
  • Ora fine (facoltativo): timestamp del commit per interrompere l'elaborazione delle modifiche al suo raggiungimento. Se non fornisci un valore, lo stream continua a leggere.
  • Durata dell'intervallo (facoltativo) - Frequenza dei messaggi di intervallo quando non ci sono nuove modifiche (il valore predefinito è cinque secondi)

Modificare il formato del record dello stream

Un record dello stream di modifiche restituito è uno dei tre tipi di risposta:

  • ChangeStreamMutation: un messaggio che rappresenta un record di modifica dei dati.

  • CloseStream: un messaggio che indica che il client deve interrompere la lettura dallo stream.

    • Stato: indica il motivo della chiusura dello stream. Uno dei seguenti:
      • OK: è stata raggiunta l'ora di fine per la partizione specificata
      • OUT_OF_RANGE: la partizione specificata non esiste più, il che significa che sono state eseguite suddivisioni o unioni in questa partizione. Per ogni nuova partizione dovrà essere creata una nuova richiesta ReadChangeStream.
    • NewPartitions: fornisce le informazioni aggiornate sulla suddivisione delle risposteOUT_OF_RANGE.
    • ChangeStreamContinuationTokens: elenco di token utilizzati per riprendere le nuove richieste ReadChangeStream dalla stessa posizione. Una per NewPartition.
  • Heartbeat: un messaggio periodico con informazioni che possono essere utilizzate per eseguire il checkpoint dello stato dello stream.

    • EstimatedLowWatermark - Stima della filigrana bassa per la partizione specificata
    • ContinuationToken: token per riprendere lo streaming della partizione specificata dalla posizione corrente.

Contenuti dei record di modifica dei dati

Per informazioni sui record dello stream di modifiche, vedi Che cosa contiene un record di variazione dei dati.

Gestire le modifiche nelle partizioni

Quando le partizioni di una tabella cambiano, le richieste ReadChangeStream resistono un messaggio CloseStream con le informazioni necessarie per riprendere lo streaming dalle nuove partizioni.

Per una divisione, conterrà più nuove partizioni e un corrispondente ContinuationToken per ogni partizione. Per riprendere lo streaming delle nuove parti dalla stessa posizione, invia una nuova richiesta ReadChangeStream per ogni nuova partizione con il token corrispondente.

Ad esempio, se stai eseguendo lo streaming della partizione [A,C) e questa si suddivide in due partizioni, [A,B) e [B,C), puoi aspettarti la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Per riprendere lo streaming di ogni partizione dalla stessa posizione, invia le seguenti richieste ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Per un'unione, per riprendere dalla stessa partizione, devi inviare una nuova richiestaReadChangeStream contenente ogni token delle partizioni unite.

Ad esempio, se stai eseguendo lo streaming di due partizioni, [A,B) e [B,C), che si fondono nella partizione [A,C), puoi prevedere la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Per riprendere lo streaming della partizione [A, C) dalla stessa posizione, invia un messaggio ReadChangeStreamQuery come il seguente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Passaggi successivi