Bigtable Spark コネクタを使用する

Bigtable Spark コネクタを使用すると、Bigtable との間でデータを読み書きできます。Spark SQL と DataFrames を使用して Spark アプリケーションからデータを読み取ることができます。Bigtable Spark コネクタを使用すると、次の Bigtable オペレーションがサポートされます。

  • データを書き込む
  • データを読み取る
  • 新しいテーブルを作成する

このドキュメントでは、Spark SQL DataFrames テーブルを Bigtable テーブルに変換し、Spark ジョブを送信するための JAR ファイルをコンパイルして作成する方法を説明します。

Spark と Scala のサポート状況

Bigtable Spark コネクタは、Scala 2.12 バージョンと次の Spark バージョンのみをサポートしています。

Bigtable Spark コネクタは、次の Dataproc バージョンをサポートしています。

費用の計算

課金対象である次の Google Cloud コンポーネントのいずれかを使用する場合は、使用したリソースに対して課金されます。

  • Bigtable(Bigtable エミュレータの使用は無料)
  • Dataproc
  • Cloud Storage

Dataproc の料金は、Compute Engine クラスタでの Dataproc の使用に適用されます。Dataproc Serverless の料金は、Dataproc Serverless for Spark で実行されるワークロードとセッションに適用されます。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

始める前に

Bigtable Spark コネクタを使用する前に、次の前提条件を満たす必要があります。

必要なロール

Bigtable Spark コネクタの使用に必要な権限を取得するには、プロジェクトに対する次の IAM ロールの付与を管理者に依頼してください。

  • Bigtable 管理者(roles/bigtable.admin)(省略可): データを読み取りまたは書き込みして、新しいテーブルを作成できます。
  • Bigtable ユーザー(roles/bigtable.user): データを読み取りまたは書き込みできますが、新しいテーブルの作成はできません。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Dataproc または Cloud Storage を使用している場合は、追加の権限が必要になる場合があります。詳細については、Dataproc の権限Cloud Storage の権限をご覧ください。

Spark を設定する

Bigtable インスタンスを作成する以外に、Spark インスタンスも設定する必要があります。ローカルで行うか、次のいずれかのオプションを選択して、Dataproc で Spark を使用できます。

  • Dataproc クラスタ
  • Dataproc Serverless

Dataproc クラスタまたはサーバーレス オプションの選択の詳細については、Spark 用 Dataproc Serverless と Compute Engine 上の Dataproc の比較に関するドキュメントをご覧ください。

コネクタの JAR ファイルをダウンロードする

Bigtable Spark コネクタのソースコードと例は、Bigtable Spark コネクタの GitHub リポジトリにあります。

Spark の設定に基づいて、次のように JAR ファイルにアクセスできます。

  • PySpark をローカルで実行する場合は、gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage の場所からコネクタの JAR ファイルをダウンロードする必要があります。

    SCALA_VERSION は Scala バージョンに置き換え、サポートされている唯一のバージョンとして 2.12 に設定します。CONNECTOR_VERSION は、使用するコネクタ バージョンに置き換えます。

  • Dataproc クラスタまたはサーバーレス オプションの場合は、Scala または Java Spark アプリケーションに追加できるアーティファクトとして最新の JAR ファイルを使用します。JAR ファイルをアーティファクトとして使用する方法については、依存関係を管理するをご覧ください。

  • PySpark ジョブを Dataproc に送信する場合は、gcloud dataproc jobs submit pyspark --jars フラグを使用して、URI を Cloud Storage 内の JAR ファイルの場所(例: gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar)に設定します。

コンピューティング タイプを決定する

読み取り専用ジョブの場合は、Data Boost(プレビュー版)サーバーレス コンピューティングを使用できます。これにより、アプリケーション サービング クラスタへの影響を回避できます。Data Boost を使用するには、Spark アプリケーションで Spark コネクタのバージョン 1.1.0 以降を使用する必要があります。

