Ejecuta código de PySpark en notebooks de BigQuery Studio

En este documento, se muestra cómo ejecutar código de PySpark en un notebook de Python de BigQuery.

Antes de comenzar

Si aún no lo hiciste, crea un Google Cloud proyecto y un bucket de Cloud Storage.

  1. Configura tu proyecto

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. Crea un bucket de Cloud Storage en tu proyecto si no tienes uno que puedas usar.

    7. Configura tu notebook

      1. Credenciales del notebook: De forma predeterminada, tu sesión de notebook usa tus credenciales de usuario. Si deseas especificar credenciales de la cuenta de servicio para tu sesión, esta debe tener el rol de trabajador de Dataproc (roles/dataproc.worker). Para obtener más información, consulta Cuenta de servicio de Dataproc Serverless.
      2. Entorno de ejecución del notebook: Tu notebook usa un entorno de ejecución predeterminado de Vertex, a menos que selecciones uno diferente. Si deseas definir tu propio entorno de ejecución, crea el entorno de ejecución desde la página Entornos de ejecución en la consola de Google Cloud .
    8. Precios

      Para obtener información sobre los precios, consulta Precios del tiempo de ejecución de Notebooks de BigQuery.

      Abre un notebook de Python de BigQuery Studio

      1. En la consola de Google Cloud , ve a la página BigQuery.

        Ir a BigQuery

      2. En la barra de pestañas del panel de detalles, haz clic en la flecha junto al signo + y, luego, en Notebook.

      Crea una sesión de Spark en un notebook de BigQuery Studio

      Puedes usar un notebook de Python de BigQuery Studio para crear una sesión interactiva de Spark Connect. Cada notebook de BigQuery Studio solo puede tener una sesión activa de Dataproc Serverless asociada.

      Puedes crear una sesión de Spark en un notebook de Python de BigQuery Studio de las siguientes maneras:

      • Configura y crea una sola sesión en el notebook.
      • Configura una sesión de Spark en una plantilla de sesión interactiva de Dataproc Serverless para Spark y, luego, usa la plantilla para configurar y crear una sesión en el notebook. BigQuery proporciona una función Query using Spark que te ayuda a comenzar a codificar la sesión basada en plantillas, como se explica en la pestaña Sesión de Spark basada en plantillas.

      Sesión única

      Para crear una sesión de Spark en un cuaderno nuevo, haz lo siguiente:

      1. En la barra de pestañas del panel del editor, haz clic en la flecha desplegable junto al signo + y, luego, haz clic en Notebook.

        Captura de pantalla que muestra la interfaz de BigQuery con el botón "+" para crear un notebook nuevo.
      2. Copia y ejecuta el siguiente código en una celda del notebook para configurar y crear una sesión básica de Spark.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Reemplaza lo siguiente:

      • APP_NAME: Es un nombre opcional para tu sesión.
      • Configuración de sesión opcional: Puedes agregar parámetros de configuración de la API de Dataproc Session para personalizar tu sesión. Estos son algunos ejemplos:
        • RuntimeConfig:
          Ayuda de código que muestra las opciones de session.runtime.config.
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          Ayuda de código que muestra las opciones de session-environment-config-execution-config.
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      Sesión de Spark basada en plantillas

      Puedes ingresar y ejecutar el código en una celda de notebook para crear una sesión de Spark basada en una plantilla de sesión de Dataproc Serverless existente. Cualquier parámetro de configuración de session que proporciones en el código del notebook anulará cualquier parámetro de configuración establecido en la plantilla de sesión.

      Para comenzar rápidamente, usa la plantilla Query using Spark para completar previamente tu notebook con código de plantilla de sesión de Spark:

      1. En la barra de pestañas del panel del editor, haz clic en la flecha desplegable junto al signo + y, luego, haz clic en Notebook.
        Captura de pantalla que muestra la interfaz de BigQuery con el botón "+" para crear un notebook nuevo.
      2. En Comenzar con una plantilla, haz clic en Consultar con Spark y, luego, en Usar plantilla para insertar el código en tu notebook.
        Selecciones de la IU de BigQuery para comenzar con una plantilla
      3. Especifica las variables como se explica en las Notas.
      4. Puedes borrar cualquier celda de código de muestra adicional que se haya insertado en la notebook.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Reemplaza lo siguiente:

      • PROJECT: Es el ID de tu proyecto, que se encuentra en la sección Project info del Google Cloud panel de la consola.
      • LOCATION: La región de Compute Engine en la que se ejecutará tu sesión de notebook. Si no se proporciona, la ubicación predeterminada es la región de la VM que crea el notebook.
      • SESSION_TEMPLATE: Es el nombre de una plantilla de sesión interactiva de Dataproc Serverless existente. Los parámetros de configuración de la sesión se obtienen de la plantilla. La plantilla también debe especificar los siguientes parámetros de configuración:

        • Versión del entorno de ejecución 2.3+
        • Tipo de notebook: Spark Connect

          Ejemplo:

          Captura de pantalla que muestra la configuración requerida de Spark Connect.
      • APP_NAME: Es un nombre opcional para tu sesión.

      Escribe y ejecuta código de PySpark en tu notebook de BigQuery Studio

      Después de crear una sesión de Spark en tu notebook, úsala para ejecutar código de notebook de Spark.

      Compatibilidad con la API de PySpark de Spark Connect: Tu sesión de notebook de Spark Connect admite la mayoría de las APIs de PySpark, incluidas DataFrame, Functions y Column, pero no admite SparkContext ni RDD, ni otras APIs de PySpark. Para obtener más información, consulta Qué se admite en Spark 3.5.

      APIs específicas de Dataproc: Dataproc simplifica la adición dinámica de paquetes PyPI a tu sesión de Spark extendiendo el método addArtifacts. Puedes especificar la lista en formato version-scheme (similar a pip install). Esto indica al servidor de Spark Connect que instale paquetes y sus dependencias en todos los nodos del clúster, lo que los pone a disposición de los trabajadores para tus UDF.

      Ejemplo que instala la versión de textdistance especificada y las bibliotecas de random2 compatibles más recientes en el clúster para permitir que las UDF que usan textdistance y random2 se ejecuten en los nodos de trabajo.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      Ayuda con el código de notebooks: El notebook de BigQuery Studio proporciona ayuda con el código cuando mantienes el puntero sobre el nombre de una clase o un método, y proporciona ayuda para completar el código a medida que lo ingresas.

      En el siguiente ejemplo, se ingresa DataprocSparkSession. y mantener el puntero sobre este nombre de clase muestra la función para completar código y ayuda con la documentación.

      Ejemplos de sugerencias de documentación y finalización de código.

      Ejemplos de PySpark en notebooks de BigQuery Studio

      En esta sección, se proporcionan ejemplos de notebooks de Python de BigQuery Studio con código de PySpark para realizar las siguientes tareas:

      • Ejecuta un recuento de palabras en un conjunto de datos públicos de Shakespeare.
      • Crea una tabla de Iceberg con metadatos guardados en BigLake Metastore.

      Conteo de palabras

      En el siguiente ejemplo de PySpark, se crea una sesión de Spark y, luego, se cuentan las ocurrencias de palabras en un conjunto de datos públicos bigquery-public-data.samples.shakespeare.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      Reemplaza lo siguiente:

      • APP_NAME: Es un nombre opcional para tu sesión.

      Resultado:

      El resultado de la celda muestra una muestra del resultado del recuento de palabras. Para ver los detalles de la sesión en la consola de Google Cloud , haz clic en el vínculo Vista detallada de la sesión interactiva. Para supervisar tu sesión de Spark, haz clic en Ver IU de Spark en la página de detalles de la sesión.

      Botón para ver la IU de Spark en la página de detalles de la sesión en la consola
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Tabla de Iceberg

      Ejecuta código de PySpark para crear una tabla de Iceberg con metadatos de BigLake Metastore

      En el siguiente ejemplo de código, se crea un objeto sample_iceberg_table con metadatos de la tabla almacenados en el metastore de BigLake y, luego, se consulta la tabla.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      Notas:

      • PROJECT: Es el ID de tu proyecto, que se encuentra en la sección Project info del Google Cloud panel de la consola.
      • REGION y SUBNET_NAME: Especifica la región de Compute Engine y el nombre de una subred en la región de la sesión. Dataproc Serverless habilita el Acceso privado a Google (PGA) en la subred especificada.
      • LOCATION: Los valores predeterminados de BigQuery_metastore_config.location y spark.sql.catalog.{catalog}.gcp_location son US, pero puedes elegir cualquier ubicación de BigQuery compatible.
      • BUCKET y WAREHOUSE_DIRECTORY: Son el bucket y la carpeta de Cloud Storage que se usan para el directorio del almacén de Iceberg.
      • CATALOG_NAME y NAMESPACE: El nombre y el espacio de nombres del catálogo de Iceberg se combinan para identificar la tabla de Iceberg (catalog.namespace.table_name).
      • APP_NAME: Es un nombre opcional para tu sesión.

      El resultado de la celda muestra el sample_iceberg_table con la columna agregada y un vínculo a la página Detalles de la sesión interactiva en la consola de Google Cloud . Puedes hacer clic en Ver IU de Spark en la página de detalles de la sesión para supervisar tu sesión de Spark.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      Cómo ver los detalles de la tabla en BigQuery

      Sigue estos pasos para verificar los detalles de la tabla de Iceberg en BigQuery:

      1. En la consola de Google Cloud , ve a la página BigQuery.

        Ir a BigQuery

      2. En el panel de recursos del proyecto, haz clic en tu proyecto y, luego, en tu espacio de nombres para enumerar la tabla sample_iceberg_table. Haz clic en la tabla Details para ver la información de Open Catalog Table Configuration.

        Los formatos de entrada y salida son los formatos de clase InputFormat y OutputFormat estándar de Hadoop que usa Iceberg.

        Metadatos de la tabla Iceberg que se muestran en la IU de BigQuery

      Otros ejemplos

      Crea un DataFrame de Spark (sdf) a partir de un DataFrame de Pandas (df).

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Ejecuta agregaciones en el DataFrames de Spark.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Lee desde BigQuery con el conector de Spark-BigQuery.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Escribe código de Spark con Gemini Code Assist

      Puedes pedirle a Gemini Code Assist que genere código de PySpark en tu notebook. Gemini Code Assist recupera y usa las tablas pertinentes de BigQuery y Dataproc Metastore, y sus esquemas, para generar una respuesta de código.

      Para generar código de Gemini Code Assist en tu notebook, haz lo siguiente:

      1. Haz clic en + Código en la barra de herramientas para insertar una celda de código nueva. La nueva celda de código muestra Start coding or generate with AI. Haz clic en Generar.

      2. En el editor de Generate, ingresa una instrucción en lenguaje natural y, luego, haz clic en enter. Asegúrate de incluir la palabra clave spark o pyspark en tu instrucción.

        Instrucción de ejemplo:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        Resultado de muestra:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Sugerencias para la generación de código con Gemini Code Assist

      • Para permitir que Gemini Code Assist recupere tablas y esquemas pertinentes, activa la sincronización de Data Catalog para las instancias de Dataproc Metastore.

      • Asegúrate de que tu cuenta de usuario tenga acceso a las tablas de consulta de Data Catalog. Para ello, asigna el rol DataCatalog.Viewer.

      Finaliza la sesión de Spark

      Puedes realizar cualquiera de las siguientes acciones para detener tu sesión de Spark Connect en tu notebook de BigQuery Studio:

      • Ejecuta spark.stop() en una celda del notebook.
      • Finaliza el entorno de ejecución en el notebook:
        1. Haz clic en el selector de tiempo de ejecución y, luego, en Administrar sesiones.
          Administra la selección de sesiones
        2. En el diálogo Sesiones activas, haz clic en el ícono de finalizar y, luego, en Finalizar.
          Finaliza la selección de sesiones en el diálogo Sesiones activas

      Cómo organizar el código de los notebooks de BigQuery Studio

      Puedes organizar el código del notebook de BigQuery Studio de las siguientes maneras:

      Programa código de notebook desde la consola de Google Cloud

      Puedes programar el código del notebook de las siguientes maneras:

      Ejecuta código de notebook como una carga de trabajo por lotes de Dataproc Serverless

      Completa los siguientes pasos para ejecutar el código del notebook de BigQuery Studio como una carga de trabajo por lotes de Dataproc Serverless.

      1. Descarga el código del notebook en un archivo en una terminal local o en Cloud Shell.

        1. Abre el notebook en el panel Explorador de la página de BigQuery Studio en la Google Cloud consola.

        2. Para descargar el código del notebook, selecciona Descargar en el menú Archivo y, luego, elige Download .py.

          Menú Archivo > Descargar en la página del Explorador
      2. Genera requirements.txt.

        1. Instala pipreqs en el directorio en el que guardaste el archivo .py.
          pip install pipreqs
          
        2. Ejecuta pipreqs para generar requirements.txt.

          pipreqs filename.py
          

        3. Usa la CLI de Google Cloud para copiar el archivo requirements.txt local en un bucket de Cloud Storage.

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. Actualiza el código de la sesión de Spark editando el archivo .py descargado.

        1. Quita o marca como comentario cualquier comando de secuencia de comandos de shell.

        2. Quita el código que configura la sesión de Spark y, luego, especifica los parámetros de configuración como parámetros de envío de la carga de trabajo por lotes. (consulta Envía una carga de trabajo por lotes de Spark).

          Ejemplo:

          • Quita la siguiente línea de configuración de la subred de la sesión del código:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • Cuando ejecutes tu carga de trabajo por lotes, usa la marca --subnet para especificar la subred.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. Usa un fragmento de código simple para crear sesiones.

          • Muestra del código del notebook descargado antes de la simplificación.

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • Código de carga de trabajo por lotes después de la simplificación.

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. Ejecuta la carga de trabajo por lotes.

        1. Consulta Envía la carga de trabajo por lotes de Spark para obtener instrucciones.

          • Asegúrate de incluir la marca --deps-bucket para que apunte al bucket de Cloud Storage que contiene tu archivo requirements.txt.

            Ejemplo:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          Notas:

          • FILENAME: El nombre del archivo de código del notebook que descargaste y editaste.
          • REGION: La región de Compute Engine en la que se encuentra tu clúster.
          • BUCKET Es el nombre del bucket de Cloud Storage que contiene tu archivo requirements.txt.
          • --version: Se selecciona la versión 2.3 del entorno de ejecución de Spark para ejecutar la carga de trabajo por lotes.
      5. Confirma tu código.

        1. Después de probar el código de la carga de trabajo por lotes, puedes confirmar el archivo .ipynb o .py en tu repositorio con tu cliente git, como GitHub, GitLab o Bitbucket, como parte de tu canalización de CI/CD.
      6. Programa tu carga de trabajo por lotes con Cloud Composer.

        1. Consulta Cómo ejecutar cargas de trabajo de Dataproc Serverless con Cloud Composer para obtener instrucciones.

      Soluciona problemas de errores de notebooks

      Si se produce una falla en una celda que contiene código de Spark, puedes solucionar el error haciendo clic en el vínculo Interactive Session Detail View en el resultado de la celda (consulta los ejemplos de Wordcount y de tablas de Iceberg).

      Problemas conocidos y soluciones

      Error: Un entorno de ejecución de notebook creado con la versión de Python 3.10 puede causar un error PYTHON_VERSION_MISMATCH cuando intenta conectarse a la sesión de Spark.

      Solución: Vuelve a crear el entorno de ejecución con la versión 3.11 de Python.

      ¿Qué sigue?