Transmite cambios con Dataflow

El conector de Bigtable Beam te permite usar Dataflow para leer los registros de cambios de datos de Bigtable sin necesidad de hacer un seguimiento ni procesar cambios de partición en tu código, ya que el conector controla esa lógica por ti.

En este documento, se describe cómo configurar y usar el conector de Beam de Bigtable para leer un flujo de cambios con una canalización de Dataflow. Antes de leer este documento, debes leer la descripción general de los flujos de cambios y familiarizarte con Dataflow.

Alternativas para crear tu propia canalización

Si no quieres compilar tu propia canalización de Dataflow, puedes usar una de las siguientes opciones.

Puedes usar una plantilla de Dataflow proporcionada por Google.

También puedes usar los ejemplos de código del instructivo o la guía de Bigtable como punto de partida para tu código.

Asegúrate de que el código que generes use la versión 26.14.0 o una posterior de google cloud libraries-bom.

Detalles del conector

El método del conector de Beam de Bigtable, BigtableIO.readChangeStream, te permite leer un flujo de registros de cambios de datos (ChangeStreamMutation) que puedes procesar. El conector de Bigtable Beam es un componente del repositorio de GitHub de Apache Beam. Para obtener una descripción del código del conector, consulta los comentarios en BigtableIO.java.

Debes usar el conector con la versión 2.48.0 de Beam o una posterior. Consulta la compatibilidad con el entorno de ejecución de Apache Beam para asegurarte de usar una versión compatible de Java. Luego, puedes implementar una canalización que use el conector para Dataflow, que controla el aprovisionamiento y la administración de recursos, y ayuda con la escalabilidad y la confiabilidad del procesamiento de datos de transmisión.

Para obtener más información sobre el modelo de programación de Apache Beam, consulta la documentación de Beam.

Cómo agrupar datos sin horas de eventos

Los registros de cambios de datos que se transmiten con el conector de Beam de Bigtable no son compatibles con las funciones de Dataflow que dependen de los tiempos de los eventos.

Como se explica en Replicación y marcas de agua, es posible que una marca de agua baja no avance si la replicación de la partición no alcanzó al resto de la instancia. Cuando una marca de agua baja deja de avanzar, puede hacer que se detenga el flujo de cambios.

Para evitar que la transmisión se detenga, el conector de Beam de Bigtable genera todos los datos con una marca de tiempo de salida de cero. La marca de tiempo cero hace que Dataflow considere que todos los registros de cambios de datos son datos tardíos. Como resultado, las funciones de Dataflow que dependen de los tiempos de eventos no son compatibles con los flujos de cambios de Bigtable. Específicamente, no puedes usar funciones de ventanas, activadores de tiempo de evento ni temporizadores de tiempo de evento.

En su lugar, puedes usar GlobalWindows con activadores de tiempo que no sean de eventos para agrupar estos datos tardíos en paneles, como se muestra en el ejemplo del instructivo. Para obtener detalles sobre los activadores y los paneles, consulta Activadores en la guía de programación de Beam.

Ajuste de escala automático

El conector admite el ajuste de escala automático de Dataflow, que se habilita de forma predeterminada cuando se usa Runner v2 (obligatorio). El algoritmo de ajuste de escala automático de Dataflow tiene en cuenta el retraso estimado del flujo de cambios, que se puede supervisar en la página Supervisión de Dataflow, en la sección Backlog. Usa la marca --maxNumWorkers cuando implementes un trabajo para limitar la cantidad de trabajadores.

Para escalar tu canalización de forma manual en lugar de usar el ajuste de escala automático, consulta Escala una canalización de transmisión de forma manual.

Limitaciones

Ten en cuenta las siguientes limitaciones antes de usar el conector de Bigtable Beam con Dataflow.

Dataflow Runner v2

El conector solo se puede ejecutar con Dataflow Runner v2. Para habilitar esto, especifica --experiments=use_runner_v2 en tus argumentos de línea de comandos. Ejecutar con Runner v1 hace que tu canalización falle con la siguiente excepción:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Instantáneas

El conector no admite instantáneas de Dataflow.

Duplicados

El conector de Bigtable Beam transmite cambios para cada clave de fila y cada clúster en orden de marca de tiempo de confirmación, pero, como a veces se reinicia desde momentos anteriores en la transmisión, puede producir duplicados.

Antes de comenzar

Antes de usar el conector, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java de esta página en un entorno de desarrollo local, instala e inicializa gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Install the Google Cloud CLI.
  2. To initialize the gcloud CLI, run the following command:

    gcloud init
  3. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

