使用 Bigtable Spark 连接器

借助 Bigtable Spark 连接器,您可以往返 Bigtable 读写数据。您可以使用 Spark SQL 和 DataFrame 从 Spark 应用中读取数据。使用 Bigtable Spark 连接器支持以下 Bigtable 操作:

  • 写入数据
  • 读取数据
  • 创建新表

本文档介绍了如何将 Spark SQL DataFrame 表转换为 Bigtable 表,然后编译并创建 JAR 文件以提交 Spark 作业。

Spark 和 Scala 支持状态

Bigtable Spark 连接器仅支持 Scala 2.12 版本以及以下 Spark 版本:

Bigtable Spark 连接器支持以下 Dataproc 版本:

计算费用

如果您决定使用 Google Cloud 的以下任一收费组件,则需要为所使用的资源付费:

  • Bigtable(使用 Bigtable 模拟器无需付费)
  • Dataproc
  • Cloud Storage

Dataproc 价格适用于在 Compute Engine 集群上使用 Dataproc。Dataproc Serverless 定价适用于在 Dataproc Serverless for Spark 上运行的工作负载和会话。

您可使用价格计算器根据您的预计使用量来估算费用。

准备工作

在使用 Bigtable Spark 连接器之前,请先完成以下前提条件。

所需的角色

如需获取使用 Bigtable Spark 连接器所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

  • Bigtable Administrator (roles/bigtable.admin)(可选):您可以使用此角色读取或写入数据以及创建新表。
  • Bigtable User (roles/bigtable.user):允许您读取或写入数据,但不允许您创建新表。

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

如果您使用的是 Dataproc 或 Cloud Storage,则可能需要额外的权限。如需了解详情,请参阅 Dataproc 权限Cloud Storage 权限。

设置 Spark

除了创建 Bigtable 实例之外,您还需要设置 Spark 实例。您可以在本地执行此操作,也可以选择以下任一选项将 Spark 与 Dataproc 搭配使用:

  • Dataproc 集群
  • Dataproc Serverless

如需详细了解如何在 Dataproc 集群和无服务器选项之间进行选择,请参阅 Dataproc Serverless for Spark 与 Dataproc on Compute Engine 文档。

下载连接器 JAR 文件

您可以在 Bigtable Spark 连接器 GitHub 代码库中找到 Bigtable Spark 连接器源代码及示例。

根据您的 Spark 设置,您可以按如下方式访问 JAR 文件:

  • 如果您在本地运行 PySpark,则应从 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar Cloud Storage 位置下载连接器的 JAR 文件。

    SCALA_VERSION 替换为 Scala 版本,将其设为 2.12 作为唯一受支持的版本,并将 CONNECTOR_VERSION 替换为您要使用的连接器版本。

  • 对于 Dataproc 集群或无服务器选项,请将最新的 JAR 文件用作可添加到 Scala 或 Java Spark 应用中的工件。如需详细了解如何将 JAR 文件用作工件,请参阅管理依赖项

  • 如果您要将 PySpark 作业提交到 Dataproc,请使用 gcloud dataproc jobs submit pyspark --jars 标志将 URI 设置为 Cloud Storage 中的 JAR 文件位置,例如 gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar

确定计算类型

对于只读作业,您可以使用 Data Boost(预览版)无服务器计算,以免影响应用服务集群。您的 Spark 应用必须使用 1.1.0 或更高版本的 Spark 连接器才能使用数据提升功能。

如需使用 Data Boost,您必须创建 Data Boost 应用配置文件,然后在将 Bigtable 配置添加到 Spark 应用时为 spark.bigtable.app_profile.id Spark 选项提供应用配置文件 ID。如果您已为 Spark 读取作业创建了应用配置文件,并且希望在不更改应用代码的情况下继续使用该配置文件,则可以将该应用配置文件转换为 Data Boost 应用配置文件。如需了解详情,请参阅转换应用配置文件

如需了解详情,请参阅 Bigtable Data Boost 概览

对于涉及读取和写入的作业,您可以通过在请求中指定标准应用配置文件,使用实例的集群节点进行计算。

确定或创建要使用的应用配置文件

如果您未指定应用配置文件 ID,则连接器会使用默认应用配置文件。