Data Boost を使用するには、Data Boost アプリ プロファイルを作成し、Bigtable 構成を Spark アプリケーションに追加するときに spark.bigtable.app_profile.id Spark オプションのアプリ プロファイル ID を指定する必要があります。Spark 読み取りジョブのアプリ プロファイルをすでに作成していて、アプリケーション コードを変更せずに引き続き使用する場合、アプリ プロファイルを Data Boost アプリ プロファイルに変換できます。詳細については、アプリ プロファイルを変換するをご覧ください。

詳細については、Bigtable Data Boost の概要をご覧ください。

読み取りと書き込みを含むジョブの場合、リクエストで標準アプリ プロファイルを指定することで、インスタンスのクラスタノードをコンピューティングに使用できます。

使用するアプリ プロファイルを特定または作成する

アプリ プロファイル ID を指定しないと、コネクタはデフォルトのアプリ プロファイルを使用します。

Spark アプリケーションを含む、実行するアプリケーションごとに一意のアプリ プロファイルを使用することをおすすめします。アプリ プロファイルの種類と設定の詳細については、アプリ プロファイルの概要をご覧ください。手順については、アプリ プロファイルの作成と構成をご覧ください。

Spark アプリケーションに Bigtable 構成を追加する

Spark アプリケーションに Bigtable を操作するための Spark オプションを追加します。

サポートされている Spark オプション

com.google.cloud.spark.bigtable パッケージの一部として利用可能な Spark オプションを使用します。

オプション名 必須 デフォルト値 意味
spark.bigtable.project.id はい なし Bigtable プロジェクト ID を設定します。
spark.bigtable.instance.id はい なし Bigtable インスタンス ID を設定します。
catalog はい なし DataFrame の SQL に似たスキーマと Bigtable テーブルのスキーマの間の変換形式を指定する JSON 形式を設定します。

詳細については、JSON 形式でテーブル メタデータを作成するをご覧ください。
spark.bigtable.app_profile.id × default Bigtable アプリ プロファイル ID を設定します。
spark.bigtable.write.timestamp.milliseconds × 現在のシステム時刻 Bigtable に DataFrame を書き込むときに使用するタイムスタンプをミリ秒単位で設定します。

DataFrame 内のすべての行が同じタイムスタンプを使用するため、DataFrame に同じ行キー列を持つ行は、同じタイムスタンプを共有するため、Bigtable では単一のバージョンとして保持されることに注意してください。
spark.bigtable.create.new.table × false Bigtable に書き込む前に新しいテーブルを作成するには、true に設定します。
spark.bigtable.read.timerange.start.milliseconds または spark.bigtable.read.timerange.end.milliseconds × なし タイムスタンプ(エポックタイムからのミリ秒単位)を設定して、それぞれ特定の開始日と終了日を持つセルをフィルタします。
spark.bigtable.push.down.row.key.filters × true サーバーサイドで単純な行キーフィルタリングを許可するには、true に設定します。複合行キーのフィルタリングはクライアントサイドに実装されています。

詳しくは、フィルタを使用して特定の DataFrame 行を読み取るをご覧ください。
spark.bigtable.read.rows.attempt.timeout.milliseconds × 30 分 Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の読み取り試行のタイムアウト時間を設定します。
spark.bigtable.read.rows.total.timeout.milliseconds × 12 時間 Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の読み取り試行の合計タイムアウト時間を設定します。
spark.bigtable.mutate.rows.attempt.timeout.milliseconds × 1m Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の変更試行のタイムアウト時間を設定します。
spark.bigtable.mutate.rows.total.timeout.milliseconds × 10 分 Java 用 Bigtable クライアントの 1 つの DataFrame パーティションに対応する行の変更試行の合計タイムアウト時間を設定します。
spark.bigtable.batch.mutate.size × 100 各バッチのミューテーションの数を設定します。設定できる最大値は 100000 です。
spark.bigtable.enable.batch_mutate.flow_control × false バッチ ミューテーションのフロー制御を有効にするには、true に設定します。

