Use the Spanner connector with Spark

This page shows you how to create a Dataproc cluster that uses the Spark Spanner Connector to read data from Spanner using Apache Spark.

Calculate costs

In this document, you use the following billable components of Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

Before you begin

Before using the Spanner connector in this tutorial, set up a Dataproc cluster and a Spanner instance and database.

Set up a Dataproc cluster

Create a Dataproc cluster or use an existing Dataproc cluster.

Set up a Spanner instance with a Singers database table

Create a Spanner instance with a database that contains a Singers table. Note the Spanner instance ID and database ID.
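The exact schema is up to you, but a `Singers` table consistent with the output shown later on this page can be created with Spanner GoogleSQL DDL like the following (a sketch; the `STRING` column sizes are assumptions):

```sql
CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024),
  BirthDate DATE,
  LastUpdated TIMESTAMP,
) PRIMARY KEY (SingerId);
```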

Use the Spanner connector with Spark

The Spanner connector is available for Spark versions 3.1+. You specify the connector version as part of the Cloud Storage path of the connector JAR file when you submit a job to a Dataproc cluster.

Example: gcloud CLI Spark job submission with the Spanner connector.

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \
    ... [other job submission flags]
  

Replace the following:

CONNECTOR_VERSION: Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.
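The JAR path follows a simple naming pattern. The helper below is a hypothetical sketch (not part of the connector or the gcloud CLI) that assembles the `gs://` path from the Spark major.minor version and a connector version; the example version string `1.1.0` is an assumption, so check the repository's release list for real values:

```python
def spanner_connector_jar(spark_version: str, connector_version: str) -> str:
    """Build the Cloud Storage path of the Spanner connector JAR.

    spark_version: Spark major.minor version, for example "3.1".
    connector_version: a connector release from the
        GoogleCloudDataproc/spark-spanner-connector version list.
    """
    return (
        "gs://spark-lib/spanner/"
        f"spark-{spark_version}-spanner-{connector_version}.jar"
    )

# Example (the version is a placeholder, not a recommendation):
print(spanner_connector_jar("3.1", "1.1.0"))
```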

Read data from Spanner

You can use Python or Scala to read data from Spanner into a Spark DataFrame using the Spark data source API.
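Both the PySpark and Scala examples below pass the same set of reader options. As a sketch, they can be collected in one helper (a hypothetical convenience function, not part of the connector API; the option names are the ones used in the examples below):

```python
def spanner_read_options(project_id: str, instance_id: str,
                         database_id: str, table: str,
                         data_boost: bool = True) -> dict:
    """Return the options passed to the cloud-spanner data source."""
    return {
        "projectId": project_id,
        "instanceId": instance_id,
        "databaseId": database_id,
        "table": table,
        # Data Boost runs the read on independent compute capacity,
        # with near-zero impact on the main Spanner instance.
        "enableDataBoost": str(data_boost).lower(),
    }

# Usage with an existing SparkSession named spark:
# df = (spark.read.format("cloud-spanner")
#       .options(**spanner_read_options("PROJECT_ID", "INSTANCE_ID",
#                                       "DATABASE_ID", "Singers"))
#       .load())
```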

PySpark

You can run the example PySpark code in this section on your cluster by submitting the job to the Dataproc service or by running the job with the spark-submit command on the cluster master node.

Dataproc job

  1. Create a singers.py file using a local text editor or in Cloud Shell using the pre-installed vi, vim, or nano text editor.
    1. Paste the following code into the singers.py file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
          

      Replace the following:

      1. PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.
    2. Save the singers.py file.
  2. Submit the job to the Dataproc service using the Google Cloud console, gcloud CLI, or Dataproc API.

    Example: gcloud CLI job submission with the Spanner connector.

    gcloud dataproc jobs submit pyspark singers.py \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
          

    Replace the following:

    1. CLUSTER_NAME: The name of your Dataproc cluster.
    2. REGION: An available Compute Engine region to run the workload.
    3. CONNECTOR_VERSION: Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

spark-submit job

  1. Connect to the Dataproc cluster master node using SSH.
    1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
    2. On the Cluster details page, select the VM Instances tab. Then click SSH to the right of the name of the cluster master node.

      A browser window opens at your home directory on the master node.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Create a singers.py file on the master node using the pre-installed vi, vim, or nano text editor.
    1. Paste the following code into the singers.py file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.
      #!/usr/bin/env python
      
      """Spanner PySpark read example."""
      
      from pyspark.sql import SparkSession
      
      spark = SparkSession \
        .builder \
        .master('yarn') \
        .appName('spark-spanner-demo') \
        .getOrCreate()
      
      # Load data from Spanner.
      singers = spark.read.format('cloud-spanner') \
        .option("projectId", "PROJECT_ID") \
        .option("instanceId", "INSTANCE_ID") \
        .option("databaseId", "DATABASE_ID") \
        .option("table", "TABLE_NAME") \
        .option("enableDataBoost", "true") \
        .load()
      singers.createOrReplaceTempView('Singers')
      
      # Read from Singers
      result = spark.sql('SELECT * FROM Singers')
      result.show()
      result.printSchema()
        

      Replace the following:

      1. PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.
    2. Save the singers.py file.
  3. Run singers.py with spark-submit to read the Spanner Singers table.
    spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      

    Replace the following:

    1. CONNECTOR_VERSION: Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

    The output is similar to the following:

    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
    

Scala

To run the example Scala code on your cluster, complete the following steps:

  1. Connect to the Dataproc cluster master node using SSH.
    1. Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
    2. On the Cluster details page, select the VM Instances tab. Then click SSH to the right of the name of the cluster master node.

      A browser window opens at your home directory on the master node.

          Connected, host fingerprint: ssh-rsa 2048 ...
          ...
          user@clusterName-m:~$
          
  2. Create a singers.scala file on the master node using the pre-installed vi, vim, or nano text editor.
    1. Paste the following code into the singers.scala file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.
      object singers {
        def main(): Unit = {
          /*
           * Uncomment (use the following code) if you are not running in spark-shell.
           *
          import org.apache.spark.sql.SparkSession
          val spark = SparkSession.builder()
            .appName("spark-spanner-demo")
            .getOrCreate()
          */
      
          // Load data in from Spanner. See
          // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
          // for option information.
          val singersDF =
            (spark.read.format("cloud-spanner")
              .option("projectId", "PROJECT_ID")
              .option("instanceId", "INSTANCE_ID")
              .option("databaseId", "DATABASE_ID")
              .option("table", "TABLE_NAME")
              .option("enableDataBoost", true)
              .load()
              .cache())
      
          singersDF.createOrReplaceTempView("Singers")
      
          // Load the Singers table.
          val result = spark.sql("SELECT * FROM Singers")
          result.show()
          result.printSchema()
        }
      }
        

      Replace the following:

      1. PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
      2. INSTANCE_ID, DATABASE_ID, and TABLE_NAME: See Set up a Spanner instance with a Singers database table.
    2. Save the singers.scala file.
  3. Launch the spark-shell REPL.
    $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
    

    Replace the following:

    CONNECTOR_VERSION: Spanner connector version. Choose the Spanner connector version from the version list in the GitHub GoogleCloudDataproc/spark-spanner-connector repository.

  4. Load singers.scala with the :load singers.scala command, then call singers.main() to read the Spanner Singers table. The output displays data from the Singers table.
    > :load singers.scala
    Loading singers.scala...
    defined object singers
    > singers.main()
    ...
    +--------+---------+--------+---------+-----------+
    |SingerId|FirstName|LastName|BirthDate|LastUpdated|
    +--------+---------+--------+---------+-----------+
    |       1|     Marc|Richards|     null|       null|
    |       2| Catalina|   Smith|     null|       null|
    |       3|    Alice| Trentor|     null|       null|
    +--------+---------+--------+---------+-----------+
    
    root
     |-- SingerId: long (nullable = false)
     |-- FirstName: string (nullable = true)
     |-- LastName: string (nullable = true)
     |-- BirthDate: date (nullable = true)
     |-- LastUpdated: timestamp (nullable = true)
      

Clean up

To avoid incurring ongoing charges to your Google Cloud account, you can stop or delete your Dataproc cluster and delete your Spanner instance.
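For example, assuming the resource names used earlier on this page, cleanup can be done from the gcloud CLI. These commands permanently delete resources; run them only when you are finished with the tutorial:

```
# Delete the Dataproc cluster.
gcloud dataproc clusters delete CLUSTER_NAME --region=REGION

# Delete the database, then the Spanner instance.
gcloud spanner databases delete DATABASE_ID --instance=INSTANCE_ID
gcloud spanner instances delete INSTANCE_ID
```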

For more information