我们建议您为您运行的每个应用(包括 Spark 应用)使用各自不同的应用配置文件。如需详细了解应用配置文件类型和设置,请参阅应用配置文件概览。如需查看相关说明,请参阅创建和配置应用配置文件

将 Bigtable 配置添加到 Spark 应用

在 Spark 应用中,添加可让您与 Bigtable 交互的 Spark 选项。

支持的 Spark 选项

使用 com.google.cloud.spark.bigtable 软件包中提供的 Spark 选项。

选项名称 必填 默认值 含义
spark.bigtable.project.id 不适用 设置 Bigtable 项目 ID。
spark.bigtable.instance.id 不适用 设置 Bigtable 实例 ID。
catalog 不适用 设置 JSON 格式,用于指定 DataFrame 的类 SQL 架构与 Bigtable 表的架构之间的转换格式。

如需了解详情,请参阅以 JSON 格式创建表元数据
spark.bigtable.app_profile.id default 设置 Bigtable 应用配置文件 ID。
spark.bigtable.write.timestamp.milliseconds 当前系统时间 设置要在将 DataFrame 写入 Bigtable 时使用的毫秒级时间戳。

请注意,由于 DataFrame 中的所有行都使用相同的时间戳,因此 DataFrame 中具有相同行键列的行会在 Bigtable 中保留为单个版本,因为它们具有相同的时间戳。
spark.bigtable.create.new.table false 设置为 true 以便在写入 Bigtable 之前创建新表。
spark.bigtable.read.timerange.start.millisecondsspark.bigtable.read.timerange.end.milliseconds 不适用 分别设置时间戳(以从公元纪年开始计算的毫秒数为单位),以过滤具有特定开始日期和结束日期的单元格。
spark.bigtable.push.down.row.key.filters true 设置为 true 可允许在服务器端进行简单的行键过滤。对复合行键进行过滤是在客户端实现的。

如需了解详情,请参阅使用过滤条件读取特定 DataFrame 行
spark.bigtable.read.rows.attempt.timeout.milliseconds 30m 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的读取行尝试设置超时时长。
spark.bigtable.read.rows.total.timeout.milliseconds 12 小时 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的读取行尝试设置超时时长。
spark.bigtable.mutate.rows.attempt.timeout.milliseconds 1 分钟 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的行更改尝试设置超时时长。
spark.bigtable.mutate.rows.total.timeout.milliseconds 10 分钟 在 Java 版 Bigtable 客户端中,为与一个 DataFrame 分区对应的行更改尝试设置超时时长。
spark.bigtable.batch.mutate.size 100 设置为每个批次中的更改数量。您可以设置的最大值为 100000
spark.bigtable.enable.batch_mutate.flow_control false 设置为 true 可为批量更改启用流控制

以 JSON 格式创建表元数据

必须使用 JSON 格式的字符串将 Spark SQL DataFrame 表格式转换为 Bigtable 表格式。这种字符串 JSON 格式使数据格式与 Bigtable 兼容。您可以使用 .option("catalog", catalog_json_string) 选项在应用代码中传递 JSON 格式。

例如,请考虑以下 DataFrame 表和相应的 Bigtable 表。

在此示例中,DataFrame 中的 namebirthYear 列会在 info 列族下归为一组,并分别重命名为 namebirth_year。同样,address 列也存储在 location 列族下,具有相同的列名称。DataFrame 中的 id 列会转换为 Bigtable 行键。

行键在 Bigtable 中没有专用列名称,在本例中,id_rowkey 仅用于向连接器表明这是行键列。您可以为行键列使用任何名称,并确保在 JSON 格式中声明 "rowkey":"column_name" 字段时使用相同的名称。

DataFrame Bigtable 表 = t1
行键 列族
信息 location
id name birthYear 地址 id_rowkey name birth_year 地址

目录的 JSON 格式如下:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

JSON 格式中使用的键和值如下所示:

目录键 目录价值 JSON 格式
Bigtable 表的名称。 "table":{"name":"t1"}

如果该表不存在,请使用 .option("spark.bigtable.create.new.table", "true") 创建表。
rowkey 将用作 Bigtable 行键的列的名称。确保将 DataFrame 列的列名称用作行键,例如 id_rowkey