Para obtener más información, consulta Set up authentication for a local development environment.

Si quieres obtener información sobre cómo configurar la autenticación para un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Habilita un flujo de cambios

Debes habilitar un flujo de cambios en una tabla para poder leerla. También puedes crear una tabla nueva con las flujos de cambios habilitadas.

Cambia la tabla de metadatos del flujo

Cuando transmites cambios con Dataflow, el conector de Bigtable Beam crea una tabla de metadatos que se llama __change_stream_md_table de forma predeterminada. La tabla de metadatos del flujo de cambios administra el estado operativo del conector y almacena metadatos sobre los registros de cambios de datos.

De forma predeterminada, el conector crea la tabla en la misma instancia que la tabla que se transmite. Para garantizar que la tabla funcione correctamente, el perfil de app de la tabla de metadatos debe usar el enrutamiento de un solo clúster y tener habilitadas las transacciones de fila única.

Para obtener más información sobre la transmisión de cambios desde Bigtable con el conector de Bigtable Beam, consulta la documentación de BigtableIO.

Roles obligatorios

Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable con Dataflow, pídele a tu administrador que te otorgue los siguientes roles de IAM.

Para leer los cambios de Bigtable, necesitas este rol:

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla de la que deseas transmitir cambios

Para ejecutar el trabajo de Dataflow, necesitas los siguientes roles:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Agrega el conector de Bigtable Beam como dependencia

Agrega código similar a la siguiente dependencia a tu archivo pom.xml de Maven. La versión debe ser 2.48.0 o posterior.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Cómo leer el flujo de cambios

Para compilar una canalización de Dataflow que lea tus registros de cambios de datos, debes configurar el conector y, luego, agregar transformaciones y destinos. Luego, usa el conector para leer objetos ChangeStreamMutation en una canalización de Beam.

En los ejemplos de código de esta sección, escritos en Java, se muestra cómo compilar una canalización y usarla para convertir pares clave-valor en una cadena. Cada par consta de una clave de fila y un objeto ChangeStreamMutation. La canalización convierte las entradas de cada objeto en una cadena separada por comas.

Compila la canalización

En esta muestra de código Java, se muestra cómo compilar la canalización:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Procesa los registros de cambios de datos

En este ejemplo, se muestra cómo recorrer todas las entradas de un registro de cambios de datos para una fila y llamar a un método de conversión a cadena según el tipo de entrada.

Para obtener una lista de los tipos de entradas que puede contener un registro de cambios de datos, consulta Qué contiene un registro de cambios de datos.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

En este ejemplo, se convierte una entrada de write:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

En este ejemplo, se convierte una entrada de eliminación de celdas:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

En este ejemplo, se convierte una entrada de borrado de una familia de columnas:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Supervisar

Los siguientes recursos de la consola de Google Cloud te permiten supervisar tus recursos de Google Cloud mientras ejecutas una canalización de Dataflow para leer un flujo de cambios de Bigtable:

En particular, verifica las siguientes métricas:

  • En la página Supervisión de Bigtable, verifica estas métricas:
    • Datos de uso de CPU por flujos de cambios en la métrica cpu_load_by_app_profile_by_method_by_table. Muestra el impacto del flujo de cambios en el uso de la CPU de tu clúster.
    • Uso del almacenamiento del flujo de cambios (bytes) (change_stream_log_used_bytes).
  • En la página de supervisión de Dataflow, verifica la actualización de los datos, que muestra la diferencia entre la hora actual y la marca de agua. Debería ser de alrededor de dos minutos, con picos ocasionales de un minuto o dos más. Si la métrica de actualización de datos es constantemente superior a ese umbral, es probable que tu canalización tenga pocos recursos y que debas agregar más trabajadores de Dataflow. La actualización de los datos no indica si los registros de cambios de datos se procesan lentamente.
  • La métrica processing_delay_from_commit_timestamp_MEAN de Dataflow puede indicarte el tiempo de procesamiento promedio de los registros de cambios de datos durante el ciclo de vida del trabajo.

La métrica server/latencies de Bigtable no es útil cuando supervisas una canalización de Dataflow que lee un flujo de cambios de Bigtable, ya que refleja la duración de la solicitud de transmisión, no la latencia de procesamiento del registro de cambios de datos. Una latencia alta en un flujo de cambios no significa que las solicitudes se procesen con lentitud, sino que la conexión estuvo abierta durante ese tiempo.

¿Qué sigue?