Usa el conector de BigQuery de Spark

El spark-bigquery-connector se usa con Apache Spark para leer y escribir datos desde y hacia BigQuery. El conector aprovecha la API de BigQuery Storage cuando lee datos de BigQuery.

En este instructivo, se proporciona información sobre la disponibilidad del conector preinstalado y se muestra cómo hacer que una versión específica del conector esté disponible para las tareas de Spark. En el código de ejemplo, se muestra cómo usar el conector de BigQuery de Spark dentro de una aplicación de Spark.

Usa el conector preinstalado

El conector de BigQuery de Spark está preinstalado y está disponible para trabajos de Spark que se ejecutan en clústeres de Dataproc creados con versiones de imagen 2.1 y posteriores. La versión del conector preinstalada se indica en cada página de lanzamiento de la versión de la imagen. Por ejemplo, la fila BigQuery Connector en la página Versiones de actualización con imágenes 2.2.x muestra la versión del conector que se instala en las versiones de imagen 2.2 más recientes.

Cómo hacer que una versión específica del conector esté disponible para los trabajos de Spark

Si deseas usar una versión de conector diferente de una versión preinstalada en un clúster de versión de imagen 2.1 o posterior, o si deseas instalar el conector en un clúster de versión de imagen anterior a 2.1, sigue las instrucciones de esta sección.

Importante: La versión de spark-bigquery-connector debe ser compatible con la versión de la imagen del clúster de Dataproc. Consulta la matriz de compatibilidad de conectores con imágenes de Dataproc.

Clústeres de versiones de imágenes 2.1 y posteriores

Cuando crees un clúster de Dataproc con una versión de imagen 2.1 o posterior, especifica la versión del conector como metadatos del clúster.

Ejemplo de la CLI de gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Notas:

  • SPARK_BQ_CONNECTOR_VERSION: Especifica una versión del conector. Las versiones del conector de BigQuery de Spark se enumeran en la página spark-bigquery-connector/releases de GitHub.

    Ejemplo:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: Especifica una URL que apunte al archivo JAR en Cloud Storage. Puedes especificar la URL de un conector que aparece en la columna vínculo en Cómo descargar y usar el conector en GitHub, o bien la ruta de acceso a una ubicación de Cloud Storage en la que hayas colocado un archivo JAR de conector personalizado.

    Ejemplos:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

Clústeres de versiones de imágenes 2.0 y anteriores

Puedes hacer que el conector de BigQuery de Spark esté disponible para tu aplicación de una de las siguientes maneras:

  1. Instala el spark-bigquery-connector en el directorio de los archivos jar de Spark de cada nodo mediante la acción de inicialización de conectores de Dataproc cuando crees tu clúster.

  2. Proporciona la URL del archivo jar del conector cuando envíes tu trabajo al clúster con la consola de Google Cloud, gcloud CLI o la API de Dataproc.

    Console

    Usa el elemento Archivos jar del trabajo de Spark en la página Enviar un trabajo de Dataproc.

    gcloud

    Usa la marca gcloud dataproc jobs submit spark --jars.

    API

    Usa el campo SparkJob.jarFileUris.

    Cómo especificar el archivo jar del conector cuando se ejecutan trabajos de Spark en clústeres de versiones de imágenes anteriores a la 2.0

    • Para especificar el archivo jar del conector, sustituye la información de la versión de Scala y del conector en la siguiente cadena de URI:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Usa Scala 2.12 con las versiones de imagen de Dataproc 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Ejemplo de la CLI de gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Usa Scala 2.11 con las versiones de imagen de Dataproc 1.4 y anteriores:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Ejemplo de la CLI de gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Incluye el archivo jar del conector en tu aplicación de Scala o Java Spark como una dependencia (consulta Compila con el conector).

Calcula los costos

En este documento, usarás los siguientes componentes facturables de Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios. Es posible que los usuarios nuevos de Google Cloud califiquen para obtener una prueba gratuita.

Cómo leer y escribir datos desde y hacia BigQuery

Este ejemplo lee los datos de BigQuery en un DataFrame de Spark para realizar un recuento de palabras mediante la API de fuente de datos estándar.

El conector escribe los datos en BigQuery mediante el almacenamiento en búfer de todos los datos en una tabla temporal de Cloud Storage. Luego, copia todos los datos en BigQuery en una operación. El conector intenta borrar los archivos temporales una vez que la operación de carga de BigQuery se realiza correctamente, y lo vuelve a hacer cuando la aplicación Spark finaliza. Si el trabajo falla, quita los archivos temporales de Cloud Storage que queden. Por lo general, los archivos temporales de BigQuery se encuentran en gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configura la facturación

De forma predeterminada, el proyecto asociado con las credenciales o la cuenta de servicio se factura por el uso de la API. Para facturar un proyecto diferente, establece la siguiente configuración: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

También se puede agregar a una operación de lectura o escritura, de la siguiente manera: .option("parentProject", "<BILLED-GCP-PROJECT>").

Ejecuta el código

Antes de ejecutar este ejemplo, crea un conjunto de datos llamado “wordcount_dataset” o cambia el conjunto de datos de salida en el código a un conjunto de datos de BigQuery existente en tu proyecto deGoogle Cloud .

Usa el comando de bq para crear el wordcount_dataset:

bq mk wordcount_dataset

Usa el comando Google Cloud CLI para crear un bucket de Cloud Storage, que se usará para exportar a BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Examina el código y reemplaza el marcador de posición [bucket] por el bucket de Cloud Storage que creaste anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Ejecuta el código en tu clúster
    1. Usa SSH para conectarte al nodo principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página de clústeres de Dataproc en la consola de Cloud.
      2. En la página >Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo principal del clúster
        Página de detalles del clúster de Dataproc en la consola de Cloud.

        Se abrirá una ventana del navegador en tu directorio principal del nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.scala con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de Scala desde la lista de códigos de Scala.
      nano wordcount.scala
        
    3. Inicia el REPL de spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Ejecuta wordcount.scala con el comando :load wordcount.scala para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para obtener una vista previa de la tabla de resultados, abre la página BigQuery, selecciona la tabla wordcount_output y, luego, haz clic en Vista previa.
      Vista previa de la tabla en la página del explorador de BigQuery en la consola de Cloud

PySpark

  1. Examina el código y reemplaza el marcador de posición [bucket] por el bucket de Cloud Storage que creaste anteriormente.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Ejecuta el código en tu clúster.
    1. Usa SSH para conectarte al nodo principal del clúster de Dataproc
      1. Ve a la página Clústeres de Dataproc en la consola de Google Cloud y, luego, haz clic en el nombre de tu clúster.
        Página de clústeres en la consola de Cloud
      2. En la página Detalles del clúster, selecciona la pestaña Instancias de VM. Luego, haz clic en SSH a la derecha del nombre del nodo principal del clúster
        Selecciona SSH en la fila del nombre del clúster en la página Detalles del clúster de la consola de Cloud.

        Se abrirá una ventana del navegador en tu directorio principal del nodo principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crea wordcount.py con el editor de texto vi, vim o nano preinstalado y, luego, pega el código de PySpark desde la lista de código de PySpark.
      nano wordcount.py
      
    3. Ejecuta el conteo de palabras con spark-submit para crear la tabla de BigQuery wordcount_output. La lista de salida muestra 20 líneas del resultado del recuento de palabras.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para obtener una vista previa de la tabla de resultados, abre la página BigQuery, selecciona la tabla wordcount_output y, luego, haz clic en Vista previa.
      Vista previa de la tabla en la página del explorador de BigQuery en la consola de Cloud

Más información