En esta página, se muestra cómo usar el Conector de Spanner de Spark para leer datos de Spanner con Apache Spark.
Calcula los costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Antes de ejecutar el instructivo, asegúrate de conocer la versión del conector y obtener un URI del conector.
Cómo especificar el URI del archivo JAR del conector
Las versiones del conector de Spanner de Spark se enumeran en el repositorio de GoogleCloudDataproc/spark-spanner-connector de GitHub.
Para especificar el archivo JAR del conector, reemplaza la información de la versión del conector en la siguiente cadena de URI:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
El conector está disponible para las versiones 3.1+
de Spark.
Ejemplo de gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Prepara la base de datos de Spanner
Si no tienes una tabla de Spanner, puedes seguir el instructivo para crear una. Después de eso, tendrás un ID de instancia,
un ID de base de datos y una tabla Singers
.
Crea un clúster de Dataproc
Cualquier clúster de Dataproc que use el conector necesita los permisos spanner
o cloud-platform
. Los clústeres de Dataproc tienen el alcance predeterminado cloud-platform
para la imagen 2.1 o posterior. Si usas una versión anterior, puedes usar la consola de Google Cloud , Google Cloud CLI y la API de Dataproc para crear un clúster de Dataproc.
Console
- En la consola de Google Cloud , abre la página Create a cluster de Dataproc.
- En la pestaña "Administrar seguridad", haz clic en "Habilita el alcance de la plataforma de nube para este clúster" en la sección "Acceso al proyecto".
- Completa o confirma los otros campos de creación del clúster y, luego, haz clic en “Crear”.
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Puedes especificar GceClusterConfig.serviceAccountScopes como parte de una solicitud clusters.create. Por ejemplo:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Deberás asegurarte de que el permiso de Spanner correspondiente se asigne a la cuenta de servicio de VM de Dataproc. Si usas Data Boost en el instructivo, consulta el Permiso de IAM de Data Boost.
Lee datos de Spanner
Puedes usar Scala y Python para leer datos de Spanner en un DataFrame de Spark con la API de fuente de datos de Spark.
Scala
- Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Ejecuta el código de tu clúster
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
- Ve a la página de clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
- En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.scala
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.nano singers.scala
- Inicia el REPL de
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Ejecuta singers.scala con el comando
:load singers.scala
para crear la tablaSingers
de Spanner. La lista de salida muestra ejemplos del resultado de Singers.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Examina el código y reemplaza los marcadores de posición [projectId], [instanceId], [databaseId] y [table] por el ID del proyecto, el ID de la instancia, el ID de la base de datos y la tabla que creaste antes. La opción enableDataBoost habilita la función Data Boost de Spanner, que tiene un impacto casi nulo en la instancia principal de Spanner.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Ejecuta el código en tu clúster.
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
- Ve a la página de clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
- En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en
SSH
a la derecha del nombre del nodo instancia principal del clúster
Se abrirá una ventana del navegador en tu directorio principal del nodo principalConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crea
singers.py
con el editor de textovi
,vim
onano
preinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.nano singers.py
- Ejecuta singers.py con
spark-submit
para crear la tablaSingers
de Spanner. El resultado es el siguiente:spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- Usa SSH para conectarte al nodo instancia principal del clúster de Dataproc
Limpieza
Sigue estos pasos para limpiar y evitar que se apliquen cargos continuos a tu cuenta de Google Cloud por los recursos que creaste en esta explicación.
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME