使用 Bigtable Spark 連接器

您可以使用 Bigtable Spark 連接器,在 Bigtable 中讀取和寫入資料。您可以使用 Spark SQL 和 DataFrame 從 Spark 應用程式讀取資料。Bigtable Spark 連接器支援下列 Bigtable 作業:

  • 寫入資料
  • 讀取資料
  • 建立新的資料表

本文說明如何將 Spark SQL DataFrames 資料表轉換為 Bigtable 資料表,然後編譯及建立 JAR 檔案,以提交 Spark 工作。

Spark 和 Scala 支援狀態

Bigtable Spark 連接器支援下列 Scala 版本:

Bigtable Spark 連接器支援下列 Spark 版本:

Bigtable Spark 連接器支援下列 Dataproc 版本:

計算費用

如果您決定使用 Google Cloud的任何計費元件,系統會針對您使用的資源向您收費:

  • Bigtable (使用 Bigtable 模擬器不會產生費用)
  • Dataproc
  • Cloud Storage

使用 Dataproc on Compute Engine 叢集時,適用 Dataproc 定價。在 Dataproc Serverless for Spark 上執行的工作負載和工作階段,適用 Dataproc Serverless 定價

如要根據預測用量估算費用,請使用 Pricing Calculator

事前準備

使用 Bigtable Spark 連接器前,請先完成下列先決條件。

必要的角色

如要取得使用 Bigtable Spark 連接器所需的權限,請要求管理員為您授予專案的下列 IAM 角色:

  • Bigtable 管理員 (roles/bigtable.admin)(選用): 可讓您讀取或寫入資料,以及建立新資料表。
  • Bigtable 使用者 (roles/bigtable.user): 可讀取或寫入資料,但無法建立新資料表。

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

如果您使用 Dataproc 或 Cloud Storage,可能需要額外權限。詳情請參閱「Dataproc 權限」和「Cloud Storage 權限」。

設定 Spark

除了建立 Bigtable 執行個體,您還需要設定 Spark 執行個體。您可以在本機執行這項操作,也可以選取下列任一選項,搭配 Dataproc 使用 Spark:

  • Dataproc 叢集
  • Dataproc Serverless

如要進一步瞭解如何選擇 Dataproc 叢集或無伺服器選項,請參閱「Dataproc Serverless for Spark 與 Dataproc on Compute Engine 比較 」說明文件。

下載連接器 JAR 檔案

您可以在 Bigtable Spark 連接器 GitHub 存放區中,找到 Bigtable Spark 連接器原始碼和範例。

視 Spark 設定而定,您可以透過下列方式存取 JAR 檔案:

  • 如果您在本機執行 PySpark,請從 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage 位置下載連接器的 JAR 檔案。

    SCALA_VERSION 替換為 2.122.13 (僅支援這兩個 Scala 版本),並將 CONNECTOR_VERSION 替換為要使用的連接器版本。

  • 如果是 Dataproc 叢集或無伺服器選項,請使用最新的 JAR 檔案做為構件,新增至 Scala 或 Java Spark 應用程式。如要進一步瞭解如何將 JAR 檔案做為構件使用,請參閱「管理依附元件」。

  • 如要將 PySpark 工作提交至 Dataproc,請使用 gcloud dataproc jobs submit pyspark --jars 旗標,將 URI 設為 Cloud Storage 中的 JAR 檔案位置,例如 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar

判斷運算類型

如果是唯讀工作,您可以使用 Data Boost 無伺服器運算,避免影響應用程式服務叢集。如要使用 Data Boost,Spark 應用程式必須使用 1.1.0 以上版本的 Spark 連接器。

如要使用 Data Boost,您必須建立 Data Boost 應用程式設定檔,然後在將 Bigtable 設定新增至 Spark 應用程式時,為 spark.bigtable.app_profile.id Spark 選項提供應用程式設定檔 ID。如果您已為 Spark 讀取作業建立應用程式設定檔,並想繼續使用該設定檔,但不想變更應用程式程式碼,可以將應用程式設定檔轉換為 Data Boost 應用程式設定檔。詳情請參閱「轉換應用程式設定檔」。

詳情請參閱 Bigtable Data Boost 總覽

對於涉及讀取和寫入的工作,您可以透過在要求中指定標準應用程式設定檔,使用執行個體的叢集節點進行運算。

找出或建立要使用的應用程式設定檔

如果您未指定應用程式設定檔 ID,連接器會使用預設應用程式設定檔。