テーブル メタデータを JSON 形式で作成する

Spark SQL DataFrames テーブル形式は、JSON 形式の文字列を使用して Bigtable テーブルに変換する必要があります。この文字列 JSON 形式により、データ形式が Bigtable 互換になります。.option("catalog", catalog_json_string) オプションを使用すると、アプリケーション コードで JSON 形式を渡すことができます。

たとえば、次の DataFrame テーブルとそれに対応する Bigtable テーブルについて考えてみましょう。

以下の例では、DataFrame の name 列と birthYear 列が info 列ファミリーの下にグループ化され、名前がそれぞれ namebirth_year に変更されます。同様に、address 列は同じ列名の location 列ファミリーの下に格納されます。DataFrame の id 列は Bigtable の行キーに変換されます。

Bigtable には行キーに専用の列名がありません。この例では、id_rowkey は、これが行キー列であることをコネクタに示すためだけに使用されます。行キー列には任意の名前を使用できます。また、JSON 形式で "rowkey":"column_name" フィールドを宣言するときは、同じ名前を使用してください。

DataFrame Bigtable テーブル = t1
行キー 列ファミリー
info 場所
id name birthYear address id_rowkey name birth_year address

カタログの JSON 形式は次のとおりです。

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 形式で使用されるキーと値は次のとおりです。

カタログキー カタログ値 JSON 形式
テーブル Bigtable テーブルの名前。 "table":{"name":"t1"}

テーブルが存在しない場合は、.option("spark.bigtable.create.new.table", "true") を使用してテーブルを作成します。
rowkey Bigtable の行キーとして使用される列の名前。DataFrame 列の列名が行キーとして使用されていることを確認します(例: id_rowkey)。

複合キーは行キーとしても受け入れられます。例: "rowkey":"name:address"このアプローチをとると、すべての読み取りリクエストでテーブル全体のスキャンが必要になる行キーが発生する可能性があります。
"rowkey":"id_rowkey"
各 DataFrame 列を対応する Bigtable 列ファミリー("cf")と列名("col")にマッピングします。この列名は、DataFrame テーブルの列名と異なる場合があります。サポートされているデータ型には、stringlongbinary があります。 "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

この例では、id_rowkey が行キー、infolocation が列ファミリーです。

サポートされるデータタイプ

コネクタは、カタログでの stringlongbinary(バイト配列)型の使用をサポートしています。intfloat などの他の型のサポートが追加されるまで、このようなデータ型は、コネクタを使用して Bigtable に書き込む前に、バイト配列(Spark SQL の BinaryType)に手動で変換できます。

さらに、Avro を使用して ArrayType などの複雑な型をシリアル化することもできます。詳細については、Apache Avro を使用して複雑なデータ型をシリアル化するをご覧ください。

Bigtable に書き込む

.write() 関数とサポートされているオプションを使用して、Bigtable にデータを書き込みます。

Java

GitHub リポジトリの次のコードは、Java と Maven を使用して Bigtable に書き込みます。

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub リポジトリの次のコードは、Python を使用して Bigtable に書き込みます。

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

Bigtable から読み取る

.read() 関数を使用して、テーブルが Bigtable に正常にインポートされたかどうかを確認します。

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

プロジェクトをコンパイルする

Dataproc クラスタ、Dataproc Serverless、またはローカル Spark インスタンスでジョブを実行するために使用される JAR ファイルを生成します。JAR ファイルをローカルでコンパイルしてから、それを使用してジョブを送信できます。ジョブを送信すると、コンパイル済みの JAR へのパスが PATH_TO_COMPILED_JAR 環境変数として設定されます。

この手順は PySpark アプリケーションには適用されません。

依存関係を管理する

Bigtable Spark コネクタは、次の依存関係管理ツールをサポートしています。

JAR ファイルをコンパイルする

