Esegui codice PySpark nei notebook BigQuery Studio

Questo documento mostra come eseguire il codice PySpark in un notebook Python BigQuery.

Prima di iniziare

Se non l'hai ancora fatto, crea un progetto e un bucket Cloud Storage. Google Cloud

  1. Configurare il progetto

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. Crea un bucket Cloud Storage nel tuo progetto se non ne hai uno da utilizzare.

    7. Configurare il notebook

      1. Credenziali del notebook: per impostazione predefinita, la sessione del notebook utilizza le tue credenziali utente. Se vuoi specificare le credenziali del account di servizio per la tua sessione, questo deve disporre del ruolo Dataproc Worker (roles/dataproc.worker). Per saperne di più, vedi Service account Dataproc Serverless.
      2. Runtime del notebook: il notebook utilizza un runtime Vertex predefinito, a meno che non ne selezioni uno diverso. Se vuoi definire il tuo runtime, crealo dalla pagina Runtime nella Google Cloud console.
    8. Prezzi

      Per informazioni sui prezzi, consulta la sezione Prezzi del runtime dei notebook di BigQuery.

      Apri un notebook Python di BigQuery Studio

      1. Nella console Google Cloud , vai alla pagina BigQuery.

        Vai a BigQuery

      2. Nella barra delle schede del riquadro dei dettagli, fai clic sulla freccia accanto al segno +, quindi su Blocco note.

      Creare una sessione Spark in un notebook BigQuery Studio

      Puoi utilizzare un notebook Python di BigQuery Studio per creare una sessione interattiva Spark Connect. A ogni notebook BigQuery Studio può essere associata una sola sessione Dataproc Serverless attiva.

      Puoi creare una sessione Spark in un notebook Python di BigQuery Studio nei seguenti modi:

      • Configura e crea una singola sessione nel notebook.
      • Configura una sessione Spark in un modello di sessione interattiva Dataproc Serverless per Spark, quindi utilizza il modello per configurare e creare una sessione nel notebook. BigQuery fornisce una funzionalità Query using Spark che ti aiuta a iniziare a codificare la sessione basata su modelli, come spiegato nella scheda Sessione Spark basata su modelli.

      Singola sessione

      Per creare una sessione Spark in un nuovo notebook:

      1. Nella barra delle schede del riquadro dell'editor, fai clic sul menu a discesa a forma di freccia accanto al segno +, poi fai clic su Blocco note.

        Screenshot che mostra l'interfaccia BigQuery con il pulsante "+" per creare un nuovo notebook.
      2. Copia ed esegui il seguente codice in una cella del notebook per configurare e creare una sessione Spark di base.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Sostituisci quanto segue:

      • APP_NAME: un nome facoltativo per la sessione.
      • (Facoltativo) Impostazioni sessione:puoi aggiungere impostazioni dell'API Dataproc Session per personalizzare la sessione. Ecco alcuni esempi:
        • RuntimeConfig:
          Guida al codice che mostra le opzioni di session.runtime.config.
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          Guida al codice che mostra le opzioni di configurazione dell'ambiente di sessione e di esecuzione.
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      Sessione Spark basata su modello

      Puoi inserire ed eseguire il codice in una cella del notebook per creare una sessione Spark basata su un modello di sessione Dataproc Serverless esistente. Qualsiasi impostazione di configurazione session fornita nel codice del notebook sostituirà qualsiasi impostazione uguale impostata nel modello di sessione.

      Per iniziare rapidamente, utilizza il modello Query using Spark per precompilare il notebook con il codice del modello di sessione Spark:

      1. Nella barra delle schede del riquadro dell'editor, fai clic sul menu a discesa a forma di freccia accanto al segno +, poi fai clic su Blocco note.
        Screenshot che mostra l'interfaccia BigQuery con il pulsante "+" per creare un nuovo notebook.
      2. In Inizia con un modello, fai clic su Query con Spark, quindi fai clic su Utilizza modello per inserire il codice nel notebook.
        Selezioni dell'interfaccia utente BigQuery per iniziare con un modello
      3. Specifica le variabili come spiegato nelle Note.
      4. Puoi eliminare tutte le celle di codice campione aggiuntive inserite nel blocco note.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Sostituisci quanto segue:

      • PROJECT: l'ID progetto, elencato nella sezione Informazioni progetto della dashboard della consoleGoogle Cloud .
      • LOCATION: la regione di Compute Engine in cui verrà eseguita la sessione del notebook. Se non viene fornita, la località predefinita è la regione della VM che crea il notebook.
      • SESSION_TEMPLATE: il nome di un modello di sessione interattiva serverless di Dataproc esistente. Le impostazioni di configurazione della sessione vengono ottenute dal modello. Il modello deve specificare anche le seguenti impostazioni:

        • Versione runtime 2.3+
        • Tipo di notebook: Spark Connect

          Esempio:

          Screenshot che mostra le impostazioni richieste di Spark Connect.
      • APP_NAME: un nome facoltativo per la sessione.

      Scrivere ed eseguire codice PySpark nel notebook BigQuery Studio

      Dopo aver creato una sessione Spark nel notebook, utilizzala per eseguire il codice del notebook Spark nel notebook.

      Supporto dell'API Spark Connect PySpark:la sessione del notebook Spark Connect supporta la maggior parte delle API PySpark, tra cui DataFrame, Functions e Column, ma non supporta SparkContext e RDD e altre API PySpark. Per ulteriori informazioni, vedi Che cosa è supportato in Spark 3.5.

      API specifiche di Dataproc:Dataproc semplifica l'aggiunta dinamica di pacchetti PyPI alla sessione Spark estendendo il metodo addArtifacts. Puoi specificare l'elenco nel formato version-scheme, (simile a pip install). In questo modo, il server Spark Connect installa i pacchetti e le relative dipendenze su tutti i nodi del cluster, rendendoli disponibili ai worker per le tue UDF.

      Esempio che installa la versione textdistance specificata e le librerie random2 compatibili più recenti sul cluster per consentire l'esecuzione di UDF che utilizzano textdistance e random2 sui nodi worker.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      Guida al codice del notebook: il notebook BigQuery Studio fornisce assistenza per il codice quando tieni il puntatore sopra il nome di una classe o di un metodo e fornisce assistenza per il completamento del codice mentre lo inserisci.

      Nel seguente esempio, inserisci DataprocSparkSession. e se passi il puntatore sopra questo nome di classe, vengono visualizzati il completamento del codice e la guida alla documentazione.

      Esempi di suggerimenti per la documentazione e il completamento del codice.

      Esempi di PySpark per i notebook BigQuery Studio

      Questa sezione fornisce esempi di notebook Python di BigQuery Studio con codice PySpark per eseguire le seguenti attività:

      • Esegui un conteggio parole su un set di dati pubblico di Shakespeare.
      • Crea una tabella Iceberg con i metadati salvati in BigLake Metastore.

      Wordcount

      Il seguente esempio Pyspark crea una sessione Spark, quindi conta le occorrenze delle parole in un set di dati pubblico bigquery-public-data.samples.shakespeare.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      Sostituisci quanto segue:

      • APP_NAME: un nome facoltativo per la sessione.

      Output:

      L'output della cella elenca un campione dell'output del conteggio parole. Per visualizzare i dettagli della sessione nella console Google Cloud , fai clic sul link Visualizzazione dettagli sessione interattiva. Per monitorare la sessione Spark, fai clic su Visualizza UI di Spark nella pagina dei dettagli della sessione.

      Visualizzare il pulsante dell'interfaccia utente di Spark nella pagina dei dettagli della sessione nella console
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Tavolo Iceberg

      Esegui il codice PySpark per creare una tabella Iceberg con i metadati di BigLake Metastore

      Il seguente codice di esempio crea un sample_iceberg_table con metadati della tabella archiviati in BigLake Metastore, quindi esegue query sulla tabella.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      Note:

      • PROJECT: l'ID progetto, elencato nella sezione Informazioni progetto della dashboard della consoleGoogle Cloud .
      • REGION e SUBNET_NAME: specifica la regione Compute Engine e il nome di una subnet nella regione della sessione. Dataproc Serverless abilita l'accesso privato Google (PGA) sulla subnet specificata.
      • LOCATION: il valore predefinito BigQuery_metastore_config.location e spark.sql.catalog.{catalog}.gcp_location è US, ma puoi scegliere qualsiasi località BigQuery supportata.
      • BUCKET e WAREHOUSE_DIRECTORY: il bucket Cloud Storage e la cartella utilizzati per la directory del warehouse Iceberg.
      • CATALOG_NAME e NAMESPACE: il nome del catalogo Iceberg e lo spazio dei nomi si combinano per identificare la tabella Iceberg (catalog.namespace.table_name).
      • APP_NAME: un nome facoltativo per la sessione.

      L'output della cella elenca sample_iceberg_table con la colonna aggiunta e mostra un link alla pagina Dettagli sessione interattiva nella console Google Cloud . Puoi fare clic su Visualizza UI di Spark nella pagina dei dettagli della sessione per monitorare la tua sessione Spark.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      Visualizzare i dettagli della tabella in BigQuery

      Per controllare i dettagli della tabella Iceberg in BigQuery:

      1. Nella console Google Cloud , vai alla pagina BigQuery.

        Vai a BigQuery

      2. Nel riquadro delle risorse del progetto, fai clic sul progetto, poi fai clic su il tuo spazio dei nomi per elencare la tabella sample_iceberg_table. Fai clic sulla tabella Dettagli per visualizzare le informazioni Apri configurazione tabella catalogo.

        I formati di input e output sono i formati standard delle classi Hadoop InputFormat e OutputFormat utilizzati da Iceberg.

        Metadati della tabella Iceberg elencati nella UI BigQuery

      Altri esempi

      Crea uno Spark DataFrame (sdf) da un DataFrame Pandas (df).

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Esegui aggregazioni su Spark DataFrames.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Leggi da BigQuery utilizzando il connettore Spark-BigQuery.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Scrivere codice Spark con Gemini Code Assist

      Puoi chiedere a Gemini in Code Assist di generare codice PySpark nel tuo notebook. Gemini Code Assist recupera e utilizza le tabelle BigQuery e Dataproc Metastore pertinenti e i relativi schemi per generare una risposta di codice.

      Per generare codice Gemini Code Assist nel tuo notebook:

      1. Inserisci una nuova cella di codice facendo clic su + Codice nella barra degli strumenti. Nella nuova cella di codice viene visualizzato Start coding or generate with AI. Fai clic su Genera.

      2. Nell'editor Genera, inserisci un prompt in linguaggio naturale e poi fai clic su enter. Assicurati di includere la parola chiave spark o pyspark nel prompt.

        Prompt di esempio:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        Esempio di output:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Suggerimenti per la generazione di codice di Gemini Code Assist

      • Per consentire a Gemini Code Assist di recuperare tabelle e schemi pertinenti, attiva la sincronizzazione di Data Catalog per le istanze Dataproc Metastore.

      • Assicurati che il tuo account utente abbia accesso a Data Catalog alle tabelle di query. Per farlo, assegna il ruolo DataCatalog.Viewer.

      Terminare la sessione Spark

      Puoi eseguire una delle seguenti azioni per interrompere la sessione Spark Connect nel notebook BigQuery Studio:

      • Esegui spark.stop() in una cella del notebook.
      • Termina il runtime nel notebook:
        1. Fai clic sul selettore del runtime e poi su Gestisci sessioni.
          Gestire la selezione delle sessioni
        2. Nella finestra di dialogo Sessioni attive, fai clic sull'icona di chiusura, quindi su Termina.
          Termina la selezione della sessione nella finestra di dialogo Sessioni attive

      Orchestrare il codice del notebook BigQuery Studio

      Puoi orchestrare il codice del notebook BigQuery Studio nei seguenti modi:

      Pianificare il codice del notebook dalla console Google Cloud

      Puoi programmare il codice del notebook nei seguenti modi:

      • Pianifica il notebook.
      • Se l'esecuzione del codice del notebook fa parte di un flusso di lavoro, pianifica il notebook nell'ambito di una pipeline.

      Esegui il codice del blocco note come carico di lavoro batch Dataproc Serverless

      Completa i seguenti passaggi per eseguire il codice del blocco note BigQuery Studio come carico di lavoro batch Dataproc Serverless.

      1. Scarica il codice del blocco note in un file in un terminale locale o in Cloud Shell.

        1. Apri il notebook nel riquadro Explorer nella pagina BigQuery Studio nella console Google Cloud .

        2. Scarica il codice del blocco note selezionando Scarica dal menu File, poi scegli Download .py.

          Menu File > Scarica nella pagina Explorer.
      2. Genera requirements.txt.

        1. Installa pipreqs nella directory in cui hai salvato il file .py.
          pip install pipreqs
          
        2. Esegui pipreqs per generare requirements.txt.

          pipreqs filename.py
          

        3. Utilizza Google Cloud CLI per copiare il file requirements.txt locale in un bucket in Cloud Storage.

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. Aggiorna il codice della sessione Spark modificando il file .py scaricato.

        1. Rimuovi o commenta tutti i comandi dello script shell.

        2. Rimuovi il codice che configura la sessione Spark, quindi specifica i parametri di configurazione come parametri di invio del carico di lavoro batch. (vedi Inviare un carico di lavoro batch Spark).

          Esempio:

          • Rimuovi la seguente riga di configurazione della subnet di sessione dal codice:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • Quando esegui il carico di lavoro batch, utilizza il flag --subnet per specificare la subnet.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. Utilizza un semplice snippet di codice per la creazione della sessione.

          • Codice del blocco note scaricato di esempio prima della semplificazione.

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • Codice del carico di lavoro batch dopo la semplificazione.

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. Esegui il carico di lavoro batch.

        1. Per istruzioni, consulta la sezione Inviare il carico di lavoro batch Spark.

          • Assicurati di includere il flag --deps-bucket per indicare il bucket Cloud Storage che contiene il file requirements.txt.

            Esempio:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          Note:

          • FILENAME: il nome del file di codice del notebook scaricato e modificato.
          • REGION: la regione di Compute Engine in cui si trova il cluster.
          • BUCKET Il nome del bucket Cloud Storage che contiene il file requirements.txt.
          • --version: è selezionata la versione 2.3 di Spark Runtime per eseguire il carico di lavoro batch.
      5. Esegui il commit del codice.

        1. Dopo aver testato il codice del carico di lavoro batch, puoi eseguire il commit del file .ipynb o .py nel repository utilizzando il client git, ad esempio GitHub, GitLab o Bitbucket, nell'ambito della pipeline CI/CD.
      6. Pianifica il tuo workload batch con Cloud Composer.

        1. Consulta la sezione Esegui carichi di lavoro Dataproc Serverless con Cloud Composer per istruzioni.

      Risolvere gli errori del notebook

      Se si verifica un errore in una cella contenente codice Spark, puoi risolvere il problema facendo clic sul link Visualizzazione dettagli sessione interattiva nell'output della cella (vedi gli esempi di tabelle Wordcount e Iceberg).

      Problemi noti e soluzioni

      Errore: un runtime del notebook creato con la versione Python 3.10 può causare un errore PYTHON_VERSION_MISMATCH quando tenta di connettersi alla sessione Spark.

      Soluzione: ricrea il runtime con Python versione 3.11.

      Passaggi successivi