Ejecuta código de PySpark en notebooks de BigQuery Studio
En este documento, se muestra cómo ejecutar código de PySpark en un notebook de Python de BigQuery.
Antes de comenzar
Si aún no lo hiciste, crea un Google Cloud proyecto y un bucket de Cloud Storage.
Configura tu proyecto
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Crea un bucket de Cloud Storage en tu proyecto si no tienes uno que puedas usar.
Configura tu notebook
- Credenciales del notebook: De forma predeterminada, tu sesión de notebook usa tus credenciales de usuario. Si deseas especificar credenciales de la cuenta de servicio para tu sesión, esta debe tener el rol de trabajador de Dataproc (
roles/dataproc.worker
). Para obtener más información, consulta Cuenta de servicio de Dataproc Serverless. - Entorno de ejecución del notebook: Tu notebook usa un entorno de ejecución predeterminado de Vertex, a menos que selecciones uno diferente. Si deseas definir tu propio entorno de ejecución, crea el entorno de ejecución desde la página Entornos de ejecución en la consola de Google Cloud .
- Credenciales del notebook: De forma predeterminada, tu sesión de notebook usa tus credenciales de usuario. Si deseas especificar credenciales de la cuenta de servicio para tu sesión, esta debe tener el rol de trabajador de Dataproc (
En la consola de Google Cloud , ve a la página BigQuery.
En la barra de pestañas del panel de detalles, haz clic en la flecha
junto al signo + y, luego, en Notebook.- Configura y crea una sola sesión en el notebook.
- Configura una sesión de Spark en una plantilla de sesión interactiva de Dataproc Serverless para Spark y, luego, usa la plantilla para configurar y crear una sesión en el notebook.
BigQuery proporciona una función
Query using Spark
que te ayuda a comenzar a codificar la sesión basada en plantillas, como se explica en la pestaña Sesión de Spark basada en plantillas. En la barra de pestañas del panel del editor, haz clic en la flecha desplegable
junto al signo + y, luego, haz clic en Notebook.Copia y ejecuta el siguiente código en una celda del notebook para configurar y crear una sesión básica de Spark.
- APP_NAME: Es un nombre opcional para tu sesión.
- Configuración de sesión opcional: Puedes agregar parámetros de configuración de la API de Dataproc
Session
para personalizar tu sesión. Estos son algunos ejemplos:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
- En la barra de pestañas del panel del editor, haz clic en la flecha desplegable
- En Comenzar con una plantilla, haz clic en Consultar con Spark y, luego, en Usar plantilla para insertar el código en tu notebook.
- Especifica las variables como se explica en las Notas.
- Puedes borrar cualquier celda de código de muestra adicional que se haya insertado en la notebook.
- PROJECT: Es el ID de tu proyecto, que se encuentra en la sección Project info del Google Cloud panel de la consola.
- LOCATION: La región de Compute Engine en la que se ejecutará tu sesión de notebook. Si no se proporciona, la ubicación predeterminada es la región de la VM que crea el notebook.
SESSION_TEMPLATE: Es el nombre de una plantilla de sesión interactiva de Dataproc Serverless existente. Los parámetros de configuración de la sesión se obtienen de la plantilla. La plantilla también debe especificar los siguientes parámetros de configuración:
- Versión del entorno de ejecución
2.3
+ Tipo de notebook:
Spark Connect
Ejemplo:
- Versión del entorno de ejecución
APP_NAME: Es un nombre opcional para tu sesión.
- Ejecuta un recuento de palabras en un conjunto de datos públicos de Shakespeare.
- Crea una tabla de Iceberg con metadatos guardados en BigLake Metastore.
- APP_NAME: Es un nombre opcional para tu sesión.
- PROJECT: Es el ID de tu proyecto, que se encuentra en la sección Project info del Google Cloud panel de la consola.
- REGION y SUBNET_NAME: Especifica la región de Compute Engine y el nombre de una subred en la región de la sesión. Dataproc Serverless habilita el Acceso privado a Google (PGA) en la subred especificada.
- LOCATION: Los valores predeterminados de
BigQuery_metastore_config.location
yspark.sql.catalog.{catalog}.gcp_location
sonUS
, pero puedes elegir cualquier ubicación de BigQuery compatible. - BUCKET y WAREHOUSE_DIRECTORY: Son el bucket y la carpeta de Cloud Storage que se usan para el directorio del almacén de Iceberg.
- CATALOG_NAME y NAMESPACE: El nombre y el espacio de nombres del catálogo de Iceberg se combinan para identificar la tabla de Iceberg (
catalog.namespace.table_name
). - APP_NAME: Es un nombre opcional para tu sesión.
En la consola de Google Cloud , ve a la página BigQuery.
En el panel de recursos del proyecto, haz clic en tu proyecto y, luego, en tu espacio de nombres para enumerar la tabla
sample_iceberg_table
. Haz clic en la tabla Details para ver la información de Open Catalog Table Configuration.Los formatos de entrada y salida son los formatos de clase
InputFormat
yOutputFormat
estándar de Hadoop que usa Iceberg.Haz clic en + Código en la barra de herramientas para insertar una celda de código nueva. La nueva celda de código muestra
Start coding or generate with AI
. Haz clic en Generar.En el editor de Generate, ingresa una instrucción en lenguaje natural y, luego, haz clic en
enter
. Asegúrate de incluir la palabra clavespark
opyspark
en tu instrucción.Instrucción de ejemplo:
create a spark dataframe from order_items and filter to orders created in 2024
Resultado de muestra:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Para permitir que Gemini Code Assist recupere tablas y esquemas pertinentes, activa la sincronización de Data Catalog para las instancias de Dataproc Metastore.
Asegúrate de que tu cuenta de usuario tenga acceso a las tablas de consulta de Data Catalog. Para ello, asigna el rol
DataCatalog.Viewer
.- Ejecuta
spark.stop()
en una celda del notebook. - Finaliza el entorno de ejecución en el notebook:
- Haz clic en el selector de tiempo de ejecución y, luego, en Administrar sesiones.
- En el diálogo Sesiones activas, haz clic en el ícono de finalizar y, luego, en Finalizar.
- Haz clic en el selector de tiempo de ejecución y, luego, en Administrar sesiones.
Programa código de notebook desde la consola de Google Cloud (se aplican los precios de los notebooks).
Ejecuta código de notebook como una carga de trabajo por lotes de Dataproc Serverless (se aplican los precios de Dataproc Serverless).
- Programa el notebook.
- Si la ejecución de código del notebook forma parte de un flujo de trabajo, programa el notebook como parte de una canalización.
Descarga el código del notebook en un archivo en una terminal local o en Cloud Shell.
Abre el notebook en el panel Explorador de la página de BigQuery Studio en la Google Cloud consola.
Para descargar el código del notebook, selecciona Descargar en el menú Archivo y, luego, elige
Download .py
.
Genera
requirements.txt
.- Instala
pipreqs
en el directorio en el que guardaste el archivo.py
.pip install pipreqs
Ejecuta
pipreqs
para generarrequirements.txt
.pipreqs filename.py
Usa la CLI de Google Cloud para copiar el archivo
requirements.txt
local en un bucket de Cloud Storage.gcloud storage cp requirements.txt gs://BUCKET/
- Instala
Actualiza el código de la sesión de Spark editando el archivo
.py
descargado.Quita o marca como comentario cualquier comando de secuencia de comandos de shell.
Quita el código que configura la sesión de Spark y, luego, especifica los parámetros de configuración como parámetros de envío de la carga de trabajo por lotes. (consulta Envía una carga de trabajo por lotes de Spark).
Ejemplo:
Quita la siguiente línea de configuración de la subred de la sesión del código:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
Cuando ejecutes tu carga de trabajo por lotes, usa la marca
--subnet
para especificar la subred.gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
Usa un fragmento de código simple para crear sesiones.
Muestra del código del notebook descargado antes de la simplificación.
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
Código de carga de trabajo por lotes después de la simplificación.
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
Ejecuta la carga de trabajo por lotes.
Consulta Envía la carga de trabajo por lotes de Spark para obtener instrucciones.
Asegúrate de incluir la marca --deps-bucket para que apunte al bucket de Cloud Storage que contiene tu archivo
requirements.txt
.Ejemplo:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
Notas:
- FILENAME: El nombre del archivo de código del notebook que descargaste y editaste.
- REGION: La región de Compute Engine en la que se encuentra tu clúster.
- BUCKET Es el nombre del bucket de Cloud Storage que contiene tu archivo
requirements.txt
. --version
: Se selecciona la versión 2.3 del entorno de ejecución de Spark para ejecutar la carga de trabajo por lotes.
Confirma tu código.
- Después de probar el código de la carga de trabajo por lotes, puedes confirmar el archivo
.ipynb
o.py
en tu repositorio con tu clientegit
, como GitHub, GitLab o Bitbucket, como parte de tu canalización de CI/CD.
- Después de probar el código de la carga de trabajo por lotes, puedes confirmar el archivo
Programa tu carga de trabajo por lotes con Cloud Composer.
- Consulta Cómo ejecutar cargas de trabajo de Dataproc Serverless con Cloud Composer para obtener instrucciones.
- Video de demostración de YouTube: Unleashing the power of Apache Spark integrated with BigQuery (Desata el poder de Apache Spark integrado en BigQuery).
- Usa BigLake Metastore con Dataproc
- Usa BigLake Metastore con Dataproc sin servidores
Precios
Para obtener información sobre los precios, consulta Precios del tiempo de ejecución de Notebooks de BigQuery.
Abre un notebook de Python de BigQuery Studio
Crea una sesión de Spark en un notebook de BigQuery Studio
Puedes usar un notebook de Python de BigQuery Studio para crear una sesión interactiva de Spark Connect. Cada notebook de BigQuery Studio solo puede tener una sesión activa de Dataproc Serverless asociada.
Puedes crear una sesión de Spark en un notebook de Python de BigQuery Studio de las siguientes maneras:
Sesión única
Para crear una sesión de Spark en un cuaderno nuevo, haz lo siguiente:
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Reemplaza lo siguiente:
Sesión de Spark basada en plantillas
Puedes ingresar y ejecutar el código en una celda de notebook para crear una sesión de Spark basada en una plantilla de sesión de Dataproc Serverless existente. Cualquier parámetro de configuración de
session
que proporciones en el código del notebook anulará cualquier parámetro de configuración establecido en la plantilla de sesión.Para comenzar rápidamente, usa la plantilla
Query using Spark
para completar previamente tu notebook con código de plantilla de sesión de Spark:from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Configure the session with an existing session template. session_template = "SESSION_TEMPLATE" session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}" # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Escribe y ejecuta código de PySpark en tu notebook de BigQuery Studio
Después de crear una sesión de Spark en tu notebook, úsala para ejecutar código de notebook de Spark.
Compatibilidad con la API de PySpark de Spark Connect: Tu sesión de notebook de Spark Connect admite la mayoría de las APIs de PySpark, incluidas DataFrame, Functions y Column, pero no admite SparkContext ni RDD, ni otras APIs de PySpark. Para obtener más información, consulta Qué se admite en Spark 3.5.
APIs específicas de Dataproc: Dataproc simplifica la adición dinámica de paquetes
PyPI
a tu sesión de Spark extendiendo el métodoaddArtifacts
. Puedes especificar la lista en formatoversion-scheme
(similar apip install
). Esto indica al servidor de Spark Connect que instale paquetes y sus dependencias en todos los nodos del clúster, lo que los pone a disposición de los trabajadores para tus UDF.Ejemplo que instala la versión de
textdistance
especificada y las bibliotecas derandom2
compatibles más recientes en el clúster para permitir que las UDF que usantextdistance
yrandom2
se ejecuten en los nodos de trabajo.spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Ayuda con el código de notebooks: El notebook de BigQuery Studio proporciona ayuda con el código cuando mantienes el puntero sobre el nombre de una clase o un método, y proporciona ayuda para completar el código a medida que lo ingresas.
En el siguiente ejemplo, se ingresa
DataprocSparkSession
. y mantener el puntero sobre este nombre de clase muestra la función para completar código y ayuda con la documentación.Ejemplos de PySpark en notebooks de BigQuery Studio
En esta sección, se proporcionan ejemplos de notebooks de Python de BigQuery Studio con código de PySpark para realizar las siguientes tareas:
Conteo de palabras
En el siguiente ejemplo de PySpark, se crea una sesión de Spark y, luego, se cuentan las ocurrencias de palabras en un conjunto de datos públicos
bigquery-public-data.samples.shakespeare
.# Basic wordcount example from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Run a wordcount on the public Shakespeare dataset. df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load() words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word")) word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word") word_counts_df.show()
Reemplaza lo siguiente:
Resultado:
El resultado de la celda muestra una muestra del resultado del recuento de palabras. Para ver los detalles de la sesión en la consola de Google Cloud , haz clic en el vínculo Vista detallada de la sesión interactiva. Para supervisar tu sesión de Spark, haz clic en Ver IU de Spark en la página de detalles de la sesión.
Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Tabla de Iceberg
Ejecuta código de PySpark para crear una tabla de Iceberg con metadatos de BigLake Metastore
En el siguiente ejemplo de código, se crea un objeto
sample_iceberg_table
con metadatos de la tabla almacenados en el metastore de BigLake y, luego, se consulta la tabla.from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f # Create the Dataproc Serverless session. session = Session() # Set the session configuration for BigLake Metastore with the Iceberg environment. project = "PROJECT" region = "REGION" subnet_name = "SUBNET_NAME" location = "LOCATION" session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY" catalog = "CATALOG_NAME" namespace = "NAMESPACE" session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}" # Create the Spark Connect session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Create the namespace in BigQuery. spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create the Iceberg table. spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`"); spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE sample_iceberg_table;") # Insert table data and query the table. spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");") # Alter table, then query and display table data and schema. spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE sample_iceberg_table;") df = spark.sql("SELECT * FROM sample_iceberg_table") df.show() df.printSchema()
Notas:
El resultado de la celda muestra el
sample_iceberg_table
con la columna agregada y un vínculo a la página Detalles de la sesión interactiva en la consola de Google Cloud . Puedes hacer clic en Ver IU de Spark en la página de detalles de la sesión para supervisar tu sesión de Spark.Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
Cómo ver los detalles de la tabla en BigQuery
Sigue estos pasos para verificar los detalles de la tabla de Iceberg en BigQuery:
Otros ejemplos
Crea un
DataFrame
de Spark (sdf
) a partir de un DataFrame de Pandas (df
).sdf = spark.createDataFrame(df) sdf.show()
Ejecuta agregaciones en el
DataFrames
de Spark.from pyspark.sql import functions as F sdf.groupby("segment").agg( F.mean("total_spend_per_user").alias("avg_order_value"), F.approx_count_distinct("user_id").alias("unique_customers") ).show()
Lee desde BigQuery con el conector de Spark-BigQuery.
spark.conf.set("viewsEnabled","true") spark.conf.set("materializationDataset","my-bigquery-dataset") sdf = spark.read.format('bigquery') \ .load(query)
Escribe código de Spark con Gemini Code Assist
Puedes pedirle a Gemini Code Assist que genere código de PySpark en tu notebook. Gemini Code Assist recupera y usa las tablas pertinentes de BigQuery y Dataproc Metastore, y sus esquemas, para generar una respuesta de código.
Para generar código de Gemini Code Assist en tu notebook, haz lo siguiente:
Sugerencias para la generación de código con Gemini Code Assist
Finaliza la sesión de Spark
Puedes realizar cualquiera de las siguientes acciones para detener tu sesión de Spark Connect en tu notebook de BigQuery Studio:
Cómo organizar el código de los notebooks de BigQuery Studio
Puedes organizar el código del notebook de BigQuery Studio de las siguientes maneras:
Programa código de notebook desde la consola de Google Cloud
Puedes programar el código del notebook de las siguientes maneras:
Ejecuta código de notebook como una carga de trabajo por lotes de Dataproc Serverless
Completa los siguientes pasos para ejecutar el código del notebook de BigQuery Studio como una carga de trabajo por lotes de Dataproc Serverless.
Soluciona problemas de errores de notebooks
Si se produce una falla en una celda que contiene código de Spark, puedes solucionar el error haciendo clic en el vínculo Interactive Session Detail View en el resultado de la celda (consulta los ejemplos de Wordcount y de tablas de Iceberg).
Problemas conocidos y soluciones
Error: Un entorno de ejecución de notebook creado con la versión de Python
3.10
puede causar un errorPYTHON_VERSION_MISMATCH
cuando intenta conectarse a la sesión de Spark.Solución: Vuelve a crear el entorno de ejecución con la versión
3.11
de Python.¿Qué sigue?