Maven

  1. pom.xml ファイルに spark-bigtable 依存関係を追加します。

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Maven Shade プラグインpom.xml ファイルに追加して uber JAR を作成します。

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. mvn clean install コマンドを実行して JAR ファイルを生成します。

sbt

  1. spark-bigtable 依存関係を build.sbt ファイルに追加します。

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. project/plugins.sbt ファイルまたは project/assembly.sbt ファイルに sbt-assembly プラグインを追加して、Uber JAR ファイルを作成します。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. sbt clean assembly コマンドを実行して JAR ファイルを生成します。

Gradle

  1. spark-bigtable 依存関係を build.gradle ファイルに追加します。

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. build.gradle ファイルに Shadow プラグインを追加して、uber JAR ファイルを作成します。

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. 構成と JAR コンパイルの詳細については、Shadow プラグインのドキュメントをご覧ください。

ジョブの送信

Dataproc、Dataproc Serverless、またはローカル Spark インスタンスのいずれかを使用して Spark ジョブを送信し、アプリケーションを起動します。

ランタイム環境の設定

次の環境変数を設定します。

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

以下を置き換えます。

  • PROJECT_ID: Bigtable プロジェクトの永続的な識別子。
  • INSTANCE_ID: Bigtable インスタンスの永続的な識別子。
  • TABLE_NAME: テーブルの永続的な識別子。
  • DATAPROC_CLUSTER: Dataproc クラスタの永続的な識別子。
  • DATAPROC_REGION: Dataproc インスタンス内のクラスタのいずれかを含む Dataproc リージョン(例: northamerica-northeast2)。
  • DATAPROC_ZONE: Dataproc クラスタが実行されるゾーン。
  • SUBNET: サブネットの完全なリソースパス。
  • GCS_BUCKET_NAME: Spark ワークロードの依存関係をアップロードするための Cloud Storage バケット。
  • PATH_TO_COMPILED_JAR: コンパイルされた JAR へのフルパスまたは相対パス(Maven の場合は /path/to/project/root/target/<compiled_JAR_name>)。
  • GCS_PATH_TO_CONNECTOR_JAR: spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar ファイルが配置されている gs://spark-lib/bigtable Cloud Storage バケット。
  • PATH_TO_PYTHON_FILE: PySpark アプリケーションの場合、Bigtable に対するデータの書き込みと読み取りに使用される Python ファイルへのパス。
  • LOCAL_PATH_TO_CONNECTOR_JAR: PySpark アプリケーションの場合は、ダウンロードされた Bigtable Spark コネクタの JAR ファイルへのパス。

Spark ジョブの送信

Dataproc インスタンスまたはローカルの Spark 設定では、Spark ジョブを実行してデータを Bigtable にアップロードします。

Dataproc クラスタ

コンパイルされた JAR ファイルを使用して、Bigtable との間でデータの読み取りと書き込みを行う Dataproc クラスタジョブを作成します。

  1. Dataproc クラスタを作成します。次の例は、Debian 10、2 つのワーカーノード、デフォルト構成を使用して Dataproc v2.0 クラスタを作成するサンプル コマンドを示しています。

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. ジョブの送信

    Scala / Java

    次の例は、DataFrame でテストテーブルを作成し、テーブルを Bigtable に書き込み、テーブル内の単語数をカウントするロジックを含む spark.bigtable.example.WordCount クラスを示しています。

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

コンパイルされた JAR ファイルを使用して、Dataproc Serverless インスタンスで Bigtable との間でデータの読み取りと書き込みを行う Dataproc ジョブを作成します。

Scala / Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

ローカル Spark

ダウンロードした JAR ファイルを使用して、ローカルの Spark インスタンスで Bigtable との間でデータの読み取りと書き込みを行う Spark ジョブを作成します。Bigtable エミュレータを使用して Spark ジョブを送信することもできます。

Bigtable エミュレータを使用する

