Esegui codice PySpark nei notebook BigQuery Studio
Questo documento mostra come eseguire il codice PySpark in un notebook Python BigQuery.
Prima di iniziare
Se non l'hai ancora fatto, crea un progetto e un bucket Cloud Storage. Google Cloud
Configurare il progetto
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Crea un bucket Cloud Storage nel tuo progetto se non ne hai uno da utilizzare.
Configurare il notebook
- Credenziali del notebook: per impostazione predefinita, la sessione del notebook utilizza le tue
credenziali utente. Se vuoi specificare le credenziali del account di servizio per la tua sessione, questo deve disporre del ruolo Dataproc Worker (
roles/dataproc.worker
). Per saperne di più, vedi Service account Dataproc Serverless. - Runtime del notebook: il notebook utilizza un runtime Vertex predefinito, a meno che non ne selezioni uno diverso. Se vuoi definire il tuo runtime, crealo dalla pagina Runtime nella Google Cloud console.
- Credenziali del notebook: per impostazione predefinita, la sessione del notebook utilizza le tue
credenziali utente. Se vuoi specificare le credenziali del account di servizio per la tua sessione, questo deve disporre del ruolo Dataproc Worker (
Nella console Google Cloud , vai alla pagina BigQuery.
Nella barra delle schede del riquadro dei dettagli, fai clic sulla freccia
accanto al segno +, quindi su Blocco note.- Configura e crea una singola sessione nel notebook.
- Configura una sessione Spark in un
modello di sessione interattiva Dataproc Serverless per Spark,
quindi utilizza il modello per configurare e creare una sessione nel notebook.
BigQuery fornisce una funzionalità
Query using Spark
che ti aiuta a iniziare a codificare la sessione basata su modelli, come spiegato nella scheda Sessione Spark basata su modelli. Nella barra delle schede del riquadro dell'editor, fai clic sul
menu a discesa a forma di freccia accanto al segno +, poi fai clic su Blocco note.Copia ed esegui il seguente codice in una cella del notebook per configurare e creare una sessione Spark di base.
- APP_NAME: un nome facoltativo per la sessione.
- (Facoltativo) Impostazioni sessione:puoi aggiungere impostazioni dell'API Dataproc
Session
per personalizzare la sessione. Ecco alcuni esempi:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
- Nella barra delle schede del riquadro dell'editor, fai clic sul
- In Inizia con un modello, fai clic su Query con Spark, quindi fai clic su
Utilizza modello per inserire il codice nel notebook.
- Specifica le variabili come spiegato nelle Note.
- Puoi eliminare tutte le celle di codice campione aggiuntive inserite nel blocco note.
- PROJECT: l'ID progetto, elencato nella sezione Informazioni progetto della dashboard della consoleGoogle Cloud .
- LOCATION: la regione di Compute Engine in cui verrà eseguita la sessione del notebook. Se non viene fornita, la località predefinita è la regione della VM che crea il notebook.
SESSION_TEMPLATE: il nome di un modello di sessione interattiva serverless di Dataproc esistente. Le impostazioni di configurazione della sessione vengono ottenute dal modello. Il modello deve specificare anche le seguenti impostazioni:
- Versione runtime
2.3
+ Tipo di notebook:
Spark Connect
Esempio:
- Versione runtime
APP_NAME: un nome facoltativo per la sessione.
- Esegui un conteggio parole su un set di dati pubblico di Shakespeare.
- Crea una tabella Iceberg con i metadati salvati in BigLake Metastore.
- APP_NAME: un nome facoltativo per la sessione.
- PROJECT: l'ID progetto, elencato nella sezione Informazioni progetto della dashboard della consoleGoogle Cloud .
- REGION e SUBNET_NAME: specifica la regione Compute Engine e il nome di una subnet nella regione della sessione. Dataproc Serverless abilita l'accesso privato Google (PGA) sulla subnet specificata.
- LOCATION: il valore predefinito
BigQuery_metastore_config.location
espark.sql.catalog.{catalog}.gcp_location
èUS
, ma puoi scegliere qualsiasi località BigQuery supportata. - BUCKET e WAREHOUSE_DIRECTORY: il bucket Cloud Storage e la cartella utilizzati per la directory del warehouse Iceberg.
- CATALOG_NAME e NAMESPACE: il nome del catalogo Iceberg
e lo spazio dei nomi si combinano per identificare la tabella Iceberg (
catalog.namespace.table_name
). - APP_NAME: un nome facoltativo per la sessione.
Nella console Google Cloud , vai alla pagina BigQuery.
Nel riquadro delle risorse del progetto, fai clic sul progetto, poi fai clic su il tuo spazio dei nomi per elencare la tabella
sample_iceberg_table
. Fai clic sulla tabella Dettagli per visualizzare le informazioni Apri configurazione tabella catalogo.I formati di input e output sono i formati standard delle classi Hadoop
InputFormat
eOutputFormat
utilizzati da Iceberg.Inserisci una nuova cella di codice facendo clic su + Codice nella barra degli strumenti. Nella nuova cella di codice viene visualizzato
Start coding or generate with AI
. Fai clic su Genera.Nell'editor Genera, inserisci un prompt in linguaggio naturale e poi fai clic su
enter
. Assicurati di includere la parola chiavespark
opyspark
nel prompt.Prompt di esempio:
create a spark dataframe from order_items and filter to orders created in 2024
Esempio di output:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Per consentire a Gemini Code Assist di recuperare tabelle e schemi pertinenti, attiva la sincronizzazione di Data Catalog per le istanze Dataproc Metastore.
Assicurati che il tuo account utente abbia accesso a Data Catalog alle tabelle di query. Per farlo, assegna il ruolo
DataCatalog.Viewer
.- Esegui
spark.stop()
in una cella del notebook. - Termina il runtime nel notebook:
- Fai clic sul selettore del runtime e poi su Gestisci sessioni.
- Nella finestra di dialogo Sessioni attive, fai clic sull'icona di chiusura, quindi su Termina.
- Fai clic sul selettore del runtime e poi su Gestisci sessioni.
Pianifica il codice del notebook dalla Google Cloud console (si applicano i prezzi dei notebook).
Esegui il codice del notebook come carico di lavoro batch Dataproc Serverless (si applicano i prezzi di Dataproc Serverless).
- Pianifica il notebook.
- Se l'esecuzione del codice del notebook fa parte di un flusso di lavoro, pianifica il notebook nell'ambito di una pipeline.
Scarica il codice del blocco note in un file in un terminale locale o in Cloud Shell.
Apri il notebook nel riquadro Explorer nella pagina BigQuery Studio nella console Google Cloud .
Scarica il codice del blocco note selezionando Scarica dal menu File, poi scegli
Download .py
.
Genera
requirements.txt
.- Installa
pipreqs
nella directory in cui hai salvato il file.py
.pip install pipreqs
Esegui
pipreqs
per generarerequirements.txt
.pipreqs filename.py
Utilizza Google Cloud CLI per copiare il file
requirements.txt
locale in un bucket in Cloud Storage.gcloud storage cp requirements.txt gs://BUCKET/
- Installa
Aggiorna il codice della sessione Spark modificando il file
.py
scaricato.Rimuovi o commenta tutti i comandi dello script shell.
Rimuovi il codice che configura la sessione Spark, quindi specifica i parametri di configurazione come parametri di invio del carico di lavoro batch. (vedi Inviare un carico di lavoro batch Spark).
Esempio:
Rimuovi la seguente riga di configurazione della subnet di sessione dal codice:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
Quando esegui il carico di lavoro batch, utilizza il flag
--subnet
per specificare la subnet.gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
Utilizza un semplice snippet di codice per la creazione della sessione.
Codice del blocco note scaricato di esempio prima della semplificazione.
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
Codice del carico di lavoro batch dopo la semplificazione.
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
Esegui il carico di lavoro batch.
Per istruzioni, consulta la sezione Inviare il carico di lavoro batch Spark.
Assicurati di includere il flag --deps-bucket per indicare il bucket Cloud Storage che contiene il file
requirements.txt
.Esempio:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
Note:
- FILENAME: il nome del file di codice del notebook scaricato e modificato.
- REGION: la regione di Compute Engine in cui si trova il cluster.
- BUCKET Il nome del bucket Cloud Storage
che contiene il file
requirements.txt
. --version
: è selezionata la versione 2.3 di Spark Runtime per eseguire il carico di lavoro batch.
Esegui il commit del codice.
- Dopo aver testato il codice del carico di lavoro batch, puoi eseguire il commit del file
.ipynb
o.py
nel repository utilizzando il clientgit
, ad esempio GitHub, GitLab o Bitbucket, nell'ambito della pipeline CI/CD.
- Dopo aver testato il codice del carico di lavoro batch, puoi eseguire il commit del file
Pianifica il tuo workload batch con Cloud Composer.
- Consulta la sezione Esegui carichi di lavoro Dataproc Serverless con Cloud Composer per istruzioni.
- Video dimostrativo di YouTube: Unleashing the power of Apache Spark integrated with BigQuery.
- Utilizzare BigLake Metastore con Dataproc
- Utilizzare BigLake Metastore con Dataproc Serverless
Prezzi
Per informazioni sui prezzi, consulta la sezione Prezzi del runtime dei notebook di BigQuery.
Apri un notebook Python di BigQuery Studio
Creare una sessione Spark in un notebook BigQuery Studio
Puoi utilizzare un notebook Python di BigQuery Studio per creare una sessione interattiva Spark Connect. A ogni notebook BigQuery Studio può essere associata una sola sessione Dataproc Serverless attiva.
Puoi creare una sessione Spark in un notebook Python di BigQuery Studio nei seguenti modi:
Singola sessione
Per creare una sessione Spark in un nuovo notebook:
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Sostituisci quanto segue:
Sessione Spark basata su modello
Puoi inserire ed eseguire il codice in una cella del notebook per creare una sessione Spark basata su un modello di sessione Dataproc Serverless esistente. Qualsiasi impostazione di configurazione
session
fornita nel codice del notebook sostituirà qualsiasi impostazione uguale impostata nel modello di sessione.Per iniziare rapidamente, utilizza il modello
Query using Spark
per precompilare il notebook con il codice del modello di sessione Spark:from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Configure the session with an existing session template. session_template = "SESSION_TEMPLATE" session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}" # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Scrivere ed eseguire codice PySpark nel notebook BigQuery Studio
Dopo aver creato una sessione Spark nel notebook, utilizzala per eseguire il codice del notebook Spark nel notebook.
Supporto dell'API Spark Connect PySpark:la sessione del notebook Spark Connect supporta la maggior parte delle API PySpark, tra cui DataFrame, Functions e Column, ma non supporta SparkContext e RDD e altre API PySpark. Per ulteriori informazioni, vedi Che cosa è supportato in Spark 3.5.
API specifiche di Dataproc:Dataproc semplifica l'aggiunta dinamica di pacchetti
PyPI
alla sessione Spark estendendo il metodoaddArtifacts
. Puoi specificare l'elenco nel formatoversion-scheme
, (simile apip install
). In questo modo, il server Spark Connect installa i pacchetti e le relative dipendenze su tutti i nodi del cluster, rendendoli disponibili ai worker per le tue UDF.Esempio che installa la versione
textdistance
specificata e le librerierandom2
compatibili più recenti sul cluster per consentire l'esecuzione di UDF che utilizzanotextdistance
erandom2
sui nodi worker.spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Guida al codice del notebook: il notebook BigQuery Studio fornisce assistenza per il codice quando tieni il puntatore sopra il nome di una classe o di un metodo e fornisce assistenza per il completamento del codice mentre lo inserisci.
Nel seguente esempio, inserisci
DataprocSparkSession
. e se passi il puntatore sopra questo nome di classe, vengono visualizzati il completamento del codice e la guida alla documentazione.Esempi di PySpark per i notebook BigQuery Studio
Questa sezione fornisce esempi di notebook Python di BigQuery Studio con codice PySpark per eseguire le seguenti attività:
Wordcount
Il seguente esempio Pyspark crea una sessione Spark, quindi conta le occorrenze delle parole in un set di dati pubblico
bigquery-public-data.samples.shakespeare
.# Basic wordcount example from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Run a wordcount on the public Shakespeare dataset. df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load() words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word")) word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word") word_counts_df.show()
Sostituisci quanto segue:
Output:
L'output della cella elenca un campione dell'output del conteggio parole. Per visualizzare i dettagli della sessione nella console Google Cloud , fai clic sul link Visualizzazione dettagli sessione interattiva. Per monitorare la sessione Spark, fai clic su Visualizza UI di Spark nella pagina dei dettagli della sessione.
Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Tavolo Iceberg
Esegui il codice PySpark per creare una tabella Iceberg con i metadati di BigLake Metastore
Il seguente codice di esempio crea un
sample_iceberg_table
con metadati della tabella archiviati in BigLake Metastore, quindi esegue query sulla tabella.from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f # Create the Dataproc Serverless session. session = Session() # Set the session configuration for BigLake Metastore with the Iceberg environment. project = "PROJECT" region = "REGION" subnet_name = "SUBNET_NAME" location = "LOCATION" session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY" catalog = "CATALOG_NAME" namespace = "NAMESPACE" session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}" # Create the Spark Connect session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Create the namespace in BigQuery. spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create the Iceberg table. spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`"); spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE sample_iceberg_table;") # Insert table data and query the table. spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");") # Alter table, then query and display table data and schema. spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE sample_iceberg_table;") df = spark.sql("SELECT * FROM sample_iceberg_table") df.show() df.printSchema()
Note:
L'output della cella elenca
sample_iceberg_table
con la colonna aggiunta e mostra un link alla pagina Dettagli sessione interattiva nella console Google Cloud . Puoi fare clic su Visualizza UI di Spark nella pagina dei dettagli della sessione per monitorare la tua sessione Spark.Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
Visualizzare i dettagli della tabella in BigQuery
Per controllare i dettagli della tabella Iceberg in BigQuery:
Altri esempi
Crea uno Spark
DataFrame
(sdf
) da un DataFrame Pandas (df
).sdf = spark.createDataFrame(df) sdf.show()
Esegui aggregazioni su Spark
DataFrames
.from pyspark.sql import functions as F sdf.groupby("segment").agg( F.mean("total_spend_per_user").alias("avg_order_value"), F.approx_count_distinct("user_id").alias("unique_customers") ).show()
Leggi da BigQuery utilizzando il connettore Spark-BigQuery.
spark.conf.set("viewsEnabled","true") spark.conf.set("materializationDataset","my-bigquery-dataset") sdf = spark.read.format('bigquery') \ .load(query)
Scrivere codice Spark con Gemini Code Assist
Puoi chiedere a Gemini in Code Assist di generare codice PySpark nel tuo notebook. Gemini Code Assist recupera e utilizza le tabelle BigQuery e Dataproc Metastore pertinenti e i relativi schemi per generare una risposta di codice.
Per generare codice Gemini Code Assist nel tuo notebook:
Suggerimenti per la generazione di codice di Gemini Code Assist
Terminare la sessione Spark
Puoi eseguire una delle seguenti azioni per interrompere la sessione Spark Connect nel notebook BigQuery Studio:
Orchestrare il codice del notebook BigQuery Studio
Puoi orchestrare il codice del notebook BigQuery Studio nei seguenti modi:
Pianificare il codice del notebook dalla console Google Cloud
Puoi programmare il codice del notebook nei seguenti modi:
Esegui il codice del blocco note come carico di lavoro batch Dataproc Serverless
Completa i seguenti passaggi per eseguire il codice del blocco note BigQuery Studio come carico di lavoro batch Dataproc Serverless.
Risolvere gli errori del notebook
Se si verifica un errore in una cella contenente codice Spark, puoi risolvere il problema facendo clic sul link Visualizzazione dettagli sessione interattiva nell'output della cella (vedi gli esempi di tabelle Wordcount e Iceberg).
Problemi noti e soluzioni
Errore: un runtime del notebook creato con la versione Python
3.10
può causare un errorePYTHON_VERSION_MISMATCH
quando tenta di connettersi alla sessione Spark.Soluzione: ricrea il runtime con Python versione
3.11
.Passaggi successivi