将 Spanner 连接器与 Spark 搭配使用

本页介绍了如何使用 Spark Spanner 连接器使用 Apache SparkSpanner 读取数据

计算费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

  • Dataproc
  • Spanner
  • Cloud Storage

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

在运行本教程之前,请务必了解连接器版本并获取连接器 URI。

如何指定连接器 JAR 文件 URI

GitHub GoogleCloudDataproc/spark-spanner-connector 代码库中列出了 Spark Spanner 连接器版本。

通过在以下 URI 字符串中替换连接器版本信息,指定连接器 JAR 文件:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

该连接器适用于 Spark 版本 3.1+

gcloud CLI 示例:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

准备 Spanner 数据库

如果您没有 Spanner 表,可以按照教程创建 Spanner 表。之后,您将获得实例 ID、数据库 ID 和表 Singers

创建 Dataproc 集群

使用该连接器的任何 Dataproc 集群都需要 spannercloud-platform 镜重。对于映像 2.1 或更高版本,Dataproc 集群的默认范围为 cloud-platform。如果您使用的是较低版本,则可以使用 Google Cloud 控制台、Google Cloud CLI 和 Dataproc API 创建 Dataproc 集群。

控制台

  1. 在 Google Cloud 控制台中,打开 Dataproc 创建集群页面
  2. 在“管理安全”标签页中,点击“项目访问权限”部分下的“为此集群启用 cloud-platform 范围”。
  3. 填写或确认其他集群创建字段,然后点击“创建”。

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

您可以在 clusters.create 请求中指定 GceClusterConfig.serviceAccountScopes。例如:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

您必须确保向 Dataproc 虚拟机服务账号分配了相应的 Spanner 权限。如果您在本教程中使用 Data Boost,请参阅 Data Boost IAM 权限

从 Spanner 读取数据

您可以使用 Scala 和 Python 通过 Spark 数据源 API 将数据从 Spanner 读取到 Spark DataFrame。

Scala

  1. 检查代码,并将 [projectId][instanceId][databaseId][table] 占位符替换为您之前创建的项目 ID、实例 ID、数据库 ID 和表。enableDataBoost 选项用于启用 Spanner Data Boost 功能,该功能对主 Spanner 实例的影响几乎为零。
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主服务器实例节点
      1. 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
        Cloud 控制台中的 Dataproc 集群页面。
      2. > 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
        Cloud 控制台中的 Dataproc 集群详情页面。

        此时会打开一个浏览器窗口并显示主节点上的主目录
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预装的 vivimnano 文本编辑器创建 singers.scala,然后粘贴 Scala 代码列表中的 Scala 代码
      nano singers.scala
        
    3. 启动 spark-shell REPL。
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. 运行 singers.scala 并使用 :load singers.scala 命令创建 Spanner Singers 表。输出列表将显示来自 Singers 输出的示例。
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. 检查代码,并将 [projectId][instanceId][databaseId][table] 占位符替换为您之前创建的项目 ID、实例 ID、数据库 ID 和表。enableDataBoost 选项用于启用 Spanner Data Boost 功能,该功能对主 Spanner 实例的影响几乎为零。
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. 在集群上运行代码
    1. 使用 SSH 连接到 Dataproc 集群主服务器实例节点
      1. 前往 Google Cloud 控制台中的 Dataproc 集群页面,然后点击集群的名称
        Cloud 控制台中的“集群”页面。
      2. 集群详情页面上,选择“虚拟机实例”标签页。然后,点击集群主服务器节点名称右侧的 SSH
        在 Cloud 控制台中的“集群详情”页面上,选择集群名称行中的“SSH”。

        此时会打开一个浏览器窗口并显示主节点上的主目录
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. 使用预安装的 vivimnano 文本编辑器创建 singers.py,然后粘贴 PySpark 代码列表中的 PySpark 代码
      nano singers.py
      
    3. 使用 spark-submit 运行 singers.py 以创建 Spanner Singers 表。
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      输出结果如下:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

清理

为进行清理并避免系统因本演示中创建的资源向您的 Google Cloud 账号持续收取费用,请按照以下步骤操作。

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

了解详情