使用 Spark BigQuery 连接器

您可以将 spark-bigquery-connectorApache Spark 搭配使用,以从 BigQuery 中读取数据以及将数据写入其中。该连接器在从 BigQuery 读取数据时利用 BigQuery Storage API

本教程介绍了预安装连接器的可用性,并展示了如何让 Spark 作业使用特定版本的连接器。示例代码展示了如何在 Spark 应用中使用 Spark BigQuery 连接器。

使用预安装的连接器

Spark BigQuery 连接器预安装在使用 2.1 及更高版本映像创建的 Dataproc 集群上,并可供这些集群上的 Spark 作业使用。预安装的连接器版本会列在每个映像版本发布页面上。例如,2.2.x 映像发布版本页面上的 BigQuery 连接器行显示了最新 2.2 映像版本上安装的连接器版本。

让 Spark 作业使用特定版本的连接器

如果您想使用与 2.1 或更高版本映像集群上预安装版本不同的连接器版本,或者想在 2.1 之前的映像版本集群上安装连接器,请按照本部分中的说明操作。

重要提示:spark-bigquery-connector 版本必须与 Dataproc 集群映像版本兼容。请参阅连接器与 Dataproc 映像兼容性矩阵

2.1 及更高版本的映像版本集群

使用 2.1 或更高版本的映像创建 Dataproc 集群时,将连接器版本指定为集群元数据

gcloud CLI 示例:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

注意:

  • SPARK_BQ_CONNECTOR_VERSION:指定连接器版本。GitHub 中的 spark-bigquery-connector/releases 页面上列出了 Spark BigQuery 连接器版本。

    示例:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL:指定指向 Cloud Storage 中的 jar 文件的网址。您可以指定 GitHub 中下载和使用连接器页面链接列中列出的连接器的网址,也可以指定您放置自定义连接器 jar 文件的 Cloud Storage 位置的路径。

    示例:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

2.0 及更早版本的映像版本集群

您可以通过以下任意一种方式将 Spark BigQuery 连接器提供给您的应用:

  1. 在创建集群时,使用 Dataproc 连接器初始化操作,在每个节点的 Spark jars 目录中安装 spark-bigquery-connector。

  2. 使用 Google Cloud 控制台、gcloud CLI 或 Dataproc API 将作业提交到集群时,请提供连接器 jar 文件网址。

    控制台

    使用 Dataproc 提交作业页面上的 Spark 作业 Jars 文件项。

    gcloud

    使用 gcloud dataproc jobs submit spark --jars 标志

    API

    使用 SparkJob.jarFileUris 字段

    如何在 2.0 之前的映像版本集群上指定连接器 jar 来运行 Spark 作业

    • 通过在以下 URI 字符串中替换 Scala 和连接器版本信息来指定连接器 jar:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • 将 Scala 2.12 与 Dataproc 映像版本 1.5+ 搭配使用
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      gcloud CLI 示例:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • 将 Scala 2.11 与 Dataproc 映像版本 1.4 及更低版本搭配使用:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      gcloud CLI 示例:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. 将连接器 jar 作为依赖项添加到 Scala 或 Java Spark 应用中(请参阅针对连接器进行编译)。

计算费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataproc
  • BigQuery
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

从 BigQuery 读取并写入数据

此示例展示如何将 BigQuery 中的数据读取到 Spark DataFrame 中,以使用标准数据源 API执行字数统计操作。

连接器通过先将所有数据缓冲到 Cloud Storage 临时表中,再将数据写入 BigQuery。然后,它通过一次操作将所有数据复制到 BigQuery。在 BigQuery 加载操作成功并且当 Spark 应用终止时再次成功之后,连接器便会尝试删除临时文件。如果作业失败,请移除任何剩余的临时 Cloud Storage 文件。通常,临时 BigQuery 文件位于 gs://[bucket]/.spark-bigquery-[jobid]-[UUID] 中。

配置结算功能

默认情况下,与凭证或服务账号关联的项目将为 API 使用支付费用。要对其他项目计费,请设置以下配置:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

还可以将其添加到读/写操作,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")

运行代码

在运行此示例之前,创建名为“wordcount_dataset”的数据集,或将代码中的输出数据集更改为您Google Cloud 项目中的现有 BigQuery 数据集。

使用 bq 命令创建 wordcount_dataset

bq mk wordcount_dataset

使用 Google Cloud CLI 命令创建 Cloud Storage 存储桶,并将其用于导出到 BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. 检查代码,并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主节点
      1. 在 Google Cloud 控制台中前往 Dataproc 集群页面,然后点击您的集群名称
        Cloud 控制台中的 Dataproc“集群”页面。
      2. >集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主节点名称右侧的 SSH
        Cloud 控制台中的 Dataproc“集群详情”页面。

        此时会在主节点上的主目录打开一个浏览器窗口
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预装的 vivimnano 文本编辑器创建 wordcount.scala,然后粘贴 Scala 代码列表中的 Scala 代码
      nano wordcount.scala
        
    3. 启动 spark-shell REPL。
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. 运行 wordcount.scala 并使用 :load wordcount.scala 命令创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      要预览输出表,请打开 BigQuery 页面,选择 wordcount_output 表,然后点击预览
      在 Cloud 控制台的 BigQuery 探索器页面中预览表。

PySpark

  1. 检查代码,并将 [bucket] 占位符替换为您之前创建的 Cloud Storage 存储桶。
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主节点
      1. 在 Google Cloud 控制台中前往 Dataproc 集群页面,然后点击您的集群名称
        Cloud 控制台中的“集群”页面。
      2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主节点名称右侧的 SSH
        在 Cloud 控制台的“集群详情”页面上,选择“集群名称”行中的“SSH”。

        此时会在主节点上的主目录打开一个浏览器窗口
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预安装的 vivimnano 文本编辑器创建 wordcount.py,然后粘贴 PySpark 代码列表中的 PySpark 代码
      nano wordcount.py
      
    3. 使用 spark-submit 运行 wordcount 以创建 BigQuery wordcount_output 表。输出列表将显示来自 wordcount 输出的 20 行内容。
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      要预览输出表,请打开 BigQuery 页面,选择 wordcount_output 表,然后点击预览
      在 Cloud 控制台的 BigQuery 探索器页面中预览表。

了解详情