Bigtable HBase Beam connector

To help you use Bigtable in a Dataflow pipeline, two open source Bigtable Beam I/O connectors are available.

If you're migrating from HBase to Bigtable, or if your application calls the HBase API, use the Bigtable HBase Beam connector (CloudBigtableIO) discussed on this page.

In all other cases, you should use the Bigtable Beam connector (BigtableIO) in conjunction with the Cloud Bigtable client for Java, which works with the Cloud Bigtable API. To get started with that connector, see Bigtable Beam connector.

For more information about the Apache Beam programming model, see the Beam documentation.

Get started with HBase

The Bigtable HBase Beam connector is written in Java and is built on the Bigtable HBase client for Java. It's compatible with the Dataflow SDK 2.x for Java, which is based on Apache Beam. The connector's source code is on GitHub in the repository googleapis/java-bigtable-hbase.

This page provides an overview of how to use Read and Write transforms.

Set up authentication

To use the Java samples on this page in a local development environment, install and initialize the gcloud CLI, and then set up Application Default Credentials with your user credentials.

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init

  4. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

For more information, see Set up authentication for a local development environment.

To learn how to set up authentication for a production environment, see Set up Application Default Credentials for code running on Google Cloud.

Add the connector to a Maven project

To add the Bigtable HBase Beam connector to a Maven project, add the Maven artifact as a dependency to your pom.xml file:

<dependency>
  <groupId>com.google.cloud.bigtable</groupId>
  <artifactId>bigtable-hbase-beam</artifactId>
  <version>2.12.0</version>
</dependency>

Specify the Bigtable configuration

Create an options interface that allows inputs for running your pipeline:

public interface BigtableOptions extends DataflowPipelineOptions {

  @Description("The Bigtable project ID, this can be different than your Dataflow project")
  @Default.String("bigtable-project")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Bigtable instance ID")
  @Default.String("bigtable-instance")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Bigtable table ID in the instance.")
  @Default.String("mobile-time-series")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);
}
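
With this interface in place, Beam's PipelineOptionsFactory maps each getter to a command-line flag of the same name, so a run of the pipeline can supply the values as arguments. For example (the values shown here are hypothetical):

--bigtableProjectId=my-project \
--bigtableInstanceId=my-instance \
--bigtableTableId=mobile-time-series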

When you read from or write to Bigtable, you must provide a CloudBigtableConfiguration configuration object. This object specifies the project ID and instance ID for your table, as well as the name of the table itself:

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .build();

To read data, provide a CloudBigtableScanConfiguration configuration object that specifies an Apache HBase Scan object to limit and filter the results of the read. For details, see Reading from Bigtable.

Reading from Bigtable

To read from a Bigtable table, you apply a Read transform to the result of a CloudBigtableIO.read operation. The Read transform returns a PCollection of HBase Result objects, where each element in the PCollection represents a single row in the table.

p.apply(Read.from(CloudBigtableIO.read(config)))
    .apply(
        ParDo.of(
            new DoFn<Result, Void>() {
              @ProcessElement
              public void processElement(@Element Result row, OutputReceiver<Void> out) {
                System.out.println(Bytes.toString(row.getRow()));
              }
            }));

By default, a CloudBigtableIO.read operation returns all of the rows in your table. You can use an HBase Scan object to limit the read to a range of row keys within your table, or to apply filters to the results of the read. To use a Scan object, include it in your CloudBigtableScanConfiguration.
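
For instance, to restrict a read to a contiguous range of row keys, you could configure the Scan as follows (a minimal sketch; the start and stop keys shown are hypothetical):

Scan scan = new Scan();
// The start row is inclusive and the stop row is exclusive.
scan.setStartRow(Bytes.toBytes("phone#4c410523#20190501"));
scan.setStopRow(Bytes.toBytes("phone#4c410523#20190502"));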

You can also add a Scan that returns only the first key-value pair from each row in your table, which is useful when counting the rows in the table:

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldRead {
  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withScan(scan)
            .build();

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(
            ParDo.of(
                new DoFn<Result, Void>() {
                  @ProcessElement
                  public void processElement(@Element Result row, OutputReceiver<Void> out) {
                    System.out.println(Bytes.toString(row.getRow()));
                  }
                }));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {
    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }
}

Writing to Bigtable

To write to a Bigtable table, you apply a CloudBigtableIO.writeToTable operation. You'll need to perform this operation on a PCollection of HBase Mutation objects, which can include Put and Delete objects.
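
For instance, a Delete that removes an entire row can be output from a DoFn in the same way as a Put (a minimal sketch; the row key is hypothetical):

// Emit a whole-row deletion into the PCollection of Mutation objects.
Delete row = new Delete(Bytes.toBytes("phone#4c410523#20190501"));
out.output(row);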

The Bigtable table must already exist and must have the appropriate column families defined. The Dataflow connector can't create tables or column families on the fly. You can use the cbt CLI to create a table and set up its column families, or you can do this programmatically.
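
With the cbt CLI, for example, you can run cbt createtable mobile-time-series followed by cbt createfamily mobile-time-series stats_summary. To create the table programmatically instead, one option is the HBase Admin API through the Bigtable HBase client, as in this minimal sketch (it assumes the table and column family names used by the samples on this page, and that the bigtable-hbase client classes are on your classpath):

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;

public class CreateTableExample {
  public static void main(String[] args) throws Exception {
    // Connect to Bigtable through the HBase-compatible client.
    try (Connection connection =
            BigtableConfiguration.connect("bigtable-project", "bigtable-instance");
        Admin admin = connection.getAdmin()) {
      // Define the table with the column family that the write samples expect.
      HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("mobile-time-series"));
      descriptor.addFamily(new HColumnDescriptor("stats_summary"));
      admin.createTable(descriptor);
    }
  }
}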

Before you write to Bigtable, you must create your Dataflow pipeline so that puts and deletes can be serialized over the network:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

In general, you'll need to perform a transform, such as a ParDo, to format your output data into a collection of HBase Put or Delete objects. The following example shows a DoFn transform that takes the current value and uses it as the row key for a Put. You can then write the Put objects to Bigtable.

p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
    .apply(
        ParDo.of(
            new DoFn<String, Mutation>() {
              @ProcessElement
              public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                long timestamp = System.currentTimeMillis();
                Put row = new Put(Bytes.toBytes(rowkey));

                row.addColumn(
                    Bytes.toBytes("stats_summary"),
                    Bytes.toBytes("os_build"),
                    timestamp,
                    Bytes.toBytes("android"));
                out.output(row);
              }
            }))
    .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

To enable batch write flow control, set BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL to true. This feature automatically rate-limits traffic for batch write requests and lets Bigtable autoscaling automatically add or remove nodes to handle your Dataflow job.

CloudBigtableTableConfiguration bigtableTableConfig =
    new CloudBigtableTableConfiguration.Builder()
        .withProjectId(options.getBigtableProjectId())
        .withInstanceId(options.getBigtableInstanceId())
        .withTableId(options.getBigtableTableId())
        .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
            "true")
        .build();
return bigtableTableConfig;

Here is the complete write example, including the variation that enables batch write flow control.


import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HelloWorldWrite {

  public static void main(String[] args) {
    BigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
    Pipeline p = Pipeline.create(options);

    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .build();

    p.apply(Create.of("phone#4c410523#20190501", "phone#4c410523#20190502"))
        .apply(
            ParDo.of(
                new DoFn<String, Mutation>() {
                  @ProcessElement
                  public void processElement(@Element String rowkey, OutputReceiver<Mutation> out) {
                    long timestamp = System.currentTimeMillis();
                    Put row = new Put(Bytes.toBytes(rowkey));

                    row.addColumn(
                        Bytes.toBytes("stats_summary"),
                        Bytes.toBytes("os_build"),
                        timestamp,
                        Bytes.toBytes("android"));
                    out.output(row);
                  }
                }))
        .apply(CloudBigtableIO.writeToTable(bigtableTableConfig));

    p.run().waitUntilFinish();
  }

  public interface BigtableOptions extends DataflowPipelineOptions {

    @Description("The Bigtable project ID, this can be different than your Dataflow project")
    @Default.String("bigtable-project")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Bigtable instance ID")
    @Default.String("bigtable-instance")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Bigtable table ID in the instance.")
    @Default.String("mobile-time-series")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);
  }

  public static CloudBigtableTableConfiguration batchWriteFlowControlExample(
      BigtableOptions options) {
    CloudBigtableTableConfiguration bigtableTableConfig =
        new CloudBigtableTableConfiguration.Builder()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withConfiguration(BigtableOptionsFactory.BIGTABLE_ENABLE_BULK_MUTATION_FLOW_CONTROL,
                "true")
            .build();
    return bigtableTableConfig;
  }
}