建議您為執行的每個應用程式 (包括 Spark 應用程式) 使用專屬的應用程式設定檔。如要進一步瞭解應用程式設定檔類型和設定,請參閱「應用程式設定檔總覽」。如需操作說明,請參閱「建立及設定應用程式設定檔」。

在 Spark 應用程式中新增 Bigtable 設定

在 Spark 應用程式中,新增可與 Bigtable 互動的 Spark 選項。

支援的 Spark 選項

使用 com.google.cloud.spark.bigtable 套件提供的 Spark 選項。

選項名稱 必填 預設值 意義
spark.bigtable.project.id 不適用 設定 Bigtable 專案 ID。
spark.bigtable.instance.id 不適用 設定 Bigtable 執行個體 ID。
catalog 不適用 設定 JSON 格式,指定 DataFrame 的類 SQL 結構定義與 Bigtable 資料表的結構定義之間的轉換格式。

詳情請參閱「以 JSON 格式建立資料表的中繼資料」。
spark.bigtable.app_profile.id default 設定 Bigtable 應用程式設定檔 ID。
spark.bigtable.write.timestamp.milliseconds 目前系統時間 設定將 DataFrame 寫入 Bigtable 時要使用的時間戳記 (以毫秒為單位)。

請注意,由於 DataFrame 中的所有資料列都使用相同的時間戳記,因此 DataFrame 中具有相同資料列鍵資料欄的資料列會以單一版本形式保留在 Bigtable 中,因為這些資料列共用相同的時間戳記。
spark.bigtable.create.new.table false 設為 true,即可在寫入 Bigtable 前建立新資料表。
spark.bigtable.read.timerange.start.millisecondsspark.bigtable.read.timerange.end.milliseconds 不適用 設定時間戳記 (以紀元時間後的毫秒數為單位),分別篩選出特定開始日期和結束日期的儲存格。
spark.bigtable.push.down.row.key.filters true 設為 true,即可在伺服器端進行簡單的資料列鍵篩選。複合資料列鍵的篩選作業是在用戶端實作。

詳情請參閱「使用篩選條件讀取特定 DataFrame 資料列」。
spark.bigtable.read.rows.attempt.timeout.milliseconds 30m 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的讀取資料列嘗試設定 timeout 持續時間。
spark.bigtable.read.rows.total.timeout.milliseconds 12 小時 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的讀取資料列嘗試設定逾時時間長度。
spark.bigtable.mutate.rows.attempt.timeout.milliseconds 1 個月 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的突變資料列嘗試設定 timeout 持續時間。
spark.bigtable.mutate.rows.total.timeout.milliseconds 10 分鐘 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的變動資料列嘗試作業,設定 total 超時持續時間。
spark.bigtable.batch.mutate.size 100 設為每個批次中的異動次數。可設定的最大值為 100000
spark.bigtable.enable.batch_mutate.flow_control false 設為 true,即可為批次變動啟用流量控制

以 JSON 格式建立資料表的中繼資料

Spark SQL DataFrames 資料表格式必須使用 JSON 格式的字串轉換為 Bigtable 資料表。這個字串 JSON 格式可讓資料格式與 Bigtable 相容。您可以使用 .option("catalog", catalog_json_string) 選項,在應用程式程式碼中傳遞 JSON 格式。

舉例來說,請參考下列 DataFrame 資料表和對應的 Bigtable 資料表。

在這個範例中,DataFrame 中的 namebirthYear 資料欄會歸類在 info 資料欄系列下,並分別重新命名為 namebirth_year。同樣地,address 資料欄會儲存在 location 資料欄系列中,並使用相同的資料欄名稱。DataFrame 中的「id」資料欄會轉換為 Bigtable 資料列鍵。

在 Bigtable 中,資料列鍵沒有專屬的資料欄名稱,在本範例中,id_rowkey 僅用於向連接器指出這是資料列鍵資料欄。您可以為資料列鍵欄使用任何名稱,並確保在 JSON 格式中宣告 "rowkey":"column_name" 欄位時使用相同名稱。

DataFrame Bigtable table = t1
資料列索引鍵 資料欄系列
資訊 地點
id name birthYear address id_rowkey name birth_year address

目錄的 JSON 格式如下:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 格式中使用的鍵和值如下:

目錄鍵 目錄值 JSON 格式
資料表 Bigtable 資料表的名稱。 "table":{"name":"t1"}

如果資料表不存在,請使用 .option("spark.bigtable.create.new.table", "true") 建立資料表。
rowkey 做為 Bigtable 資料列鍵的資料欄名稱。請確保 DataFrame 資料欄的資料欄名稱會做為資料列索引鍵,例如 id_rowkey

