Exécuter du code PySpark dans les notebooks BigQuery Studio

Ce document vous explique comment exécuter du code PySpark dans un notebook Python BigQuery.

Avant de commencer

Si ce n'est pas déjà fait, créez un projet Google Cloud et un bucket Cloud Storage.

  1. Configurer votre projet

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. Créez un bucket Cloud Storage dans votre projet si vous n'en avez pas déjà un.

    7. Configurer votre notebook

      1. Identifiants du notebook : par défaut, votre session de notebook utilise vos identifiants utilisateur. Si vous souhaitez spécifier des identifiants de compte de service pour votre session, celui-ci doit disposer du rôle Nœud de calcul Dataproc (roles/dataproc.worker). Pour en savoir plus, consultez Compte de service Dataproc sans serveur.
      2. Environnement d'exécution du notebook : votre notebook utilise un environnement d'exécution Vertex par défaut, sauf si vous en sélectionnez un autre. Si vous souhaitez définir votre propre environnement d'exécution, créez-le sur la page Environnements d'exécution de la console Google Cloud .
    8. Tarifs

      Pour en savoir plus sur les tarifs, consultez la page Tarifs du runtime des notebooks BigQuery.

      Ouvrir un notebook Python BigQuery Studio

      1. Dans la console Google Cloud , accédez à la page BigQuery.

        Accéder à BigQuery

      2. Dans la barre d'onglets du volet "Détails", cliquez sur la flèche  à côté du signe +, puis sur Notebook.

      Créer une session Spark dans un notebook BigQuery Studio

      Vous pouvez utiliser un notebook Python BigQuery Studio pour créer une session interactive Spark Connect. Chaque notebook BigQuery Studio ne peut être associé qu'à une seule session Dataproc sans serveur active.

      Vous pouvez créer une session Spark dans un notebook Python BigQuery Studio de différentes manières :

      • Configurez et créez une session unique dans le notebook.
      • Configurez une session Spark dans un modèle de session interactive Dataproc sans serveur pour Spark, puis utilisez le modèle pour configurer et créer une session dans le notebook. BigQuery fournit une fonctionnalité Query using Spark qui vous aide à commencer à coder la session basée sur un modèle, comme expliqué dans l'onglet Session Spark basée sur un modèle.

      Session unique

      Pour créer une session Spark dans un notebook, procédez comme suit :

      1. Dans la barre d'onglets du volet de l'éditeur, cliquez sur la flèche vers le bas  à côté du signe +, puis sur Notebook.

        Capture d'écran de l'interface BigQuery avec le bouton "+" permettant de créer un notebook.
      2. Copiez et exécutez le code suivant dans une cellule de notebook pour configurer et créer une session Spark de base.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Remplacez les éléments suivants :

      • APP_NAME : nom facultatif de votre session.
      • Paramètres de session facultatifs : vous pouvez ajouter des paramètres Session de l'API Dataproc pour personnaliser votre session. Voici quelques exemples :
        • RuntimeConfig :
          Aide sur le code affichant les options session.runtime.config.
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig :
          Aide au code affichant les options de configuration de l'environnement de session et d'exécution.
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      Session Spark basée sur un modèle

      Vous pouvez saisir et exécuter le code dans une cellule de notebook pour créer une session Spark basée sur un modèle de session Dataproc sans serveur existant. Tous les paramètres de configuration session que vous fournissez dans le code de votre notebook remplaceront les paramètres identiques définis dans le modèle de session.

      Pour commencer rapidement, utilisez le modèle Query using Spark pour préremplir votre notebook avec le code de modèle de session Spark :

      1. Dans la barre d'onglets du volet de l'éditeur, cliquez sur la flèche vers le bas  à côté du signe +, puis sur Notebook.
        Capture d'écran de l'interface BigQuery avec le bouton "+" permettant de créer un notebook.
      2. Sous Commencer avec un modèle, cliquez sur Interroger à l'aide de Spark, puis sur Utiliser le modèle pour insérer le code dans votre notebook.
        Sélections de l'UI BigQuery pour commencer avec un modèle
      3. Spécifiez les variables comme expliqué dans les Remarques.
      4. Vous pouvez supprimer toutes les cellules de code d'exemple supplémentaires insérées dans le notebook.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Remplacez les éléments suivants :

      • PROJECT : ID de votre projet, qui figure dans la section Infos sur le projet du tableau de bord de la consoleGoogle Cloud .
      • LOCATION : région Compute Engine dans laquelle votre session de notebook sera exécutée. Si vous ne fournissez aucune valeur, l'emplacement par défaut est la région de la VM qui crée le notebook.
      • SESSION_TEMPLATE : nom d'un modèle de session interactive Dataproc sans serveur existant. Les paramètres de configuration de la session sont obtenus à partir du modèle. Le modèle doit également spécifier les paramètres suivants :

        • Version du runtime 2.3+
        • Type de notebook : Spark Connect

          Exemple :

          Capture d'écran montrant les paramètres requis pour Spark Connect.
      • APP_NAME : nom facultatif de votre session.

      Écrire et exécuter du code PySpark dans votre notebook BigQuery Studio

      Une fois que vous avez créé une session Spark dans votre notebook, utilisez-la pour exécuter le code du notebook Spark.

      Compatibilité de l'API PySpark Spark Connect : votre session de notebook Spark Connect est compatible avec la plupart des API PySpark, y compris DataFrame, Functions et Column, mais pas avec SparkContext, RDD ni d'autres API PySpark. Pour en savoir plus, consultez Fonctionnalités compatibles avec Spark 3.5.

      API spécifiques à Dataproc : Dataproc simplifie l'ajout dynamique de packages PyPI à votre session Spark en étendant la méthode addArtifacts. Vous pouvez spécifier la liste au format version-scheme (semblable à pip install). Cela indique au serveur Spark Connect d'installer les packages et leurs dépendances sur tous les nœuds du cluster, ce qui les rend disponibles pour les nœuds de calcul de vos UDF.

      Exemple qui installe la version textdistance spécifiée et les dernières bibliothèques random2 compatibles sur le cluster pour permettre aux UDF utilisant textdistance et random2 de s'exécuter sur les nœuds de calcul.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      Aide pour le code de notebook : le notebook BigQuery Studio fournit une aide pour le code lorsque vous pointez sur le nom d'une classe ou d'une méthode, et une aide pour la saisie semi-automatique du code lorsque vous saisissez du code.

      Dans l'exemple suivant, saisissez DataprocSparkSession. et en pointant sur le nom de cette classe, vous pouvez afficher l'aide à la complétion de code et la documentation.

      Exemples de conseils sur la documentation et la complétion de code.

      Exemples PySpark de notebooks BigQuery Studio

      Cette section fournit des exemples de notebooks Python BigQuery Studio avec du code PySpark pour effectuer les tâches suivantes :

      • Exécutez un décompte de mots sur un ensemble de données Shakespeare public.
      • Créez une table Iceberg avec des métadonnées enregistrées dans BigLake Metastore.

      Nombre de mots

      L'exemple Pyspark suivant crée une session Spark, puis compte les occurrences de mots dans un ensemble de données public bigquery-public-data.samples.shakespeare.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      Remplacez les éléments suivants :

      • APP_NAME : nom facultatif de votre session.

      Résultat :

      La sortie de la cellule liste un échantillon de la sortie du décompte de mots. Pour afficher les détails d'une session dans la console Google Cloud , cliquez sur le lien Vue détaillée de la session interactive. Pour surveiller votre session Spark, cliquez sur Afficher l'UI Spark sur la page des détails de la session.

      Bouton "Afficher l'UI Spark" sur la page des détails de la session dans la console
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Table Iceberg

      Exécuter du code PySpark pour créer une table Iceberg avec des métadonnées BigLake Metastore

      L'exemple de code suivant crée une sample_iceberg_table avec des métadonnées de table stockées dans le metastore BigLake, puis interroge la table.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      Remarques :

      • PROJECT : ID de votre projet, qui figure dans la section Infos sur le projet du tableau de bord de la consoleGoogle Cloud .
      • REGION et SUBNET_NAME : spécifiez la région Compute Engine et le nom d'un sous-réseau dans la région de la session. Dataproc sans serveur active l'accès privé à Google sur le sous-réseau spécifié.
      • LOCATION : les valeurs par défaut de BigQuery_metastore_config.location et spark.sql.catalog.{catalog}.gcp_location sont US, mais vous pouvez choisir n'importe quel emplacement BigQuery compatible.
      • BUCKET et WAREHOUSE_DIRECTORY : bucket et dossier Cloud Storage utilisés pour le répertoire de l'entrepôt Iceberg.
      • CATALOG_NAME et NAMESPACE : le nom et l'espace de noms du catalogue Iceberg sont combinés pour identifier la table Iceberg (catalog.namespace.table_name).
      • APP_NAME : nom facultatif de votre session.

      La sortie de la cellule liste les sample_iceberg_table avec la colonne ajoutée et affiche un lien vers la page Détails de la session interactive dans la console Google Cloud . Vous pouvez cliquer sur Afficher l'UI Spark sur la page d'informations sur la session pour surveiller votre session Spark.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      Afficher les détails d'une table dans BigQuery

      Pour vérifier les détails d'une table Iceberg dans BigQuery, procédez comme suit :

      1. Dans la console Google Cloud , accédez à la page BigQuery.

        Accéder à BigQuery

      2. Dans le volet des ressources du projet, cliquez sur votre projet, puis sur votre espace de noms pour lister la table sample_iceberg_table. Cliquez sur le tableau Détails pour afficher les informations de Configuration de la table du catalogue ouvert.

        Les formats d'entrée et de sortie sont les formats de classe Hadoop standards InputFormat et OutputFormat utilisés par Iceberg.

        Métadonnées des tables Iceberg listées dans l'UI BigQuery

      Autres exemples

      Créez un Spark DataFrame (sdf) à partir d'un DataFrame Pandas (df).

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Exécutez des agrégations sur le DataFrames Spark.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Lisez des données depuis BigQuery à l'aide du connecteur Spark-BigQuery.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Écrire du code Spark avec Gemini Code Assist

      Vous pouvez demander à Gemini Code Assist de générer du code PySpark dans votre notebook. Gemini Code Assist récupère et utilise les tables BigQuery et Dataproc Metastore pertinentes, ainsi que leurs schémas, pour générer une réponse de code.

      Pour générer du code Gemini Code Assist dans votre notebook, procédez comme suit :

      1. Insérez une nouvelle cellule de code en cliquant sur + Code dans la barre d'outils. La nouvelle cellule de code affiche Start coding or generate with AI. Cliquez sur Générer.

      2. Dans l'éditeur "Générer", saisissez une requête en langage naturel, puis cliquez sur enter. Veillez à inclure le mot clé spark ou pyspark dans votre requête.

        Exemple de requête :

        create a spark dataframe from order_items and filter to orders created in 2024
        

        Exemple de résultat :

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Conseils pour la génération de code Gemini Code Assist

      • Pour permettre à Gemini Code Assist de récupérer les tables et les schémas pertinents, activez la synchronisation Data Catalog pour les instances Dataproc Metastore.

      • Assurez-vous que votre compte utilisateur a accès aux tables de requêtes de Data Catalog. Pour ce faire, attribuez le rôle DataCatalog.Viewer.

      Mettre fin à la session Spark

      Vous pouvez effectuer l'une des actions suivantes pour arrêter votre session Spark Connect dans votre notebook BigQuery Studio :

      • Exécutez spark.stop() dans une cellule de notebook.
      • Arrêtez l'environnement d'exécution dans le notebook :
        1. Cliquez sur le sélecteur d'exécution, puis sur Gérer les sessions.
          Gérer la sélection des sessions
        2. Dans la boîte de dialogue Sessions actives, cliquez sur l'icône de fin de session, puis sur Arrêter.
          Sélection de la session à arrêter dans la boîte de dialogue "Sessions actives"

      Orchestrer le code de notebook BigQuery Studio

      Vous pouvez orchestrer le code de notebook BigQuery Studio de différentes manières :

      Programmer du code de notebook depuis la console Google Cloud

      Vous pouvez planifier du code de notebook de différentes manières :

      Exécuter le code d'un notebook en tant que charge de travail par lot Dataproc Serverless

      Procédez comme suit pour exécuter le code du notebook BigQuery Studio en tant que charge de travail par lot Dataproc sans serveur.

      1. Téléchargez le code du notebook dans un fichier dans un terminal local ou dans Cloud Shell.

        1. Ouvrez le notebook dans le panneau Explorateur de la page BigQuery Studio dans la console Google Cloud .

        2. Téléchargez le code du notebook en sélectionnant Télécharger dans le menu Fichier, puis choisissez Download .py.

          Menu Fichier > Télécharger sur la page de l'explorateur.
      2. Générer des requirements.txt

        1. Installez pipreqs dans le répertoire où vous avez enregistré votre fichier .py.
          pip install pipreqs
          
        2. Exécutez pipreqs pour générer requirements.txt.

          pipreqs filename.py
          

        3. Utilisez Google Cloud CLI pour copier le fichier requirements.txt local dans un bucket Cloud Storage.

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. Modifiez le code de la session Spark en modifiant le fichier .py téléchargé.

        1. Supprimez ou mettez en commentaire les commandes de script shell.

        2. Supprimez le code qui configure la session Spark, puis spécifiez les paramètres de configuration en tant que paramètres d'envoi de la charge de travail par lot. (consultez Envoyer une charge de travail par lot Spark).

          Exemple :

          • Supprimez la ligne de configuration du sous-réseau de session suivante du code :

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • Lorsque vous exécutez votre charge de travail par lot, utilisez l'indicateur --subnet pour spécifier le sous-réseau.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. Utilisez un extrait de code simple pour créer une session.

          • Exemple de code de notebook téléchargé avant simplification.

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • Code de charge de travail par lot après simplification.

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. Exécutez la charge de travail par lot.

        1. Pour obtenir des instructions, consultez Envoyer la charge de travail par lot Spark.

          • Veillez à inclure l'option --deps-bucket pour pointer vers le bucket Cloud Storage contenant votre fichier requirements.txt.

            Exemple :

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          Remarques :

          • FILENAME : nom du fichier de code du notebook que vous avez téléchargé et modifié.
          • REGION : région Compute Engine dans laquelle se trouve votre cluster.
          • BUCKET : nom du bucket Cloud Storage contenant votre fichier requirements.txt.
          • --version : la version d'exécution Spark 2.3 est sélectionnée pour exécuter la charge de travail par lot.
      5. Validez votre code.

        1. Après avoir testé le code de votre charge de travail par lot, vous pouvez valider le fichier .ipynb ou .py dans votre dépôt à l'aide de votre client git, tel que GitHub, GitLab ou Bitbucket, dans le cadre de votre pipeline CI/CD.
      6. Planifiez votre charge de travail par lot avec Cloud Composer.

        1. Pour obtenir des instructions, consultez Exécuter des charges de travail Dataproc sans serveur avec Cloud Composer.

      Résoudre les problèmes liés aux notebooks

      Si un échec se produit dans une cellule contenant du code Spark, vous pouvez résoudre le problème en cliquant sur le lien Vue détaillée de la session interactive dans la sortie de la cellule (voir les exemples de comptage de mots et de tableau Iceberg).

      Problèmes connus et solutions

      Erreur : Un environnement d'exécution de notebook créé avec la version Python 3.10 peut entraîner une erreur PYTHON_VERSION_MISMATCH lorsqu'il tente de se connecter à la session Spark.

      Solution : Recréez l'environnement d'exécution avec la version Python 3.11.

      Étapes suivantes