复合键也可用作行键。例如 "rowkey":"name:address"。这种方法可能会导致行键需要针对所有读取请求执行全表扫描。
"rowkey":"id_rowkey"
将每个 DataFrame 列映射到相应的 Bigtable 列族 ("cf") 和列名称 ("col")。列名称可以与 DataFrame 表中的列名称不同。支持的数据类型包括 stringlongbinary "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

在此示例中,id_rowkey 是行键,infolocation 是列族。

支持的数据类型

该连接器支持在目录中使用 stringlongbinary(字节数组)类型。在添加对 intfloat 等其他类型的支持之前,您可以先手动将此类数据类型转换为字节数组(Spark SQL 的 BinaryType),然后再使用连接器将其写入 Bigtable。

此外,您还可以使用 Avro 序列化复杂类型,例如 ArrayType。如需了解详情,请参阅使用 Apache Avro 序列化复杂数据类型

写入 Bigtable

使用 .write() 函数和受支持的选项将数据写入 Bigtable。

Java

GitHub 代码库中的以下代码使用 Java 和 Maven 写入 Bigtable。

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

GitHub 代码库中的以下代码使用 Python 写入 Bigtable。

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

从 Bigtable 中读取

使用 .read() 函数检查表是否已成功导入 Bigtable。

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

编译项目

生成用于在 Dataproc 集群、Dataproc Serverless 或本地 Spark 实例中运行作业的 JAR 文件。您可以在本地编译 JAR 文件,然后使用该文件提交作业。提交作业时,编译后的 JAR 文件的路径会设置为 PATH_TO_COMPILED_JAR 环境变量。

此步骤不适用于 PySpark 应用。

管理依赖项

Bigtable Spark 连接器支持以下依赖项管理工具:

编译 JAR 文件

Maven

  1. spark-bigtable 依赖项添加到 pom.xml 文件中。

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Maven Shade 插件添加到 pom.xml 文件以创建超级 JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. 运行 mvn clean install 命令以生成 JAR 文件。

sbt

  1. spark-bigtable 依赖项添加到 build.sbt 文件中:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. sbt-assembly 插件添加到 project/plugins.sbtproject/assembly.sbt 文件,以创建 Uber JAR 文件。

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. 运行 sbt clean assembly 命令以生成 JAR 文件。

Gradle

  1. spark-bigtable 依赖项添加到 build.gradle 文件中。

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. build.gradle 文件中添加 Shadow 插件,以创建超级 JAR 文件:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. 如需了解更多配置和 JAR 编译信息,请参阅 Shadow 插件文档。

提交作业

使用 Dataproc、Dataproc Serverless 或本地 Spark 实例提交 Spark 作业以启动应用。

设置运行时环境

设置以下环境变量。

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

替换以下内容:

  • PROJECT_ID:Bigtable 项目的永久标识符。
  • INSTANCE_ID:Bigtable 实例的永久性标识符。
  • TABLE_NAME:表的永久标识符。
  • DATAPROC_CLUSTER:Dataproc 集群的永久性标识符。
  • DATAPROC_REGION:包含 Dataproc 实例中某一集群的 Dataproc 区域,例如 northamerica-northeast2
  • DATAPROC_ZONE:Dataproc 集群运行所在的区域。
  • SUBNET子网的完整资源路径。
  • GCS_BUCKET_NAME:用于上传 Spark 工作负载依赖项的 Cloud Storage 存储桶。
  • PATH_TO_COMPILED_JAR:已编译 JAR 文件的绝对或相对路径,例如 Maven 的 /path/to/project/root/target/<compiled_JAR_name>
  • GCS_PATH_TO_CONNECTOR_JARspark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar 文件所在的 gs://spark-lib/bigtable Cloud Storage 存储桶。
  • PATH_TO_PYTHON_FILE:对于 PySpark 应用,用于向 Bigtable 写入和从 Bigtable 读取数据的 Python 文件的路径。
  • LOCAL_PATH_TO_CONNECTOR_JAR:对于 PySpark 应用,下载的 Bigtable Spark 连接器 JAR 文件的路径。

提交 Spark 作业

对于 Dataproc 实例或本地 Spark 设置,请运行 Spark 作业以将数据上传到 Bigtable。

Dataproc 集群

