使用 Bigtable Spark 连接器
借助 Bigtable Spark 连接器,您可以从 Bigtable 中读取数据以及向 Bigtable 中写入数据。您可以使用 Spark SQL 和 DataFrame 从 Spark 应用中读取数据。Bigtable Spark 连接器支持以下 Bigtable 操作:
- 写入数据
- 读取数据
- 创建新表
本文档介绍了如何将 Spark SQL DataFrames 表转换为 Bigtable 表,然后编译并创建 JAR 文件以提交 Spark 作业。
Spark 和 Scala 支持状态
Bigtable Spark 连接器支持以下 Scala 版本:
Bigtable Spark 连接器支持以下 Spark 版本:
Bigtable Spark 连接器支持以下 Dataproc 版本:
计算费用
如果您决定使用 Google Cloud的以下任何收费组件,则需要为所使用的资源付费:
- Bigtable(使用 Bigtable 模拟器不会产生费用)
- Dataproc
- Cloud Storage
Dataproc 价格适用于 Dataproc on Compute Engine 集群的使用。Dataproc Serverless 价格适用于在 Dataproc Serverless for Spark 上运行的工作负载和会话。
您可使用价格计算器根据您的预计使用量来估算费用。
准备工作
在使用 Bigtable Spark 连接器之前,请先完成以下前提条件。
所需的角色
如需获得使用 Bigtable Spark 连接器所需的权限,请让管理员向您授予项目的以下 IAM 角色:
-
Bigtable Administrator (
roles/bigtable.admin
)(可选): 可让您读取或写入数据以及创建新表。 -
Bigtable User (
roles/bigtable.user
): 允许您读取或写入数据,但不允许您创建新表。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
如果您使用的是 Dataproc 或 Cloud Storage,可能需要额外权限。如需了解详情,请参阅 Dataproc 权限和 Cloud Storage 权限。
设置 Spark
除了创建 Bigtable 实例之外,您还需要设置 Spark 实例。您可以在本地执行此操作,也可以选择以下任一选项,将 Spark 与 Dataproc 搭配使用:
- Dataproc 集群
- Dataproc Serverless
如需详细了解如何选择 Dataproc 集群或无服务器选项,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 文档。
下载连接器 JAR 文件
您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器源代码及示例。
根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:
如果您在本地运行 PySpark,则应从
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
Cloud Storage 位置下载连接器的 JAR 文件。将
SCALA_VERSION
替换为2.12
或2.13
(这是唯一受支持的 Scala 版本),并将CONNECTOR_VERSION
替换为您要使用的连接器版本。对于 Dataproc 集群或无服务器选项,请使用最新的 JAR 文件作为可在 Scala 或 Java Spark 应用中添加的制品。如需详细了解如何将 JAR 文件用作制品,请参阅管理依赖项。
如果您要将 PySpark 作业提交到 Dataproc,请使用
gcloud dataproc jobs submit pyspark --jars
标志将 URI 设置为 Cloud Storage 中 JAR 文件的位置,例如gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
。
确定计算类型
对于只读作业,您可以使用 Data Boost 无服务器计算,这样可以避免影响应用处理集群。您的 Spark 应用必须使用 1.1.0 版或更高版本的 Spark 连接器才能使用 Data Boost。
如需使用 Data Boost,您必须创建 Data Boost 应用配置文件,然后在将 Bigtable 配置添加到 Spark 应用时,为 spark.bigtable.app_profile.id
Spark 选项提供应用配置文件 ID。如果您已为 Spark 读取作业创建应用配置文件,并且希望继续使用该配置文件而不更改应用代码,则可以将该应用配置文件转换为 Data Boost 应用配置文件。如需了解详情,请参阅转换应用配置文件。
如需了解详情,请参阅 Bigtable Data Boost 概览。
对于涉及读取和写入的作业,您可以在请求中指定标准应用配置文件,以使用实例的集群节点进行计算。
确定或创建要使用的应用配置文件
如果您未指定应用配置文件 ID,连接器将使用默认应用配置文件。
我们建议您为运行的每个应用(包括 Spark 应用)使用各自不同的应用配置文件。如需详细了解应用配置文件类型和设置,请参阅应用配置文件概览。如需查看相关说明,请参阅创建和配置应用配置文件。
向 Spark 应用添加 Bigtable 配置
在 Spark 应用中,添加可让您与 Bigtable 交互的 Spark 选项。
支持的 Spark 选项
使用作为 com.google.cloud.spark.bigtable
软件包一部分提供的 Spark 选项。
选项名称 | 必需 | 默认值 | 含义 |
---|---|---|---|
spark.bigtable.project.id |
是 | 不适用 | 设置 Bigtable 项目 ID。 |
spark.bigtable.instance.id |
是 | 不适用 | 设置 Bigtable 实例 ID。 |
catalog |
是 | 不适用 | 设置 JSON 格式,用于指定 DataFrame 的类 SQL 架构与 Bigtable 表的架构之间的转换格式。 如需了解详情,请参阅以 JSON 格式创建表元数据。 |
spark.bigtable.app_profile.id |
否 | default |
设置 Bigtable 应用配置文件 ID。 |
spark.bigtable.write.timestamp.milliseconds |
否 | 当前系统时间 | 设置将 DataFrame 写入 Bigtable 时要使用的以毫秒为单位的时间戳。 请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此 DataFrame 中具有相同行键列的行在 Bigtable 中会以单个版本形式保留,因为它们共享相同的时间戳。 |
spark.bigtable.create.new.table |
否 | false |
设置为 true 可在写入 Bigtable 之前创建新表。 |
spark.bigtable.read.timerange.start.milliseconds 或 spark.bigtable.read.timerange.end.milliseconds |
否 | 不适用 | 设置时间戳(以毫秒为单位,从纪元时间开始计算),以过滤具有特定开始日期和结束日期的单元格。 |
spark.bigtable.push.down.row.key.filters |
否 | true |
设置为 true 可在服务器端实现简单的行键过滤。对复合行键的过滤是在客户端实现的。如需了解详情,请参阅使用过滤条件读取特定的 DataFrame 行。 |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
否 | 30 分钟 | 在 Java 版 Bigtable 客户端中,为读取行尝试设置与一个 DataFrame 分区对应的超时时长。 |
spark.bigtable.read.rows.total.timeout.milliseconds |
否 | 12 小时 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的读取行尝试设置总超时时长。 |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
否 | 1 分钟 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的 mutate rows 尝试设置超时时长。 |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
否 | 10 分钟 | 为 Java 版 Bigtable 客户端中与一个 DataFrame 分区对应的 mutateRows 尝试设置总超时时长。 |
spark.bigtable.batch.mutate.size |
否 | 100 |
设置为每个批次中的突变数量。您可以设置的最大值为 100000 。 |
spark.bigtable.enable.batch_mutate.flow_control |
否 | false |
设置为 true 可为批量变更启用流量控制。 |
创建 JSON 格式的表格元数据
必须使用 JSON 格式的字符串将 Spark SQL DataFrame 表格式转换为 Bigtable 表。这种字符串 JSON 格式可使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string)
选项在应用代码中传递 JSON 格式。
例如,假设有以下 DataFrame 表和相应的 Bigtable 表。
在此示例中,DataFrame 中的 name
和 birthYear
列被分组到 info
列族下,并分别重命名为 name
和 birth_year
。同样,address
列存储在 location
列族下,列名称相同。DataFrame 中的 id
列会转换为 Bigtable 行键。
行键在 Bigtable 中没有专用列名称,在本例中,id_rowkey
仅用于向连接器指示这是行键列。您可以为行键列使用任意名称,但请务必在以 JSON 格式声明 "rowkey":"column_name"
字段时使用相同的名称。
DataFrame | Bigtable 表 = t1 | |||||||
列 | 行键 | 列族 | ||||||
信息 | location | |||||||
列 | 列 | |||||||
id | name | birthYear | 地址 | id_rowkey | name | birth_year | 地址 |
目录的 JSON 格式如下:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
JSON 格式中使用的键和值如下所示:
目录键 | 目录值 | JSON 格式 |
---|---|---|
表 | Bigtable 表的名称。 | "table":{"name":"t1"} 如果表不存在,请使用 .option("spark.bigtable.create.new.table", "true") 创建表。 |
rowkey | 将用作 Bigtable 行键的列的名称。确保 DataFrame 列的列名用作行键,例如 id_rowkey 。复合键也可作为行键。例如 "rowkey":"name:address" 。这种方法可能会导致行键需要进行全表扫描才能满足所有读取请求。 |
"rowkey":"id_rowkey" 、 |
列 | 将每个 DataFrame 列映射到相应的 Bigtable 列族 ("cf" ) 和列名称 ("col" )。列名称可以与 DataFrame 表中的列名称不同。支持的数据类型包括 string 、long 和 binary 。 |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" 在此示例中, id_rowkey 是行键,info 和 location 是列族。 |
支持的数据类型
该连接器支持在目录中使用 string
、long
和 binary
(字节数组)类型。在添加对 int
和 float
等其他类型的支持之前,您可以先手动将这些数据类型转换为字节数组 (Spark SQL 的 BinaryType
),然后再使用连接器将它们写入 Bigtable。
此外,您还可以使用 Avro 序列化复杂类型,例如 ArrayType
。如需了解详情,请参阅使用 Apache Avro 序列化复杂数据类型。
写入 Bigtable
使用 .write()
函数和支持的选项将数据写入 Bigtable。
Java
GitHub 代码库中的以下代码使用 Java 和 Maven 写入 Bigtable。
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
GitHub 代码库中的以下代码使用 Python 写入 Bigtable。
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
从 Bigtable 中读取
使用 .read()
函数检查表是否已成功导入到 Bigtable 中。
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
编译项目
生成用于在 Dataproc 集群、Dataproc Serverless 或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。提交作业时,已编译 JAR 的路径会设置为 PATH_TO_COMPILED_JAR
环境变量。
此步骤不适用于 PySpark 应用。
管理依赖项
Bigtable Spark 连接器支持以下依赖项管理工具:
编译 JAR 文件
Maven
将
spark-bigtable
依赖项添加到您的 pom.xml 文件中。<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
将 Maven Shade 插件添加到
pom.xml
文件中,以创建超级 JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
运行
mvn clean install
命令以生成 JAR 文件。
sbt
将
spark-bigtable
依赖项添加到build.sbt
文件中:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
将
sbt-assembly
插件添加到project/plugins.sbt
或project/assembly.sbt
文件,以创建超级 JAR 文件。addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
运行
sbt clean assembly
命令以生成 JAR 文件。
Gradle
将
spark-bigtable
依赖项添加到build.gradle
文件中。dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
在
build.gradle
文件中添加 Shadow 插件,以创建超级 JAR 文件:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
如需了解更多配置和 JAR 编译信息,请参阅 Shadow 插件的文档。
提交作业
使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业,以启动应用。
设置运行时环境
设置以下环境变量。
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
替换以下内容:
- PROJECT_ID:Bigtable 项目的永久性标识符。
- INSTANCE_ID:Bigtable 实例的永久性标识符。
- TABLE_NAME:表的永久标识符。
- DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
- DATAPROC_REGION:包含 Dataproc 实例中某个集群的 Dataproc 区域,例如
northamerica-northeast2
。 - DATAPROC_ZONE:Dataproc 集群运行的地区。
- SUBNET:子网的完整资源路径。
- GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
- PATH_TO_COMPILED_JAR:已编译 JAR 的完整路径或相对路径,例如 Maven 的
/path/to/project/root/target/<compiled_JAR_name>
。 - GCS_PATH_TO_CONNECTOR_JAR:
spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
文件所在的gs://spark-lib/bigtable
Cloud Storage 存储桶。 - PATH_TO_PYTHON_FILE:对于 PySpark 应用,将用于向 Bigtable 写入数据和从 Bigtable 读取数据的 Python 文件的路径。
- LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,下载的 Bigtable Spark 连接器 JAR 文件的路径。
提交 Spark 作业
对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业以将数据上传到 Bigtable。
Dataproc 集群
使用已编译的 JAR 文件,并创建一个 Dataproc 集群作业,用于从 Bigtable 读取数据以及向 Bigtable 写入数据。
创建 Dataproc 集群。以下示例展示了一个示例命令,用于创建具有 Debian 10、两个工作器节点和默认配置的 Dataproc v2.0 集群。
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
提交作业。
Scala/Java
以下示例展示了
spark.bigtable.example.WordCount
类,其中包含在 DataFrame 中创建测试表、将表写入 Bigtable,然后统计表中字数的逻辑。gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
使用已编译的 JAR 文件创建一个 Dataproc 作业,该作业可使用 Dataproc Serverless 实例从 Bigtable 读取数据以及向 Bigtable 写入数据。
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
本地 Spark
使用下载的 JAR 文件创建一个 Spark 作业,该作业可使用本地 Spark 实例从 Bigtable 读取数据以及向 Bigtable 写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。
使用 Bigtable 模拟器
如果您决定使用 Bigtable 模拟器,请按以下步骤操作:
运行以下命令启动模拟器:
gcloud beta emulators bigtable start
默认情况下,该模拟器会选择
localhost:8086
。设置
BIGTABLE_EMULATOR_HOST
环境变量:export BIGTABLE_EMULATOR_HOST=localhost:8086
如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试。
提交 Spark 作业
无论您是否使用本地 Bigtable 模拟器,都可以使用 spark-submit
命令提交 Spark 作业。
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
验证表数据
运行以下 cbt
CLI 命令,验证数据是否已写入 Bigtable。cbt
CLI 是 Google Cloud CLI 的一个组件。如需了解详情,请参阅
cbt
CLI 概览。
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
其他解决方案
使用 Bigtable Spark 连接器可实现特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行和生成客户端指标。
使用过滤条件读取特定的 DataFrame 行
使用 DataFrame 从 Bigtable 读取数据时,您可以指定过滤条件,以便仅读取特定行。系统会在服务器端应用行键列上的简单过滤条件(例如 ==
、<=
和 startsWith
),以避免进行全表扫描。对复合行键的过滤条件或复杂过滤条件(例如对行键列的 LIKE
过滤条件)在客户端应用。
如果您要读取大型表,建议使用简单的行键过滤条件,以免执行全表扫描。以下示例语句展示了如何使用简单过滤条件进行读取。请确保在 Spark 过滤器中,您使用的是转换为行键的 DataFrame 列的名称:
dataframe.filter("id == 'some_id'").show()
应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。
使用 Apache Avro 序列化复杂数据类型
Bigtable Spark 连接器支持使用 Apache Avro 序列化复杂的 Spark SQL 类型,例如 ArrayType
、MapType
或 StructType
。Apache Avro 为记录数据提供数据序列化,通常用于处理和存储复杂的数据结构。
使用 "avro":"avroSchema"
等语法指定应使用 Avro 对 Bigtable 中的列进行编码。然后,您可以在从 Bigtable 读取数据或向其写入数据时使用 .option("avroSchema", avroSchemaString)
,以字符串格式指定与相应列对应的 Avro 架构。您可以为不同的列使用不同的选项名称(例如 "anotherAvroSchema"
),并传递多个列的 Avro 架构。
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
使用客户端指标
由于 Bigtable Spark 连接器基于 Java 版 Bigtable 客户端,因此默认情况下会在连接器内启用客户端指标。您可以参阅客户端指标文档,详细了解如何访问和解读这些指标。
将 Bigtable Java 客户端与低级 RDD 函数搭配使用
由于 Bigtable Spark 连接器基于 Bigtable Java 客户端,因此您可以在 Spark 应用中直接使用该客户端,并在 mapPartitions
和 foreachPartition
等低级 RDD 函数中执行分布式读取或写入请求。
如需使用 Java 类的 Bigtable 客户端,请在软件包名称中添加 com.google.cloud.spark.bigtable.repackaged
前缀。例如,使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
而不是 com.google.cloud.bigtable.data.v2.BigtableDataClient
作为类名称。
如需详细了解 Java 版 Bigtable 客户端,请参阅 Java 版 Bigtable 客户端。
后续步骤
- 了解如何在 Dataproc 中调整 Spark 作业。
- 将 Java 版 Bigtable 客户端中的类与 Bigtable Spark 连接器搭配使用。