複合鍵也可做為資料列鍵。例如:"rowkey":"name:address"。這種做法可能會導致資料列鍵需要完整掃描資料表,才能處理所有讀取要求。
"rowkey":"id_rowkey"
將每個 DataFrame 資料欄對應至相應的 Bigtable 資料欄系列 ("cf") 和資料欄名稱 ("col")。資料欄名稱可以與 DataFrame 表格中的資料欄名稱不同。支援的資料類型包括 stringlongbinary "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

在本例中,id_rowkey 是資料列鍵,而 infolocation 是資料欄系列。

支援的資料類型

連接器支援在目錄中使用 stringlongbinary (位元組陣列) 類型。在新增對 intfloat 等其他類型的支援之前,您可以先手動將這類資料類型轉換為位元組陣列 (Spark SQL 的 BinaryType),再使用連接器將其寫入 Bigtable。

此外,您可以使用 Avro 序列化複雜型別,例如 ArrayType。詳情請參閱「使用 Apache Avro 序列化複雜資料類型」。

寫入 Bigtable

使用 .write() 函式和支援的選項,將資料寫入 Bigtable。

Java

GitHub 存放區中的下列程式碼使用 Java 和 Maven 寫入 Bigtable。

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub 存放區中的下列程式碼會使用 Python 寫入 Bigtable。

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

從 Bigtable 讀取資料

使用 .read() 函式檢查資料表是否已成功匯入 Bigtable。

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

編譯專案

產生 JAR 檔案,用於在 Dataproc 叢集、Dataproc Serverless 或本機 Spark 執行個體中執行工作。您可以在本機編譯 JAR 檔案,然後用來提交工作。提交工作時,編譯後的 JAR 路徑會設為 PATH_TO_COMPILED_JAR 環境變數。

這個步驟不適用於 PySpark 應用程式。

管理依附關係

Bigtable Spark 連接器支援下列依附元件管理工具:

編譯 JAR 檔案

Maven

  1. spark-bigtable 依附元件新增至 pom.xml 檔案。

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Maven Shade 外掛程式新增至 pom.xml 檔案,即可建立 uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. 執行 mvn clean install 指令來產生 JAR 檔案。

sbt

  1. spark-bigtable 依附元件新增至 build.sbt 檔案:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. sbt-assembly 外掛程式新增至 project/plugins.sbtproject/assembly.sbt 檔案,建立 Uber JAR 檔案。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. 執行 sbt clean assembly 指令來產生 JAR 檔案。

Gradle

  1. build.gradle 檔案中加入 spark-bigtable 依附元件。

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. build.gradle 檔案中新增 Shadow 外掛程式,建立 uber JAR 檔案:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. 如需更多設定和 JAR 編譯資訊,請參閱 Shadow 外掛程式說明文件。

提交工作

使用 Dataproc、Dataproc Serverless 或本機 Spark 執行個體提交 Spark 工作,啟動應用程式。

設定執行階段環境

設定下列環境變數。

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

更改下列內容:

  • PROJECT_ID:Bigtable 專案的永久 ID。
  • INSTANCE_ID:Bigtable 執行個體的永久 ID。
  • TABLE_NAME:資料表的永久 ID。
  • DATAPROC_CLUSTER:Dataproc 叢集的永久 ID。
  • DATAPROC_REGION:Dataproc 執行個體中包含叢集的 Dataproc 區域,例如 northamerica-northeast2
  • DATAPROC_ZONE:Dataproc 叢集執行的區域。
  • SUBNET子網路的完整資源路徑。
  • GCS_BUCKET_NAME:用於上傳 Spark 工作負載依附元件的 Cloud Storage 值區。
  • PATH_TO_COMPILED_JAR:已編譯 JAR 的完整或相對路徑,例如 Maven 的 /path/to/project/root/target/<compiled_JAR_name>
  • GCS_PATH_TO_CONNECTOR_JARgs://spark-lib/bigtable Cloud Storage 值區,其中包含 spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar 檔案。
  • PATH_TO_PYTHON_FILE:適用於 PySpark 應用程式,用於將資料寫入 Bigtable 和從 Bigtable 讀取資料的 Python 檔案路徑。
  • LOCAL_PATH_TO_CONNECTOR_JAR:適用於 PySpark 應用程式,已下載的 Bigtable Spark 連接器 JAR 檔案路徑。

提交 Spark 工作

如果是 Dataproc 執行個體或本機 Spark 設定,請執行 Spark 工作,將資料上傳至 Bigtable。

Dataproc 叢集

