在 BigQuery Studio 筆記本中執行 PySpark 程式碼
本文件說明如何在 BigQuery Python 筆記本中執行 PySpark 程式碼。
事前準備
如果您尚未建立,請先建立 Google Cloud 專案和 Cloud Storage 值區。
設定專案
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
如果沒有可用的 Cloud Storage 值區,請在專案中建立 Cloud Storage 值區。
設定筆記本
- 筆記本憑證:根據預設,筆記本工作階段會使用您的使用者憑證。如果您想為工作階段指定服務帳戶憑證,該帳戶必須具備 Dataproc 工作者 (
roles/dataproc.worker
角色)。詳情請參閱 Dataproc 無伺服器服務帳戶。 - 筆記本執行階段:除非您選取其他執行階段,否則筆記本會使用預設的 Vertex 執行階段。如果您想自行定義執行階段,請在 Google Cloud 控制台的「Runtimes」頁面中建立執行階段。
- 筆記本憑證:根據預設,筆記本工作階段會使用您的使用者憑證。如果您想為工作階段指定服務帳戶憑證,該帳戶必須具備 Dataproc 工作者 (
定價
如需價格資訊,請參閱 BigQuery 的Notebook 執行時間定價。
開啟 BigQuery Studio Python 筆記本
前往 Google Cloud 控制台的「BigQuery」頁面。
在詳細資料窗格的分頁標籤列中,按一下「+」符號旁的
箭頭,然後點選「Notebook」。
在 BigQuery Studio 筆記本中建立 Spark 工作階段
您可以使用 BigQuery Studio Python 筆記本建立 Spark Connect 互動式工作階段。每個 BigQuery Studio 筆記本只能與一個有效的 Dataproc Serverless 工作階段建立關聯。
您可以透過下列方式,在 BigQuery Studio Python 筆記本中建立 Spark 工作階段:
- 在筆記本中設定及建立單一工作階段。
- 在 Dataproc Serverless for Spark 互動工作階段範本中設定 Spark 工作階段,然後使用範本在筆記本中設定及建立工作階段。BigQuery 提供
Query using Spark
功能,可協助您開始編寫範本工作階段的程式碼,如「範本 Spark 工作階段」分頁所述。
單一工作階段
如要在新筆記本中建立 Spark 工作階段,請按照下列步驟操作:
在編輯器窗格中的分頁列中,按一下「+」符號旁的
箭頭下拉式選單,然後點選「Notebook」。在筆記本儲存格中複製並執行下列程式碼,即可設定並建立基本的 Spark 工作階段。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.connect.functions as f
session = Session()
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
更改下列內容:
- APP_NAME:可選的會話名稱。
- 選用的工作階段設定:您可以新增 Dataproc API
Session
設定,自訂工作階段。以下舉例說明:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
範本 Spark 工作階段
您可以在筆記本儲存格中輸入並執行程式碼,根據現有的 Dataproc Serverless 工作階段範本建立 Spark 工作階段。您在筆記本程式碼中提供的任何 session
設定,都會覆寫在工作階段範本中設定的任何相同設定。
如要快速上手,請使用 Query using Spark
範本,在 Notebook 中預先填入 Spark 工作階段範本程式碼:
- 在編輯器窗格中的分頁列中,按一下「+」符號旁的
- 在「從範本開始」下方,點選「使用 Spark 查詢」,然後點選「使用範本」,即可在筆記本中插入程式碼。
- 請按照「附註」一節的說明指定變數。
- 您可以刪除在筆記本中插入的任何其他範例程式碼儲存格。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Configure the session with an existing session template.
session_template = "SESSION_TEMPLATE"
session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
- PROJECT:您的專案 ID,位於 Google Cloud 控制台資訊主頁的「Project info」專案資訊部分。
- LOCATION:執行 Notebook 工作階段的 Compute Engine 區域。如果未提供,預設位置會是建立 Notebook 的 VM 所在的地區。
SESSION_TEMPLATE:現有 Dataproc Serverless 互動工作階段範本的名稱。會從範本取得工作階段設定。範本也必須指定下列設定:
- 執行階段版本
2.3
+ 筆記本類型:
Spark Connect
範例:
- 執行階段版本
APP_NAME:可選的會話名稱。
在 BigQuery Studio 筆記本中編寫及執行 PySpark 程式碼
在筆記本中建立 Spark 工作階段後,請使用該工作階段在筆記本中執行 Spark 筆記本程式碼。
Spark Connect PySpark API 支援:Spark Connect 的 Notebook 工作階段支援大多數 PySpark API,包括 DataFrame、Functions 和 Column,但不支援 SparkContext 和 RDD 等其他 PySpark API。詳情請參閱「Spark 3.5 支援的內容」。
Dataproc 專屬 API:Dataproc 會擴充 addArtifacts
方法,簡化將 PyPI
套件動態新增至 Spark 工作階段的程序。您可以使用 version-scheme
格式 (類似 pip install
) 指定清單。這樣一來,Spark Connect 伺服器就會在所有叢集節點上安裝套件及其相依項目,讓 UDF 的工作站可以使用這些項目。
範例:在叢集中安裝指定的 textdistance
版本和最新的相容 random2
程式庫,以便使用 textdistance
和 random2
的 UDF 在 worker 節點上執行。
spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Notebook 程式碼說明:當您將游標懸停在類別或方法名稱上時,BigQuery Studio 筆記本會提供程式碼說明,並在您輸入程式碼時提供程式碼補全說明。
在以下範例中,輸入 DataprocSparkSession
。將游標懸停在這個類別名稱上,即可顯示程式碼完成功能和說明文件說明。

BigQuery Studio 筆記本 PySpark 範例
本節提供 BigQuery Studio Python 筆記本範例,其中包含 PySpark 程式碼,可執行下列工作:
- 對公開的莎士比亞資料集執行字詞計數。
- 建立含有 BigLake Metastore 中繼資料的中繼資料表。
Wordcount
以下 Pyspark 範例會建立 Spark 工作階段,然後計算公開 bigquery-public-data.samples.shakespeare
資料集中的字詞出現次數。
# Basic wordcount example
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
# Run a wordcount on the public Shakespeare dataset.
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
word_counts_df.show()
更改下列內容:
- APP_NAME:可選的會話名稱。
輸出內容:
儲存格輸出內容會列出 wordcount 輸出內容的範例。如要在 Google Cloud 主控台中查看工作階段詳細資料,請按一下「互動式工作階段詳細資料檢視畫面」連結。如要監控 Spark 工作階段,請在工作階段詳細資料頁面上按一下「View Spark UI」。

Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Iceberg 資料表
執行 PySpark 程式碼,使用 BigLake Metastore 中繼資料建立 Iceberg 資料表
下列範例程式碼會建立 sample_iceberg_table
,其中包含儲存在 BigLake Metastore 中的資料表中繼資料,然後查詢資料表。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
# Create the Dataproc Serverless session.
session = Session()
# Set the session configuration for BigQuery Metastore with the Iceberg environment.
project = "PROJECT"
region = "REGION"
subnet_name = "SUBNET_NAME"
location = "LOCATION"
session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
catalog = "CATALOG_NAME"
namespace = "NAMESPACE"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
# Create the namespace in BigQuery.
spark.sql(f"USE `{catalog}`;")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
spark.sql(f"USE `{namespace}`;")
# Create the Iceberg table.
spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
spark.sql("DESCRIBE sample_iceberg_table;")
# Insert table data and query the table.
spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
# Alter table, then query and display table data and schema.
spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
spark.sql("DESCRIBE sample_iceberg_table;")
df = spark.sql("SELECT * FROM sample_iceberg_table")
df.show()
df.printSchema()
注意:
- PROJECT:您的專案 ID,位於 Google Cloud 控制台資訊主頁的「Project info」專案資訊部分。
- REGION 和 SUBNET_NAME:指定 Compute Engine 區域和工作階段區域中的子網路名稱。Dataproc Serverless 會在指定子網路中啟用私人 Google 存取權 (PGA)。
- LOCATION:預設的
BigQuery_metastore_config.location
和spark.sql.catalog.{catalog}.gcp_location
為US
,但您可以選擇任何支援的 BigQuery 位置。 - BUCKET 和 WAREHOUSE_DIRECTORY:用於 Iceberg 倉儲目錄的 Cloud Storage 值區和資料夾。
- CATALOG_NAME 和 NAMESPACE:Iceberg 目錄名稱和命名空間結合,用於識別 Iceberg 資料表 (
catalog.namespace.table_name
)。 - APP_NAME:可選的會話名稱。
儲存格輸出內容會列出 sample_iceberg_table
和新增的資料欄,並顯示 Google Cloud 控制台中「互動工作階段詳細資料」頁面的連結。您可以按一下工作階段詳細資料頁面中的「View Spark UI」,監控 Spark 工作階段。

Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
在 BigQuery 中查看資料表詳細資料
如要在 BigQuery 中查看 Iceberg 資料表詳細資料,請按照下列步驟操作:
前往 Google Cloud 控制台的「BigQuery」頁面。
在專案資源窗格中,按一下您的專案,然後點選您的命名空間,即可列出
sample_iceberg_table
資料表。按一下「Details」資料表,查看「Open Catalog Table Configuration」資訊。輸入和輸出格式是 Iceberg 使用的標準 Hadoop
InputFormat
和OutputFormat
類別格式。
其他範例
從 Pandas DataFrame (df
) 建立 Spark DataFrame
(sdf
)。
sdf = spark.createDataFrame(df)
sdf.show()
在 Spark DataFrames
上執行匯總作業。
from pyspark.sql import functions as F
sdf.groupby("segment").agg(
F.mean("total_spend_per_user").alias("avg_order_value"),
F.approx_count_distinct("user_id").alias("unique_customers")
).show()
使用 Spark-BigQuery 連接器讀取 BigQuery。
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","my-bigquery-dataset")
sdf = spark.read.format('bigquery') \
.load(query)
使用 Gemini Code Assist 編寫 Spark 程式碼
您可以要求 Gemini Code Assist 在筆記本中產生 PySpark 程式碼。Gemini Code Assist 會擷取並使用相關的 BigQuery 和 Dataproc Metastore 資料表及其結構定義,產生程式碼回應。
如要在筆記本中產生 Gemini Code Assist 程式碼,請按照下列步驟操作:
按一下工具列中的「+ 程式碼」,即可插入新的程式碼儲存格。新的程式碼儲存格會顯示
Start coding or generate with AI
。按一下「產生」。在「產生」編輯器中輸入自然語言提示,然後按一下
enter
。請務必在提示中加入關鍵字spark
或pyspark
。提示範例:
create a spark dataframe from order_items and filter to orders created in 2024
輸出內容範例:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Gemini Code Assist 程式碼產生功能的訣竅
如要讓 Gemini Code Assist 擷取相關的資料表和結構定義,請為 Dataproc Metastore 執行個體啟用 Data Catalog 同步處理。
請確認您的使用者帳戶有權存取資料目錄的查詢表。方法是指派
DataCatalog.Viewer
角色。
結束 Spark 工作階段
您可以採取下列任一行動,在 BigQuery Studio 筆記本中停止 Spark Connect 工作階段:
- 在筆記本儲存格中執行
spark.stop()
。 - 在筆記本中終止執行階段:
- 按一下執行階段選取器,然後點選「管理工作階段」。
- 在「Active sessions」對話方塊中,按一下終止圖示,然後點選「Terminate」。
- 按一下執行階段選取器,然後點選「管理工作階段」。
協調 BigQuery Studio 筆記本程式碼
您可以透過下列方式編排 BigQuery Studio 筆記本程式碼:
透過 Google Cloud 控制台排定筆記本程式碼 (適用筆記本價格)。
以 Dataproc Serverless 批次工作負載形式執行筆記本程式碼 (適用 Dataproc Serverless 定價)。
透過 Google Cloud 主控台安排筆記本程式碼
您可以透過下列方式排定程式碼筆記:
以 Dataproc Serverless 批次工作負載形式執行筆記本程式碼
請完成下列步驟,以 Dataproc Serverless 批次工作負載的形式執行 BigQuery Studio 筆記本程式碼。
在本機終端機或 Cloud Shell 中,將 Notebook 程式碼下載至檔案。
在 Google Cloud 控制台的「BigQuery Studio」頁面上,透過「Explorer」面板開啟筆記本。
如要下載筆記本程式碼,請依序選取「檔案」選單中的「下載」,然後選擇
Download .py
。
產生
requirements.txt
。- 在儲存
.py
檔案的目錄中安裝pipreqs
。pip install pipreqs
執行
pipreqs
即可產生requirements.txt
。pipreqs filename.py
使用
gsutil
工具將本機requirements.txt
檔案複製到 Cloud Storage 中的值區。gsutil cp requirements.txt gs://BUCKET/
- 在儲存
編輯下載的
.py
檔案,更新 Spark 工作階段程式碼。移除或註解排除任何 Shell 指令碼指令。
移除設定 Spark 工作階段的程式碼,然後將設定參數指定為批次工作負載提交參數。(請參閱「提交 Spark 批次工作負載」)。
範例:
從程式碼中移除下列會話子網路設定行:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
執行批次工作負載時,請使用
--subnet
標記指定子網路。gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
使用簡單的工作階段建立程式碼片段。
下載的筆記本程式碼範例 (精簡前)。
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
簡化後的批次工作負載程式碼。
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
-
如需操作說明,請參閱「提交 Spark 批次工作負載」。
請務必加入 --deps-bucket 標記,指向包含
requirements.txt
檔案的 Cloud Storage 值區。範例:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
注意:
- FILENAME:已下載及編輯的 Notebook 程式碼檔案名稱。
- REGION:叢集所在的 Compute Engine 地區。
- BUCKET 包含
requirements.txt
檔案的 Cloud Storage 值區名稱。 --version
:系統會選取Spark Runtime 2.3 版來執行批次工作負載。
提交程式碼。
- 測試批次工作負載程式碼後,您可以使用
git
用戶端 (例如 GitHub、GitLab 或 Bitbucket) 將.ipynb
或.py
檔案提交至存放區,做為 CI/CD 管道的一部分。
- 測試批次工作負載程式碼後,您可以使用
使用 Cloud Composer 排定批次工作負載。
- 如需操作說明,請參閱「使用 Cloud Composer 執行 Dataproc Serverless 工作負載」一文。
排解筆記本錯誤
如果含有 Spark 程式碼的儲存格發生錯誤,您可以按一下儲存格輸出內容中的「Interactive Session Detail View」連結,排解錯誤 (請參閱 Wordcount 和 Iceberg 表格範例)。
已知問題和解決方法
錯誤:使用 Python 版本 3.10
建立的筆記本執行階段,在嘗試連線至 Spark 工作階段時,可能會導致 PYTHON_VERSION_MISMATCH
錯誤。
解決方案:使用 Python 版本 3.11
重新建立執行階段。
後續步驟
- YouTube 影片示範:發揮與 BigQuery 整合的 Apache Spark 強大功能。
- 搭配 Dataproc 使用 BigLake Metastore
- 搭配使用 BigLake Metastore 和 Dataproc Serverless