This page shows you how to create a Dataproc cluster that uses the Spark Spanner Connector to read data from Spanner using Apache Spark
Calculate costs
In this document, you use the following billable components of Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
To generate a cost estimate based on your projected usage,
use the pricing calculator.
Before you begin
Before using the Spanner connector in this tutorial, set up a Dataproc cluster and a Spanner instance and database.
Set up a Dataproc cluster
Create a Dataproc cluster or use an existing Dataproc cluster that has the following settings:
VM service account permissions. The cluster VM service account must be assigned the appropriate Spanner permissions. If you use Data Boost (Data Boost is enabled in the example code in Read data from Spanner), the VM service account must also have the required Data Boost IAM permissions.
Access scope. The cluster must be created with
cloud-platform
scope or the appropriatespanner
scope enabled.cloud-platform
scope is enabled by default for clusters created with image version 2.1 or higher.The following instructions show you how to set
cloud-platform
scope as part of a a cluster creation request that uses the Google Cloud console, gcloud CLI, or Dataproc API. For additional cluster creation instructions, see Create a cluster.Google Cloud console
- In the Google Cloud console, open the Dataproc Create a cluster page.
- On the Manage security panel in the Project access section, click "Enables the cloud-platform scope for this cluster".
- Fill in or confirm the other cluster creation fields. then click Create.
gcloud CLI
You can run the following
gcloud dataproc clusters create
command to create a cluster with thecloud-platform
scope enabled.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
You can specify the GceClusterConfig.serviceAccountScopes as part of a clusters.create request.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Set up a Spanner instance with a Singers
database table
Create a Spanner instance
with a database that contains a Singers
table. Note the Spanner
instance ID and database ID.
Use the Spanner connector with Spark
The Spanner connector is available for Spark versions 3.1+
.
You specify the
connector version as part of the Cloud Storage
connector JAR file specification when you
submit a job to
a Dataproc cluster.
Example: gcloud CLI Spark job submission with the Spanner connector.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Replace the following:
CONNECTOR_VERSION: Spanner connector version.
Choose the Spanner connector version from the version list in the GitHub
GoogleCloudDataproc/spark-spanner-connector
repository.
Read data from Spanner
You can use Python or Scala to read data from Spanner into a Spark Dataframe using the Spark data source API.
PySpark
You can run the example PySpark code in this section on your cluster by submitting the job to the
Dataproc service or by running the job from the spark-submit
REPL
on the cluster master node.
Dataproc job
- Create a
singers.py
file in using a local text editor or in Cloud Shell using the pre-installedvi
,vim
, ornano
text editor. - Paste the following code into the
singers.py
file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Replace the following:
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME : See
Set up a Spanner instance with
Singers
database table.
- Save the
singers.py
file. - Submit the job
to the Dataproc service using the Google Cloud console, gcloud CLI or
Dataproc API.
Example: gcloud CLI job submission with the Spanner connector.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION
Replace the following:
- CLUSTER_NAME: The name of the new cluster.
- REGION: An available Compute Engine region to run the workload.
- CONNECTOR_VERSION: Spanner connector version.
Choose the Spanner connector version from the version list in the GitHub
GoogleCloudDataproc/spark-spanner-connector
repository.
spark-submit job
- Connect to the Dataproc cluster master node using SSH.
- Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
- On the Cluster details page, select the VM Instances tab. Then click
SSH
to the right of the name of the cluster master node.A browser window opens at your home directory on the master node.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Create a
singers.py
file on the master node using the pre-installedvi
,vim
, ornano
text editor.- Paste the following code into the
singers.py
file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Replace the following:
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME : See
Set up a Spanner instance with
Singers
database table.
- Save the
singers.py
file.
- Paste the following code into the
- Run
singers.py
withspark-submit
to create the SpannerSingers
table.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Replace the following:
- CONNECTOR_VERSION: Spanner connector version.
Choose the Spanner connector version from the version list in the GitHub
GoogleCloudDataproc/spark-spanner-connector
repository.
The output is:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: Spanner connector version.
Choose the Spanner connector version from the version list in the GitHub
Scala
To run the example Scala code on your cluster, complete the following steps:
- Connect to the Dataproc cluster master node using SSH.
- Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster.
- On the Cluster details page, select the VM Instances tab. Then click
SSH
to the right of the name of the cluster master node.A browser window opens at your home directory on the master node.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Create a
singers.scala
file on the master node using the pre-installedvi
,vim
, ornano
text editor.- Paste the following code into the
singers.scala
file. Note that the Spanner Data Boost feature is enabled, which has near-zero impact on the main Spanner instance.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Replace the following:
- PROJECT_ID: Your Google Cloud project ID. Project IDs are listed in the Project info section on the Google Cloud console Dashboard.
- INSTANCE_ID, DATABASE_ID, and TABLE_NAME : See
Set up a Spanner instance with
Singers
database table.
- Save the
singers.scala
file.
- Paste the following code into the
- Launch the
spark-shell
REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Replace the following:
CONNECTOR_VERSION: Spanner connector version. Choose the Spanner connector version from the version list in the GitHub
GoogleCloudDataproc/spark-spanner-connector
repository. - Run
singers.scala
with the:load singers.scala
command to create the SpannerSingers
table. The output listing displays examplesfrom the Singers output.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Cleanup
To avoid incurring ongoing charges to your Google Cloud account, you can stop or delete your Dataproc cluster and delete your Spanner instance.