使用編譯的 JAR 檔案,建立 Dataproc 叢集工作,以便在 Bigtable 間讀寫資料。

  1. 建立 Dataproc 叢集。下列範例顯示建立 Dataproc 2.0 版叢集的範例指令,該叢集採用 Debian 10、兩個工作站節點和預設設定。

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. 提交工作。

    Scala/Java

    下列範例顯示 spark.bigtable.example.WordCount 類別,其中包含在 DataFrame 中建立測試資料表、將資料表寫入 Bigtable,然後計算資料表中字數的邏輯。

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

使用已編譯的 JAR 檔案,建立 Dataproc 工作,透過 Dataproc Serverless 執行個體從 Bigtable 讀取及寫入資料。

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

本機 Spark

使用下載的 JAR 檔案,建立 Spark 工作,透過本機 Spark 執行個體從 Bigtable 讀取及寫入資料。您也可以使用 Bigtable 模擬器提交 Spark 工作。

使用 Bigtable 模擬器

如果您決定使用 Bigtable 模擬器,請按照下列步驟操作:

  1. 執行以下指令來啟用模擬器:

    gcloud beta emulators bigtable start
    

    模擬器預設會選擇 localhost:8086

  2. 設定 BIGTABLE_EMULATOR_HOST 環境變數:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. 提交 Spark 工作。

如要進一步瞭解如何使用 Bigtable 模擬器,請參閱「使用模擬器進行測試」。

提交 Spark 工作

無論是否使用本機 Bigtable 模擬器,都可以使用 spark-submit 指令提交 Spark 工作。

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

驗證資料表資料

執行下列 cbt CLI 指令,確認資料已寫入 Bigtable。 cbt CLI 是 Google Cloud CLI 的元件。詳情請參閱 cbt CLI 總覽

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

其他解決方案

針對特定解決方案使用 Bigtable Spark 連接器,例如序列化複雜的 Spark SQL 型別、讀取特定資料列,以及產生用戶端指標。

使用篩選器讀取特定 DataFrame 資料列

使用 DataFrame 從 Bigtable 讀取資料時,您可以指定篩選器,只讀取特定資料列。系統會在伺服器端套用資料列鍵欄的 ==<=startsWith 等簡單篩選器,避免完整掃描資料表。複合資料列鍵的篩選條件或複雜篩選條件 (例如資料列鍵資料欄的 LIKE 篩選條件) 會在用戶端套用。

如果正在讀取大型資料表,建議使用簡單的資料列索引鍵篩選器,避免執行完整資料表掃描。下列範例陳述式說明如何使用簡單的篩選器讀取資料。請務必在 Spark 篩選器中,使用轉換為列鍵的 DataFrame 資料欄名稱:

    dataframe.filter("id == 'some_id'").show()
  

套用篩選器時,請使用 DataFrame 資料欄名稱,而非 Bigtable 資料表資料欄名稱。

使用 Apache Avro 序列化複雜資料型別

Bigtable Spark 連接器支援使用 Apache Avro 序列化複雜的 Spark SQL 型別,例如 ArrayTypeMapTypeStructType。Apache Avro 提供記錄資料的資料序列化功能,通常用於處理及儲存複雜的資料結構。

使用 "avro":"avroSchema" 等語法,指定應使用 Avro 編碼的 Bigtable 資料欄。然後,您可以在從 Bigtable 讀取資料或將資料寫入 Bigtable 時使用 .option("avroSchema", avroSchemaString),以字串格式指定與該資料欄對應的 Avro 結構定義。您可以針對不同資料欄使用不同的選項名稱 (例如 "anotherAvroSchema"),並傳遞多個資料欄的 Avro 結構定義。

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

使用用戶端指標

由於 Bigtable Spark 連接器是以 Bigtable Client for Java 為基礎,因此連接器內預設會啟用用戶端指標。如要進一步瞭解如何存取及解讀這些指標,請參閱用戶端指標說明文件。

搭配低階 RDD 函式使用 Java 適用的 Bigtable 用戶端

由於 Bigtable Spark 連接器是以 Bigtable Java 用戶端為基礎,因此您可以在 Spark 應用程式中直接使用該用戶端,並在 mapPartitionsforeachPartition 等低階 RDD 函式中執行分散式讀取或寫入要求。

如要使用 Java 類別適用的 Bigtable 用戶端,請在套件名稱中附加 com.google.cloud.spark.bigtable.repackaged 前置字串。舉例來說,請使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient,而非 com.google.cloud.bigtable.data.v2.BigtableDataClient 做為類別名稱。

如要進一步瞭解 Java 適用的 Bigtable 用戶端,請參閱「Java 適用的 Bigtable 用戶端」。

後續步驟