Cómo leer un flujo de cambios con Java

La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel para procesar registros de cambios de datos. Sin embargo, en la mayoría casos, recomendamos que transmites los cambios con Dataflow en vez de usar los métodos descritos en esta página, ya que Dataflow controla divisiones de partición y se combina automáticamente.

Antes de comenzar

Antes de leer un flujo de cambios con Java, asegúrate de conocer el Descripción general de los flujos de cambios. Luego, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Para obtener más información, consulta Set up authentication for a local development environment.

Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Habilita un flujo de cambios

Debes habilitar un cambio transmitir sobre una mesa antes de que puedas leerlos. También puedes crear un nuevo tabla con un flujo de cambios habilitado.

Roles obligatorios

A fin de obtener los permisos que necesitas para leer un cambio de Bigtable Pídele a tu administrador que te otorgue el siguiente rol de IAM.

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla que planeas usar transmitir cambios de

Agrega la biblioteca cliente de Java como dependencia

Agrega un código similar al siguiente al archivo pom.xml de Maven. Reemplazar VERSION por la versión de la biblioteca cliente que usas usan. La versión debe ser 2.21.0 o posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determina las particiones de la tabla

Para comenzar a realizar solicitudes ReadChangeStream, debes conocer las particiones de tu tabla. Esto se puede determinar con el GenerateInitialChangeStreamPartitions. En el siguiente ejemplo, se muestra cómo usar este método para obtener un flujo de ByteStringRanges que representan cada partición de la tabla. Cada ByteStringRange contiene la clave de inicio y finalización de una partición.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Cambios del proceso para cada partición

Luego, puedes procesar los cambios para cada partición con el ReadChangeStream . Este es un ejemplo de cómo abrir una transmisión para una partición, a partir del desde la hora actual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery acepta los siguientes argumentos:

  • Partición de transmisión (obligatorio): La partición desde la que se transmiten los cambios
  • Uno de los siguientes:
    • Hora de inicio: Es la marca de tiempo de confirmación para comenzar a procesar los cambios a partir de ella.
    • Tokens de continuación: tokens que representan una posición para reanudar transmitiendo desde
  • Hora de finalización (opcional): Confirma una marca de tiempo para dejar de procesar cambios cuando alcanzada. Si no proporcionas un valor, la transmisión continúa leyendo.
  • Duración de la señal de monitoreo de funcionamiento (opcional): Es la frecuencia de los mensajes de señal de monitoreo de funcionamiento cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos)

Cambia el formato del registro de la transmisión

Un registro de flujo de cambios que se muestra es uno de los tres tipos de respuesta:

  • ChangeStreamMutation: Es un mensaje que representa un registro de cambios en los datos.

  • CloseStream: Es un mensaje que indica que el cliente debe dejar de leer. de la transmisión.

    • Estado: Indica el motivo por el que se cerró la transmisión. Uno de los siguientes:
      • OK: Se alcanzó la hora de finalización para la partición especificada.
      • OUT_OF_RANGE: La partición determinada ya no existe, lo que significa que se produjeron divisiones o combinaciones en esta partición. Un nuevo Se deberá crear una solicitud ReadChangeStream para cada partición nueva.
    • NewPartitions: Proporciona la información actualizada de la partición sobre OUT_OF_RANGE respuestas.
    • ChangeStreamContinuationTokens: Lista de tokens que se usan para reanudar solicitudes nuevas de ReadChangeStream de la misma posición. Uno por NewPartition.
  • Heartbeat: Es un mensaje periódico con información que se puede usar para control del estado de la transmisión.

    • EstimatedLowWatermark: estimación de la marca de agua baja del partición determinada
    • ContinuationToken: Es el token para reanudar la transmisión del valor especificado. a partir de la posición actual.

Contenido del registro de cambios de datos

Para obtener información sobre los registros de flujos de cambios, consulta ¿Qué incluye un cambio de datos? registro.

Controla los cambios en las particiones

Cuando cambian las particiones de una tabla, las solicitudes de ReadChangeStream muestran un mensaje CloseStream con la información necesaria para reanudar la transmisión desde las particiones nuevas.

Para una división, esto contendrá varias particiones nuevas y un ContinuationToken para cada partición. Sigue estos pasos para reanudar la transmisión de las particiones nuevas: desde la misma posición, realizas una nueva solicitud ReadChangeStream para cada partición nueva con su token correspondiente.

Por ejemplo, si transmites la partición [A,C) y se divide en dos particiones, [A,B) y [B,C), puedes esperar la siguiente secuencia eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para reanudar la transmisión de cada partición desde la misma posición, envía el siguientes ReadChangeStreamQuery solicitudes:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Para una combinación, para reanudar desde la misma partición, debes enviar una nueva solicitud ReadChangeStream que contenga cada token de las particiones combinadas.

Por ejemplo, si transmites dos particiones, [A,B) y [B,C), y ambas fusionar en la partición [A,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para reanudar la partición de transmisión [A, C) desde la misma posición, envía un ReadChangeStreamQuery de la siguiente manera:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

¿Qué sigue?