使用编译后的 JAR 文件,创建一个 Dataproc 集群作业,以便从 Bigtable 读取数据以及将数据写入 Bigtable。

  1. 创建 Dataproc 集群。以下示例展示了一个示例命令,用于创建一个包含 Debian 10、两个工作器节点和默认配置的 Dataproc v2.0 集群。

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. 提交作业。

    Scala/Java

    以下示例展示了 spark.bigtable.example.WordCount 类,其中包含在 DataFrame 中创建测试表、将表写入 Bigtable,然后统计表中单词数量的逻辑。

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

使用已编译的 JAR 文件,创建一个 Dataproc 作业,以便使用 Dataproc Serverless 实例在 Bigtable 中读取和写入数据。

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

本地 Spark

使用下载的 JAR 文件,创建一个 Spark 作业,以便使用本地 Spark 实例在 Bigtable 中读取和写入数据。您还可以使用 Bigtable 模拟器提交 Spark 作业。

使用 Bigtable 模拟器

如果您决定使用 Bigtable 模拟器,请按以下步骤操作:

  1. 运行以下命令启动模拟器:

    gcloud beta emulators bigtable start
    

    默认情况下,该模拟器会选择 localhost:8086

  2. 设置 BIGTABLE_EMULATOR_HOST 环境变量:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. 提交 Spark 作业。

如需详细了解如何使用 Bigtable 模拟器,请参阅使用模拟器进行测试

提交 Spark 作业

无论您是使用本地 Bigtable 模拟器还是云端 Bigtable 模拟器,都可以使用 spark-submit 命令提交 Spark 作业。

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

验证表数据

运行以下 cbt CLI 命令,验证数据是否已写入 Bigtable。cbt CLI 是 Google Cloud CLI 的一个组件。如需了解详情,请参阅 cbt CLI 概览

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

其他解决方案

使用 Bigtable Spark 连接器可实现特定解决方案,例如序列化复杂的 Spark SQL 类型、读取特定行以及生成客户端指标。

使用过滤条件读取特定 DataFrame 行

使用 DataFrame 从 Bigtable 读取数据时,您可以指定过滤条件,以便仅读取特定行。系统会在服务器端应用行键列上的简单过滤条件(例如 ==<=startsWith),以避免执行全表扫描。对复合行键的过滤条件或复杂过滤条件(例如对行键列的 LIKE 过滤条件)会在客户端应用。

如果您要读取大型表,我们建议您使用简单的行键过滤条件,以免执行全表扫描。以下示例语句展示了如何使用简单的过滤器进行读取。确保在 Spark 过滤器中,您使用的是转换为行键的 DataFrame 列的名称:

    dataframe.filter("id == 'some_id'").show()
  

应用过滤条件时,请使用 DataFrame 列名称,而不是 Bigtable 表列名称。

使用 Apache Avro 序列化复杂数据类型

Bigtable Spark 连接器支持使用 Apache Avro 序列化复杂的 Spark SQL 类型,例如 ArrayTypeMapTypeStructType。Apache Avro 为记录数据提供数据序列化功能,这些数据通常用于处理和存储复杂的数据结构。

使用 "avro":"avroSchema" 等语法指定 Bigtable 中的某个列应使用 Avro 进行编码。然后,在从 Bigtable 读取或向其写入数据时,您可以使用 .option("avroSchema", avroSchemaString) 以字符串格式指定与该列对应的 Avro 架构。您可以使用不同的选项名称(例如,为不同的列使用 "anotherAvroSchema"),并为多个列传递 Avro 架构。

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

使用客户端指标

由于 Bigtable Spark 连接器基于 Java 版 Bigtable 客户端,因此客户端指标默认在连接器内启用。如需详细了解如何访问和解读这些指标,请参阅客户端指标文档。

将 Java 版 Bigtable 客户端与低级 RDD 函数搭配使用

由于 Bigtable Spark 连接器基于 Java 版 Bigtable 客户端,因此您可以在 Spark 应用中直接使用该客户端,并在低级 RDD 函数(例如 mapPartitionsforeachPartition)中执行分布式读写请求。

如需针对 Java 类使用 Bigtable 客户端,请将 com.google.cloud.spark.bigtable.repackaged 前缀附加到软件包名称。例如,请使用 com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient,而不是将类名称设为 com.google.cloud.bigtable.data.v2.BigtableDataClient

如需详细了解 Java 版 Bigtable 客户端,请参阅 Java 版 Bigtable 客户端

后续步骤