Conector Beam de HBase de Bigtable

Para ayudarte a usar Bigtable en una pipeline de Dataflow, tienes a tu disposición dos conectores de E/S de Bigtable Beam de código abierto.

Si vas a migrar de HBase a Bigtable o tu aplicación llama a la API de HBase, usa el conector de Bigtable HBase Beam (CloudBigtableIO) que se describe en esta página.

En el resto de los casos, debes usar el conector de Beam de Bigtable (BigtableIO) junto con el cliente de Cloud Bigtable para Java, que funciona con las APIs de Cloud Bigtable. Para empezar a usar ese conector, consulta el artículo Conector de Bigtable para Beam.

Para obtener más información sobre el modelo de programación de Apache Beam, consulta la documentación de Beam.

Empezar a usar HBase

El conector de Bigtable HBase Beam está escrito en Java y se basa en el cliente de HBase de Bigtable para Java. Es compatible con el SDK de Dataflow 2.x para Java, que se basa en Apache Beam. El código fuente del conector se encuentra en GitHub, en el repositorio googleapis/java-bigtable-hbase.

En esta página se ofrece una descripción general de cómo usar las transformaciones Read y Write.

Configurar la autenticación

Para usar las Java muestras de esta página en un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, a continuación, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

    Instala Google Cloud CLI.

    Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Para obtener más información, consulta Set up authentication for a local development environment.

Para obtener información sobre cómo configurar la autenticación en un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Añadir el conector a un proyecto de Maven

Para añadir el conector Bigtable HBase Beam a un proyecto de Maven, añade el artefacto de Maven a tu archivo pom.xml como dependencia:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Especificar la configuración de Bigtable

Crea una interfaz de opciones para permitir entradas que ejecuten tu flujo de procesamiento:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}

Cuando lees o escribes en Bigtable, debes proporcionar un objeto de configuración CloudBigtableConfiguration. Este objeto especifica el ID del proyecto y el ID de la instancia de la tabla, así como el nombre de la tabla:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

Para leer, proporciona un objeto de configuración CloudBigtableScanConfiguration, que te permite especificar un objeto Scan de Apache HBase que limita y filtra los resultados de una lectura. Consulta más información sobre lectura de Bigtable.

Leer desde Bigtable

Para leer datos de una tabla de Bigtable, aplica una transformación Read al resultado de una operación CloudBigtableIO.read. La transformación Read devuelve un PCollection de objetos Result de HBase, donde cada elemento de PCollection representa una sola fila de la tabla.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

De forma predeterminada, una operación CloudBigtableIO.read devuelve todas las filas de la tabla. Puedes usar un objeto Scan de HBase para limitar la lectura a un intervalo de claves de fila de tu tabla o para aplicar filtros a los resultados de la lectura. Para usar un objeto Scan, inclúyelo en tu CloudBigtableScanConfiguration.

Por ejemplo, puedes añadir una Scan que devuelva solo el primer par clave-valor de cada fila de la tabla, lo que resulta útil para contar el número de filas de la tabla:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Escribir en Bigtable

Para escribir en una tabla de Bigtable, debes realizar una operación applya CloudBigtableIO.writeToTable. Deberás realizar esta operación en un PCollection de objetos HBase Mutation, que pueden incluir objetos Put y Delete.

La tabla de Bigtable ya debe existir y debe tener definidas las familias de columnas adecuadas. El conector Dataflow no crea tablas ni familias de columnas sobre la marcha. Puedes usar la CLI cbt para crear una tabla y configurar familias de columnas, o bien puedes hacerlo de forma programática.

Antes de escribir en Bigtable, debes crear tu flujo de procesamiento de Dataflow para que las inserciones y las eliminaciones se puedan serializar en la red:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

Por lo general, tendrás que realizar una transformación, como un ParDo, para dar formato a tus datos de salida en una colección de objetos Put o Delete de HBase. En el siguiente ejemplo se muestra una transformación DoFn que toma el valor actual y lo usa como clave de fila de un Put. Después, puedes escribir los objetos Put en Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

Para habilitar el control de flujo de escritura por lotes, asigna el valor true a BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL. Esta función limita automáticamente la velocidad del tráfico de las solicitudes de escritura por lotes y permite que el autoescalado de Bigtable añada o quite nodos automáticamente para gestionar tu tarea de Dataflow.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Aquí tienes el ejemplo de escritura completo, incluida la variación que permite el control del flujo de escritura por lotes.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}