使用 Bigtable Spark 連接器
您可以使用 Bigtable Spark 連接器,在 Bigtable 中讀取和寫入資料。您可以使用 Spark SQL 和 DataFrame 從 Spark 應用程式讀取資料。Bigtable Spark 連接器支援下列 Bigtable 作業:
- 寫入資料
- 讀取資料
- 建立新的資料表
本文說明如何將 Spark SQL DataFrames 資料表轉換為 Bigtable 資料表,然後編譯及建立 JAR 檔案,以提交 Spark 工作。
Spark 和 Scala 支援狀態
Bigtable Spark 連接器支援下列 Scala 版本:
Bigtable Spark 連接器支援下列 Spark 版本:
Bigtable Spark 連接器支援下列 Dataproc 版本:
計算費用
如果您決定使用 Google Cloud的任何計費元件,系統會針對您使用的資源向您收費:
- Bigtable (使用 Bigtable 模擬器不會產生費用)
- Dataproc
- Cloud Storage
使用 Dataproc on Compute Engine 叢集時,適用 Dataproc 定價。在 Dataproc Serverless for Spark 上執行的工作負載和工作階段,適用 Dataproc Serverless 定價。
如要根據預測用量估算費用,請使用 Pricing Calculator。
事前準備
使用 Bigtable Spark 連接器前,請先完成下列先決條件。
必要的角色
如要取得使用 Bigtable Spark 連接器所需的權限,請要求管理員為您授予專案的下列 IAM 角色:
-
Bigtable 管理員 (
roles/bigtable.admin
)(選用): 可讓您讀取或寫入資料,以及建立新資料表。 -
Bigtable 使用者 (
roles/bigtable.user
): 可讀取或寫入資料,但無法建立新資料表。
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
如果您使用 Dataproc 或 Cloud Storage,可能需要額外權限。詳情請參閱「Dataproc 權限」和「Cloud Storage 權限」。
設定 Spark
除了建立 Bigtable 執行個體,您還需要設定 Spark 執行個體。您可以在本機執行這項操作,也可以選取下列任一選項,搭配 Dataproc 使用 Spark:
- Dataproc 叢集
- Dataproc Serverless
如要進一步瞭解如何選擇 Dataproc 叢集或無伺服器選項,請參閱「Dataproc Serverless for Spark 與 Dataproc on Compute Engine 比較 」說明文件。
下載連接器 JAR 檔案
您可以在 Bigtable Spark 連接器 GitHub 存放區中,找到 Bigtable Spark 連接器原始碼和範例。
視 Spark 設定而定,您可以透過下列方式存取 JAR 檔案:
如果您在本機執行 PySpark,請從
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage 位置下載連接器的 JAR 檔案。將
SCALA_VERSION
替換為2.12
或2.13
(僅支援這兩個 Scala 版本),並將CONNECTOR_VERSION
替換為要使用的連接器版本。如果是 Dataproc 叢集或無伺服器選項,請使用最新的 JAR 檔案做為構件,新增至 Scala 或 Java Spark 應用程式。如要進一步瞭解如何將 JAR 檔案做為構件使用,請參閱「管理依附元件」。
如要將 PySpark 工作提交至 Dataproc,請使用
gcloud dataproc jobs submit pyspark --jars
旗標,將 URI 設為 Cloud Storage 中的 JAR 檔案位置,例如gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
。
判斷運算類型
如果是唯讀工作,您可以使用 Data Boost 無伺服器運算,避免影響應用程式服務叢集。如要使用 Data Boost,Spark 應用程式必須使用 1.1.0 以上版本的 Spark 連接器。
如要使用 Data Boost,您必須建立 Data Boost 應用程式設定檔,然後在將 Bigtable 設定新增至 Spark 應用程式時,為 spark.bigtable.app_profile.id
Spark 選項提供應用程式設定檔 ID。如果您已為 Spark 讀取作業建立應用程式設定檔,並想繼續使用該設定檔,但不想變更應用程式程式碼,可以將應用程式設定檔轉換為 Data Boost 應用程式設定檔。詳情請參閱「轉換應用程式設定檔」。
詳情請參閱 Bigtable Data Boost 總覽。
對於涉及讀取和寫入的工作,您可以透過在要求中指定標準應用程式設定檔,使用執行個體的叢集節點進行運算。
找出或建立要使用的應用程式設定檔
如果您未指定應用程式設定檔 ID,連接器會使用預設應用程式設定檔。
建議您為執行的每個應用程式 (包括 Spark 應用程式) 使用專屬的應用程式設定檔。如要進一步瞭解應用程式設定檔類型和設定,請參閱「應用程式設定檔總覽」。如需操作說明,請參閱「建立及設定應用程式設定檔」。
在 Spark 應用程式中新增 Bigtable 設定
在 Spark 應用程式中,新增可與 Bigtable 互動的 Spark 選項。
支援的 Spark 選項
使用 com.google.cloud.spark.bigtable
套件提供的 Spark 選項。
選項名稱 | 必填 | 預設值 | 意義 |
---|---|---|---|
spark.bigtable.project.id |
是 | 不適用 | 設定 Bigtable 專案 ID。 |
spark.bigtable.instance.id |
是 | 不適用 | 設定 Bigtable 執行個體 ID。 |
catalog |
是 | 不適用 | 設定 JSON 格式,指定 DataFrame 的類 SQL 結構定義與 Bigtable 資料表的結構定義之間的轉換格式。 詳情請參閱「以 JSON 格式建立資料表的中繼資料」。 |
spark.bigtable.app_profile.id |
否 | default |
設定 Bigtable 應用程式設定檔 ID。 |
spark.bigtable.write.timestamp.milliseconds |
否 | 目前系統時間 | 設定將 DataFrame 寫入 Bigtable 時要使用的時間戳記 (以毫秒為單位)。 請注意,由於 DataFrame 中的所有資料列都使用相同的時間戳記,因此 DataFrame 中具有相同資料列鍵資料欄的資料列會以單一版本形式保留在 Bigtable 中,因為這些資料列共用相同的時間戳記。 |
spark.bigtable.create.new.table |
否 | false |
設為 true ,即可在寫入 Bigtable 前建立新資料表。 |
spark.bigtable.read.timerange.start.milliseconds 或spark.bigtable.read.timerange.end.milliseconds |
否 | 不適用 | 設定時間戳記 (以紀元時間後的毫秒數為單位),分別篩選出特定開始日期和結束日期的儲存格。 |
spark.bigtable.push.down.row.key.filters |
否 | true |
設為 true ,即可在伺服器端進行簡單的資料列鍵篩選。複合資料列鍵的篩選作業是在用戶端實作。詳情請參閱「使用篩選條件讀取特定 DataFrame 資料列」。 |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
否 | 30m | 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的讀取資料列嘗試設定 timeout 持續時間。 |
spark.bigtable.read.rows.total.timeout.milliseconds |
否 | 12 小時 | 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的讀取資料列嘗試設定總逾時時間長度。 |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
否 | 1 個月 | 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的突變資料列嘗試設定 timeout 持續時間。 |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
否 | 10 分鐘 | 在 Java 適用的 Bigtable 用戶端中,為對應至一個 DataFrame 分區的變動資料列嘗試作業,設定 total 超時持續時間。 |
spark.bigtable.batch.mutate.size |
否 | 100 |
設為每個批次中的異動次數。可設定的最大值為 100000 。 |
spark.bigtable.enable.batch_mutate.flow_control |
否 | false |
設為 true ,即可為批次變動啟用流量控制。 |
以 JSON 格式建立資料表的中繼資料
Spark SQL DataFrames 資料表格式必須使用 JSON 格式的字串轉換為 Bigtable 資料表。這個字串 JSON 格式可讓資料格式與 Bigtable 相容。您可以使用 .option("catalog", catalog_json_string)
選項,在應用程式程式碼中傳遞 JSON 格式。
舉例來說,請參考下列 DataFrame 資料表和對應的 Bigtable 資料表。
在這個範例中,DataFrame 中的 name
和 birthYear
資料欄會歸類在 info
資料欄系列下,並分別重新命名為 name
和 birth_year
。同樣地,address
資料欄會儲存在 location
資料欄系列中,並使用相同的資料欄名稱。DataFrame 中的「id
」資料欄會轉換為 Bigtable 資料列鍵。
在 Bigtable 中,資料列鍵沒有專屬的資料欄名稱,在本範例中,id_rowkey
僅用於向連接器指出這是資料列鍵資料欄。您可以為資料列鍵欄使用任何名稱,並確保在 JSON 格式中宣告 "rowkey":"column_name"
欄位時使用相同名稱。
DataFrame | Bigtable table = t1 | |||||||
欄 | 資料列索引鍵 | 資料欄系列 | ||||||
資訊 | 地點 | |||||||
欄 | 欄 | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
目錄的 JSON 格式如下:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
JSON 格式中使用的鍵和值如下:
目錄鍵 | 目錄值 | JSON 格式 |
---|---|---|
資料表 | Bigtable 資料表的名稱。 | "table":{"name":"t1"} 如果資料表不存在,請使用 .option("spark.bigtable.create.new.table", "true") 建立資料表。 |
rowkey | 做為 Bigtable 資料列鍵的資料欄名稱。請確保 DataFrame 資料欄的資料欄名稱會做為資料列索引鍵,例如 id_rowkey 。複合鍵也可做為資料列鍵。例如: "rowkey":"name:address" 。這種做法可能會導致資料列鍵需要完整掃描資料表,才能處理所有讀取要求。 |
"rowkey":"id_rowkey" 、 |
欄 | 將每個 DataFrame 資料欄對應至相應的 Bigtable 資料欄系列 ("cf" ) 和資料欄名稱 ("col" )。資料欄名稱可以與 DataFrame 表格中的資料欄名稱不同。支援的資料類型包括 string 、long 和 binary 。 |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" 在本例中, id_rowkey 是資料列鍵,而 info 和 location 是資料欄系列。 |
支援的資料類型
連接器支援在目錄中使用 string
、long
和 binary
(位元組陣列) 類型。在新增對 int
和 float
等其他類型的支援之前,您可以先手動將這類資料類型轉換為位元組陣列 (Spark SQL 的 BinaryType
),再使用連接器將其寫入 Bigtable。
此外,您可以使用 Avro 序列化複雜型別,例如 ArrayType
。詳情請參閱「使用 Apache Avro 序列化複雜資料類型」。
寫入 Bigtable
使用 .write()
函式和支援的選項,將資料寫入 Bigtable。
Java
GitHub 存放區中的下列程式碼使用 Java 和 Maven 寫入 Bigtable。
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
GitHub 存放區中的下列程式碼會使用 Python 寫入 Bigtable。
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
從 Bigtable 讀取資料
使用 .read()
函式檢查資料表是否已成功匯入 Bigtable。
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
編譯專案
產生 JAR 檔案,用於在 Dataproc 叢集、Dataproc Serverless 或本機 Spark 執行個體中執行工作。您可以在本機編譯 JAR 檔案,然後用來提交工作。提交工作時,編譯後的 JAR 路徑會設為 PATH_TO_COMPILED_JAR
環境變數。
這個步驟不適用於 PySpark 應用程式。
管理依附關係
Bigtable Spark 連接器支援下列依附元件管理工具:
編譯 JAR 檔案
Maven
將
spark-bigtable
依附元件新增至 pom.xml 檔案。<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
將 Maven Shade 外掛程式新增至
pom.xml
檔案,即可建立 uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
執行
mvn clean install
指令來產生 JAR 檔案。
sbt
將
spark-bigtable
依附元件新增至build.sbt
檔案:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
將
sbt-assembly
外掛程式新增至project/plugins.sbt
或project/assembly.sbt
檔案,建立 Uber JAR 檔案。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
執行
sbt clean assembly
指令來產生 JAR 檔案。
Gradle
在
build.gradle
檔案中加入spark-bigtable
依附元件。dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
在
build.gradle
檔案中新增 Shadow 外掛程式,建立 uber JAR 檔案:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
如需更多設定和 JAR 編譯資訊,請參閱 Shadow 外掛程式說明文件。
提交工作
使用 Dataproc、Dataproc Serverless 或本機 Spark 執行個體提交 Spark 工作,啟動應用程式。
設定執行階段環境
設定下列環境變數。
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
更改下列內容:
- PROJECT_ID:Bigtable 專案的永久 ID。
- INSTANCE_ID:Bigtable 執行個體的永久 ID。
- TABLE_NAME:資料表的永久 ID。
- DATAPROC_CLUSTER:Dataproc 叢集的永久 ID。
- DATAPROC_REGION:Dataproc 執行個體中包含叢集的 Dataproc 區域,例如
northamerica-northeast2
。 - DATAPROC_ZONE:Dataproc 叢集執行的區域。
- SUBNET:子網路的完整資源路徑。
- GCS_BUCKET_NAME:用於上傳 Spark 工作負載依附元件的 Cloud Storage 值區。
- PATH_TO_COMPILED_JAR:已編譯 JAR 的完整或相對路徑,例如 Maven 的
/path/to/project/root/target/<compiled_JAR_name>
。 - GCS_PATH_TO_CONNECTOR_JAR:
gs://spark-lib/bigtable
Cloud Storage 值區,其中包含spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
檔案。 - PATH_TO_PYTHON_FILE:適用於 PySpark 應用程式,用於將資料寫入 Bigtable 和從 Bigtable 讀取資料的 Python 檔案路徑。
- LOCAL_PATH_TO_CONNECTOR_JAR:適用於 PySpark 應用程式,已下載的 Bigtable Spark 連接器 JAR 檔案路徑。
提交 Spark 工作
如果是 Dataproc 執行個體或本機 Spark 設定,請執行 Spark 工作,將資料上傳至 Bigtable。
Dataproc 叢集
使用編譯的 JAR 檔案,建立 Dataproc 叢集工作,以便在 Bigtable 間讀寫資料。
建立 Dataproc 叢集。下列範例顯示建立 Dataproc 2.0 版叢集的範例指令,該叢集採用 Debian 10、兩個工作站節點和預設設定。
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
提交工作。
Scala/Java
下列範例顯示
spark.bigtable.example.WordCount
類別,其中包含在 DataFrame 中建立測試資料表、將資料表寫入 Bigtable,然後計算資料表中字數的邏輯。gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
使用已編譯的 JAR 檔案,建立 Dataproc 工作,透過 Dataproc Serverless 執行個體從 Bigtable 讀取及寫入資料。
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
本機 Spark
使用下載的 JAR 檔案,建立 Spark 工作,透過本機 Spark 執行個體從 Bigtable 讀取及寫入資料。您也可以使用 Bigtable 模擬器提交 Spark 工作。
使用 Bigtable 模擬器
如果您決定使用 Bigtable 模擬器,請按照下列步驟操作:
執行以下指令來啟用模擬器:
gcloud beta emulators bigtable start
模擬器預設會選擇
localhost:8086
。設定
BIGTABLE_EMULATOR_HOST
環境變數:export BIGTABLE_EMULATOR_HOST=localhost:8086
如要進一步瞭解如何使用 Bigtable 模擬器,請參閱「使用模擬器進行測試」。
提交 Spark 工作
無論是否使用本機 Bigtable 模擬器,都可以使用 spark-submit
指令提交 Spark 工作。
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
驗證資料表資料
執行下列
cbt
CLI
指令,確認資料已寫入 Bigtable。
cbt
CLI
是 Google Cloud CLI 的元件。詳情請參閱
cbt
CLI
總覽。
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
其他解決方案
針對特定解決方案使用 Bigtable Spark 連接器,例如序列化複雜的 Spark SQL 型別、讀取特定資料列,以及產生用戶端指標。
使用篩選器讀取特定 DataFrame 資料列
使用 DataFrame 從 Bigtable 讀取資料時,您可以指定篩選器,只讀取特定資料列。系統會在伺服器端套用資料列鍵欄的 ==
、<=
和 startsWith
等簡單篩選器,避免完整掃描資料表。複合資料列鍵的篩選條件或複雜篩選條件 (例如資料列鍵資料欄的 LIKE
篩選條件) 會在用戶端套用。
如果正在讀取大型資料表,建議使用簡單的資料列索引鍵篩選器,避免執行完整資料表掃描。下列範例陳述式說明如何使用簡單的篩選器讀取資料。請務必在 Spark 篩選器中,使用轉換為列鍵的 DataFrame 資料欄名稱:
dataframe.filter("id == 'some_id'").show()
套用篩選器時,請使用 DataFrame 資料欄名稱,而非 Bigtable 資料表資料欄名稱。
使用 Apache Avro 序列化複雜資料型別
Bigtable Spark 連接器支援使用 Apache Avro 序列化複雜的 Spark SQL 型別,例如 ArrayType
、MapType
或 StructType
。Apache Avro 提供記錄資料的資料序列化功能,通常用於處理及儲存複雜的資料結構。
使用 "avro":"avroSchema"
等語法,指定應使用 Avro 編碼的 Bigtable 資料欄。然後,您可以在從 Bigtable 讀取資料或將資料寫入 Bigtable 時使用 .option("avroSchema", avroSchemaString)
,以字串格式指定與該資料欄對應的 Avro 結構定義。您可以針對不同資料欄使用不同的選項名稱 (例如 "anotherAvroSchema"
),並傳遞多個資料欄的 Avro 結構定義。
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
使用用戶端指標
由於 Bigtable Spark 連接器是以 Bigtable Client for Java 為基礎,因此連接器內預設會啟用用戶端指標。如要進一步瞭解如何存取及解讀這些指標,請參閱用戶端指標說明文件。
搭配低階 RDD 函式使用 Java 適用的 Bigtable 用戶端
由於 Bigtable Spark 連接器是以 Bigtable Java 用戶端為基礎,因此您可以在 Spark 應用程式中直接使用該用戶端,並在 mapPartitions
和 foreachPartition
等低階 RDD 函式中執行分散式讀取或寫入要求。
如要使用 Java 類別適用的 Bigtable 用戶端,請在套件名稱中附加 com.google.cloud.spark.bigtable.repackaged
前置字串。舉例來說,請使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
,而非 com.google.cloud.bigtable.data.v2.BigtableDataClient
做為類別名稱。
如要進一步瞭解 Java 適用的 Bigtable 用戶端,請參閱「Java 適用的 Bigtable 用戶端」。
後續步驟
- 瞭解如何在 Dataproc 中調整 Spark 工作。
- 搭配 Bigtable Spark 連接器使用適用於 Java 的 Bigtable 用戶端中的類別。