Bigtable エミュレータを使用する場合は、次の手順を行います。

  1. 次のコマンドを実行して、エミュレータを起動します。

    gcloud beta emulators bigtable start
    

    デフォルトでは、エミュレータは localhost:8086 を選択します。

  2. BIGTABLE_EMULATOR_HOST 環境変数を設定します。

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Spark ジョブを送信します。

Bigtable エミュレータの使用について詳しくは、エミュレータを使用してテストするをご覧ください。

Spark ジョブの送信

ローカル Bigtable エミュレータを使用しているかどうかにかかわらず、spark-submit コマンドを使用して Spark ジョブを送信します。

Scala / Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

テーブルデータを確認する

次の cbt CLI コマンドを実行して、データが Bigtable に書き込まれていることを確認します。cbt CLI は、Google Cloud CLI のコンポーネントです。詳細については、cbt CLI の概要をご覧ください。

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

追加ソリューション

複雑な Spark SQL 型のシリアル化、特定の行の読み取り、クライアントサイド指標の生成など、特定のソリューションに Bigtable Spark コネクタを使用します。

フィルタを使用して特定の DataFrame 行を読み取る

DataFrame を使用して Bigtable から読み取る場合は、特定の行のみを読み取るようにフィルタを指定できます。テーブル全体のスキャンを回避するために、行キー列の ==<=startsWith などの単純なフィルタはサーバー側で適用されます。複合行キーのフィルタや、行キー列の LIKE フィルタなどの複雑なフィルタは、クライアント側で適用されます。

大きいテーブルを読み取る場合は、テーブル全体のスキャンを実行しないように、シンプルな行キーフィルタを使用することをおすすめします。次のサンプル ステートメントは、シンプルなフィルタを使用して読み取る方法を示しています。Spark フィルタでは、行キーに変換された DataFrame 列の名前を使用していることを確認してください。

    dataframe.filter("id == 'some_id'").show()
  

フィルタを適用する際は、Bigtable テーブルの列名ではなく DataFrame 列名を使用します。

Apache Avro を使用して複雑なデータ型をシリアル化する

Bigtable Spark コネクタは、Apache Avro を使用した複雑な Spark SQL 型ArrayTypeMapTypeStructType など)のシリアル化のサポートを提供します。Apache Avro は、複雑なデータ構造の処理と保存によく使用されているレコードデータのデータシリアル化を提供します。

"avro":"avroSchema" などの構文を使用して、Bigtable 内の列を Avro でエンコードするように指定します。次に、Bigtable に対して読み取りまたは書き込みを行う際に .option("avroSchema", avroSchemaString) を使用すると、その列に対応する Avro スキーマを文字列形式で指定できます。異なるオプション名を使用できます。たとえば、異なる列に "anotherAvroSchema" を使用し、複数の列に Avro スキーマを渡すことができます。

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

クライアントサイドの指標を使用する

Bigtable Spark コネクタは Java 用 Bigtable クライアントをベースにしているため、クライアントサイドの指標はデフォルトでコネクタ内で有効になっています。これらの指標へのアクセスと解釈の詳細については、クライアントサイドの指標のドキュメントをご覧ください。

低レベルの RDD 関数で Java 用 Bigtable クライアントを使用する

Bigtable Spark コネクタは Java 用 Bigtable クライアントをベースにしているため、Spark アプリケーションでクライアントを直接使用して、mapPartitionsforeachPartition などのような低レベルの RDD 関数内で分散された読み取りまたは書き込みのリクエストを実行できます。

Java 用 Bigtable クライアントのクラスを使用するには、パッケージ名に com.google.cloud.spark.bigtable.repackaged 接頭辞を追加します。たとえば、クラス名は com.google.cloud.bigtable.data.v2.BigtableDataClient ではなく com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient を使用します。

Java 用 Bigtable クライアントの詳細については、Java 用 Bigtable クライアントをご覧ください。

次のステップ