Run PySpark code in BigQuery Studio notebooks

This document shows you how to run PySpark code in a BigQuery Python notebook.

Before you begin

If you haven't already done so, create a Google Cloud project and a Cloud Storage bucket.

  1. Set up your project

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

  2. Create a Cloud Storage bucket in your project if you don't have one you can use.

  3. Set up your notebook

    1. Notebook credentials: By default, your notebook session uses your user credentials. If you want to specify service account credentials for your session, the service account must have the Dataproc Worker role (roles/dataproc.worker). For more information, see Dataproc Serverless service account.
    2. Notebook runtime: Your notebook uses a default Vertex runtime unless you select a different runtime. If you want to define your own runtime, create the runtime from the Runtimes page in the Google Cloud console.

Pricing

For pricing information, see BigQuery Notebook runtime pricing.

Open a BigQuery Studio Python notebook

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the tab bar of the details pane, click the arrow next to the + sign, and then click Notebook.

Create a Spark session in a BigQuery Studio notebook

You can use a BigQuery Studio Python notebook to create a Spark Connect interactive session. Each BigQuery Studio notebook can have only one active Dataproc Serverless session associated with it.

You can create a Spark session in a BigQuery Studio Python notebook in the following ways:

  • Configure and create a single session in the notebook.
  • Configure a Spark session in a Dataproc Serverless for Spark interactive session template, then use the template to configure and create a session in the notebook. BigQuery provides a Query using Spark feature that helps you start coding with a templated session, as explained on the Templated Spark session tab.

Single session

To create a Spark session in a new notebook, do the following:

  1. In the tab bar of the editor pane, click the drop-down arrow next to the + sign, and then click Notebook.

    Screenshot showing the BigQuery interface with the '+' button for creating a new notebook.
  2. Copy and run the following code in a notebook cell to configure and create a basic Spark session.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

Replace the following:

  • APP_NAME: An optional name for your session.
  • Optional Session settings: You can add Dataproc API Session settings to customize your session, as shown in the sketch after this list. Here are some examples:
    • RuntimeConfig:
      Code help showing session.runtime.config options.
      • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
      • session.runtime_config.container_image = path/to/container/image
    • EnvironmentConfig:
      Code help showing session-environment-config-execution-config options.
      • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
      • session.environment_config.execution_config.ttl = {"seconds": VALUE}
      • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
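
For example, a session that applies some of the preceding optional settings might look like the following sketch. The Spark property names are standard, but the values, subnet name, TTL, and service account shown here are placeholders that you would replace with your own:

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

session = Session()

# Example Spark properties (placeholder values).
session.runtime_config.properties["spark.executor.cores"] = "4"
session.runtime_config.properties["spark.executor.memory"] = "8g"

# Example execution settings: placeholder subnet, a 2-hour TTL, and a
# placeholder service account.
session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": 7200}
session.environment_config.execution_config.service_account = "SERVICE_ACCOUNT"

# Create the Spark session with the customized configuration.
spark = (
   DataprocSparkSession.builder
     .appName("customized-session")
     .dataprocSessionConfig(session)
     .getOrCreate()
)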

Templated Spark session

You can enter and run the code in a notebook cell to create a Spark session based on an existing Dataproc Serverless session template. Any session configuration settings you provide in your notebook code will override any of the same settings that are set in the session template.

To get started quickly, use the Query using Spark template to pre-populate your notebook with Spark session template code:

  1. In the tab bar of the editor pane, click the drop-down arrow next to the + sign, and then click Notebook.
    Screenshot showing the BigQuery interface with the '+' button for creating a new notebook.
  2. Under Start with a template, click Query using Spark, then click Use template to insert the code in your notebook.
    BigQuery UI selections to start with a template
  3. Specify the variables as explained in the Notes.
  4. You can delete any additional sample code cells inserted in the notebook.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Configure the session with an existing session template.
project = "PROJECT"
location = "LOCATION"
session_template = "SESSION_TEMPLATE"
session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

Replace the following:

  • PROJECT: Your project ID, which is listed in the Project info section of the Google Cloud console dashboard.
  • LOCATION: The Compute Engine region where your notebook session will run. If not supplied, the default location is the region of the VM that creates the notebook.
  • SESSION_TEMPLATE: The name of an existing Dataproc Serverless interactive session template. Session configuration settings are obtained from the template. The template must also specify the following settings:

    • Runtime version 2.3+
    • Notebook type: Spark Connect

      Example:

      Screenshot showing the Spark Connect required settings.
  • APP_NAME: An optional name for your session.

Write and run PySpark code in your BigQuery Studio notebook

After you create a Spark session in your notebook, use the session to run Spark code in the notebook.

Spark Connect PySpark API support: Your Spark Connect notebook session supports most PySpark APIs, including DataFrame, Functions, and Column, but does not support SparkContext, RDD, and some other PySpark APIs. For more information, see What is supported in Spark 3.5.
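
For example, a quick way to confirm that supported APIs work in your session is to run a small DataFrame operation. The following snippet (using the spark session created earlier) exercises the DataFrame, functions, and Column APIs:

import pyspark.sql.functions as f

# DataFrame, Column, and functions APIs are supported over Spark Connect.
df = spark.range(5).withColumn("squared", f.col("id") * f.col("id"))
df.show()

# SparkContext and RDD APIs are not available in a Spark Connect session,
# so code such as spark.sparkContext.parallelize(...) won't work here.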

Dataproc-specific APIs: Dataproc simplifies dynamically adding PyPI packages to your Spark session by extending the addArtifacts method. You specify the package list in version-scheme format (similar to pip install). This instructs the Spark Connect server to install the packages and their dependencies on all cluster nodes, making them available to workers for your UDFs.

The following example installs the specified textdistance version and the latest compatible random2 library on the cluster so that UDFs that use textdistance and random2 can run on worker nodes.

spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
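
After the artifacts are installed, UDFs that import those packages can run on the workers. The following is a minimal sketch (the sample DataFrame and the jaro_winkler similarity metric are illustrative choices, not part of the original example):

import pyspark.sql.functions as f
from pyspark.sql.types import FloatType

@f.udf(returnType=FloatType())
def name_similarity(a, b):
    # Imported inside the UDF so it resolves on the worker, where the
    # textdistance artifact was installed.
    import textdistance
    return float(textdistance.jaro_winkler(a or "", b or ""))

pairs = spark.createDataFrame([("cat", "cart"), ("spark", "sparkle")], ["a", "b"])
pairs.withColumn("similarity", name_similarity("a", "b")).show()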

Notebook code help: The BigQuery Studio notebook provides code help when you hold the pointer over a class or method name, and provides code completion help as you input code.

In the following example, entering DataprocSparkSession. and holding the pointer over this class name displays code completion and documentation help.

Code documentation and code completion tip examples.

BigQuery Studio notebook PySpark examples

This section provides BigQuery Studio Python notebook examples with PySpark code to perform the following tasks:

  • Run a wordcount against a public Shakespeare dataset.
  • Create an Iceberg table with metadata saved in BigLake Metastore.

Wordcount

The following PySpark example creates a Spark session, then counts word occurrences in the public bigquery-public-data.samples.shakespeare dataset.

# Basic wordcount example
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Run a wordcount on the public Shakespeare dataset.
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
word_counts_df.show()

Replace the following:

  • APP_NAME: An optional name for your session.

Output:

The cell output lists a sample of the wordcount output. To see session details in the Google Cloud console, click the Interactive Session Detail View link. To monitor your Spark session, click View Spark UI on the session details page.

View Spark UI button in session details page in console
Interactive Session Detail View: LINK
+------------+-----+
|        word|count|
+------------+-----+
|           '|   42|
|       ''All|    1|
|     ''Among|    1|
|       ''And|    1|
|       ''But|    1|
|    ''Gamut'|    1|
|       ''How|    1|
|        ''Lo|    1|
|      ''Look|    1|
|        ''My|    1|
|       ''Now|    1|
|         ''O|    1|
|      ''Od's|    1|
|       ''The|    1|
|       ''Tis|    4|
|      ''When|    1|
|       ''tis|    1|
|      ''twas|    1|
|          'A|   10|
|'ARTEMIDORUS|    1|
+------------+-----+
only showing top 20 rows

Iceberg table

Run PySpark code to create an Iceberg table with BigLake Metastore metadata

The following example code creates a sample_iceberg_table with table metadata stored in BigLake Metastore, and then queries the table.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
# Create the Dataproc Serverless session.
session = Session()
# Set the session configuration for BigLake Metastore with the Iceberg environment.
project = "PROJECT"
region = "REGION"
subnet_name = "SUBNET_NAME"
location = "LOCATION"
session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
session.environment_config.peripherals_config.bigquery_metastore_config.project_id = f"{project}"
session.environment_config.peripherals_config.bigquery_metastore_config.location = f"{location}"
warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
catalog = "CATALOG_NAME"
namespace = "NAMESPACE"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Create the namespace in BigQuery.
spark.sql(f"USE `{catalog}`;")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
spark.sql(f"USE `{namespace}`;")
# Create the Iceberg table.
spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
spark.sql("DESCRIBE sample_iceberg_table;")
# Insert table data and query the table.
spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
# Alter table, then query and display table data and schema.
spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
spark.sql("DESCRIBE sample_iceberg_table;")
df = spark.sql("SELECT * FROM sample_iceberg_table")
df.show()
df.printSchema()

Notes:

  • PROJECT: Your project ID, which is listed in the Project info section of the Google Cloud console dashboard.
  • REGION and SUBNET_NAME: Specify the Compute Engine region and the name of a subnet in the session region. Dataproc Serverless enables Private Google Access (PGA) on the specified subnet.
  • LOCATION: The default for bigquery_metastore_config.location and spark.sql.catalog.{catalog}.gcp_location is US, but you can choose any supported BigQuery location.
  • BUCKET and WAREHOUSE_DIRECTORY: The Cloud Storage bucket and folder used as the Iceberg warehouse directory.
  • CATALOG_NAME and NAMESPACE: The Iceberg catalog name and namespace combine to identify the Iceberg table (catalog.namespace.table_name).
  • APP_NAME: An optional name for your session.

The cell output lists the sample_iceberg_table with the added column, and displays a link to the Interactive Session Details page in the Google Cloud console. You can click View Spark UI on the session details page to monitor your Spark session.

Interactive Session Detail View: LINK
+---+---------+------------+
| id|     data|newDoubleCol|
+---+---------+------------+
|  1|first row|        NULL|
+---+---------+------------+

root
 |-- id: integer (nullable = true)
 |-- data: string (nullable = true)
 |-- newDoubleCol: double (nullable = true)

View table details in BigQuery

Perform the following steps to check Iceberg table details in BigQuery:

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. In the project resources pane, click your project, then click your namespace to list the sample_iceberg_table table. Click the Details tab to view the Open Catalog Table Configuration information.

    The input and output formats are the standard Hadoop InputFormat and OutputFormat class formats that Iceberg uses.

    Iceberg table metadata listed in BigQuery UI
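
You can also inspect the table programmatically. The following is a minimal sketch that uses the BigQuery Python client (assumed to be available in the notebook runtime) with the placeholder project and namespace names from the Iceberg example above; in BigLake Metastore, the Iceberg namespace surfaces as a BigQuery dataset:

from google.cloud import bigquery

client = bigquery.Client(project="PROJECT")

# Fetch the table metadata and print its type and schema.
table = client.get_table("PROJECT.NAMESPACE.sample_iceberg_table")
print(table.table_type)
print(table.schema)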

Other examples

Create a Spark DataFrame (sdf) from a Pandas DataFrame (df).

sdf = spark.createDataFrame(df)
sdf.show()
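
For example, assuming a small hypothetical Pandas DataFrame (the column names below are placeholders chosen to match the aggregation example that follows):

import pandas as pd

# Hypothetical Pandas data; replace with your own DataFrame.
df = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "segment": ["new", "returning", "new", "returning"],
    "total_spend_per_user": [20.5, 35.0, 12.75, 50.0],
})

sdf = spark.createDataFrame(df)
sdf.show()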

Run aggregations on Spark DataFrames.

from pyspark.sql import functions as F

sdf.groupby("segment").agg(
   F.mean("total_spend_per_user").alias("avg_order_value"),
   F.approx_count_distinct("user_id").alias("unique_customers")
).show()

Read from BigQuery using the Spark-BigQuery connector.

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","my-bigquery-dataset")

sdf = spark.read.format('bigquery') \
 .load(query)

Write Spark code with Gemini Code Assist

You can ask Gemini Code Assist to generate PySpark code in your notebook. Gemini Code Assist fetches and uses relevant BigQuery and Dataproc Metastore tables and their schemas to generate a code response.

To generate Gemini Code Assist code in your notebook, do the following:

  1. Insert a new code cell by clicking + Code in the toolbar. The new code cell displays Start coding or generate with AI. Click generate.

  2. In the Generate editor, enter a natural language prompt, and then click enter. Make sure to include the keyword spark or pyspark in your prompt.

    Sample prompt:

    create a spark dataframe from order_items and filter to orders created in 2024
    

    Sample output:

    spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
    df = spark.sql("SELECT * FROM order_items")
    

Tips for Gemini Code Assist code generation

  • To let Gemini Code Assist fetch relevant tables and schemas, turn on Data Catalog sync for Dataproc Metastore instances.

  • Make sure your user account has Data Catalog access to the query tables. To do this, assign the DataCatalog.Viewer role.

End the Spark session

You can take any of the following actions to stop your Spark Connect session in your BigQuery Studio notebook:

  • Run spark.stop() in a notebook cell.
  • Terminate the runtime in the notebook:
    1. Click the runtime selector, then click Manage sessions.
      Manage sessions selection
    2. In the Active sessions dialog, click the terminate icon, then click Terminate.
      Terminate session selection in Active sessions dialog

Orchestrate BigQuery Studio notebook code

You can orchestrate BigQuery Studio notebook code in the following ways:

Schedule notebook code from the Google Cloud console

You can schedule notebook code in the following ways:

Run notebook code as a Dataproc Serverless batch workload

Complete the following steps to run BigQuery Studio notebook code as a Dataproc Serverless batch workload.

  1. Download notebook code into a file in a local terminal or in Cloud Shell.

    1. Open the notebook in the Explorer panel on the BigQuery Studio page in the Google Cloud console.

    2. Download the notebook code by selecting Download from the File menu, then choosing Download .py.

      File > Download menu on the Explorer page.
  2. Generate requirements.txt.

    1. Install pipreqs in the directory where you saved your .py file.
      pip install pipreqs
      
    2. Run pipreqs to generate requirements.txt.

      pipreqs filename.py
      

    3. Use the gsutil tool to copy the local requirements.txt file to a bucket in Cloud Storage.

      gsutil cp requirements.txt gs://BUCKET/
      
  3. Update Spark session code by editing the downloaded .py file.

    1. Remove or comment out any shell script commands.

    2. Remove code that configures the Spark session, then specify the configuration parameters as batch workload submit parameters (see Submit a Spark batch workload).

      Example:

      • Remove the following session subnet config line from the code:

        session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
        

      • When you run your batch workload, use the --subnet flag to specify the subnet.

        gcloud dataproc batches submit pyspark \
        --subnet=SUBNET_NAME
        
    3. Use a simple session creation code snippet.

      • Sample downloaded notebook code before simplification.

        from google.cloud.dataproc_spark_connect import DataprocSparkSession
        from google.cloud.dataproc_v1 import Session
        

        session = Session()
        spark = DataprocSparkSession \
            .builder \
            .appName("CustomSparkSession") \
            .dataprocSessionConfig(session) \
            .getOrCreate()

      • Batch workload code after simplification.

        from pyspark.sql import SparkSession
        

        spark = SparkSession \
            .builder \
            .getOrCreate()

  4. Run the batch workload.

    1. See Submit the Spark batch workload for instructions.

      • Make sure to include the --deps-bucket flag to point to the Cloud Storage bucket that contains your requirements.txt file.

        Example:

      gcloud dataproc batches submit pyspark FILENAME.py \
          --region=REGION \
          --deps-bucket=BUCKET \
          --version=2.3 
      

      Notes:

      • FILENAME: The name of your downloaded and edited notebook code file.
      • REGION: The Compute Engine region where your cluster is located.
      • BUCKET: The name of the Cloud Storage bucket that contains your requirements.txt file.
      • --version: Spark runtime version 2.3 is selected to run the batch workload.
  5. Commit your code.

    1. After testing your batch workload code, you can commit the .ipynb or .py file to your repository, hosted on a service such as GitHub, GitLab, or Bitbucket, using your git client as part of your CI/CD pipeline.
  6. Schedule your batch workload with Cloud Composer.

    1. See Run Dataproc Serverless workloads with Cloud Composer for instructions.

Troubleshoot notebook errors

If a failure occurs in a cell containing Spark code, you can troubleshoot the error by clicking the Interactive Session Detail View link in the cell output (see the Wordcount and Iceberg table examples).

Known issues and solutions

Error: A notebook runtime created with Python version 3.10 can cause a PYTHON_VERSION_MISMATCH error when the notebook attempts to connect to the Spark session.

Solution: Recreate the runtime with Python version 3.11.

What's next