在 BigQuery Studio 筆記本中執行 PySpark 程式碼

本文件說明如何在 BigQuery Python 筆記本中執行 PySpark 程式碼。

事前準備

如果您尚未建立,請先建立 Google Cloud 專案和 Cloud Storage 值區

  1. 設定專案

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

  2. 如果沒有可用的 Cloud Storage 值區,請在專案中建立 Cloud Storage 值區

  3. 設定筆記本

    1. 筆記本憑證:根據預設,筆記本工作階段會使用您的使用者憑證。如果您想為工作階段指定服務帳戶憑證,該帳戶必須具備 Dataproc 工作者 (roles/dataproc.worker 角色)。詳情請參閱 Dataproc 無伺服器服務帳戶
    2. 筆記本執行階段:除非您選取其他執行階段,否則筆記本會使用預設的 Vertex 執行階段。如果您想自行定義執行階段,請在 Google Cloud 控制台的「Runtimes」頁面中建立執行階段。

定價

如需價格資訊,請參閱 BigQuery 的Notebook 執行時間定價

開啟 BigQuery Studio Python 筆記本

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在詳細資料窗格的分頁標籤列中,按一下「+」符號旁的 箭頭,然後點選「Notebook」

在 BigQuery Studio 筆記本中建立 Spark 工作階段

您可以使用 BigQuery Studio Python 筆記本建立 Spark Connect 互動式工作階段。每個 BigQuery Studio 筆記本只能與一個有效的 Dataproc Serverless 工作階段建立關聯。

您可以透過下列方式,在 BigQuery Studio Python 筆記本中建立 Spark 工作階段:

  • 在筆記本中設定及建立單一工作階段。
  • Dataproc Serverless for Spark 互動工作階段範本中設定 Spark 工作階段,然後使用範本在筆記本中設定及建立工作階段。BigQuery 提供 Query using Spark 功能,可協助您開始編寫範本工作階段的程式碼,如「範本 Spark 工作階段」分頁所述。

單一工作階段

如要在新筆記本中建立 Spark 工作階段,請按照下列步驟操作:

  1. 在編輯器窗格中的分頁列中,按一下「+」符號旁的 箭頭下拉式選單,然後點選「Notebook」

    螢幕截圖:顯示 BigQuery 介面,其中有用於建立新筆記本的「+」按鈕。
  2. 在筆記本儲存格中複製並執行下列程式碼,即可設定並建立基本的 Spark 工作階段。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

import pyspark.sql.connect.functions as f

session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

更改下列內容:

  • APP_NAME:可選的會話名稱。
  • 選用的工作階段設定:您可以新增 Dataproc API Session 設定,自訂工作階段。以下舉例說明:
    • RuntimeConfig
      顯示 session.runtime.config 選項的程式碼說明。
      • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
      • session.runtime_config.container_image = path/to/container/image
    • EnvironmentConfig
      顯示工作階段-環境-設定-執行設定選項的程式碼說明。
      • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
      • session.environment_config.execution_config.ttl = {"seconds": VALUE}
      • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

範本 Spark 工作階段

您可以在筆記本儲存格中輸入並執行程式碼,根據現有的 Dataproc Serverless 工作階段範本建立 Spark 工作階段。您在筆記本程式碼中提供的任何 session 設定,都會覆寫在工作階段範本中設定的任何相同設定。

如要快速上手,請使用 Query using Spark 範本,在 Notebook 中預先填入 Spark 工作階段範本程式碼:

  1. 在編輯器窗格中的分頁列中,按一下「+」符號旁的 箭頭下拉式選單,然後點選「Notebook」
    螢幕截圖:顯示 BigQuery 介面,其中有用於建立新筆記本的「+」按鈕。
  2. 在「從範本開始」下方,點選「使用 Spark 查詢」,然後點選「使用範本」,即可在筆記本中插入程式碼。
    開始使用範本時的 BigQuery UI 選項
  3. 請按照「附註」一節的說明指定變數。
  4. 您可以刪除在筆記本中插入的任何其他範例程式碼儲存格。
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Configure the session with an existing session template.
session_template = "SESSION_TEMPLATE"
session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

取代下列內容:

  • PROJECT:您的專案 ID,位於 Google Cloud 控制台資訊主頁的「Project info」專案資訊部分。
  • LOCATION:執行 Notebook 工作階段的 Compute Engine 區域。如果未提供,預設位置會是建立 Notebook 的 VM 所在的地區。
  • SESSION_TEMPLATE:現有 Dataproc Serverless 互動工作階段範本的名稱。會從範本取得工作階段設定。範本也必須指定下列設定:

    • 執行階段版本 2.3+
    • 筆記本類型:Spark Connect

      範例:

      螢幕截圖:顯示 Spark Connect 的必要設定。
  • APP_NAME:可選的會話名稱。

在 BigQuery Studio 筆記本中編寫及執行 PySpark 程式碼

在筆記本中建立 Spark 工作階段後,請使用該工作階段在筆記本中執行 Spark 筆記本程式碼。

Spark Connect PySpark API 支援:Spark Connect 的 Notebook 工作階段支援大多數 PySpark API,包括 DataFrameFunctionsColumn,但不支援 SparkContextRDD 等其他 PySpark API。詳情請參閱「Spark 3.5 支援的內容」。

Dataproc 專屬 API:Dataproc 會擴充 addArtifacts 方法,簡化將 PyPI 套件動態新增至 Spark 工作階段的程序。您可以使用 version-scheme 格式 (類似 pip install) 指定清單。這樣一來,Spark Connect 伺服器就會在所有叢集節點上安裝套件及其相依項目,讓 UDF 的工作站可以使用這些項目。

範例:在叢集中安裝指定的 textdistance 版本和最新的相容 random2 程式庫,以便使用 textdistancerandom2 的 UDF 在 worker 節點上執行。

spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)

Notebook 程式碼說明:當您將游標懸停在類別或方法名稱上時,BigQuery Studio 筆記本會提供程式碼說明,並在您輸入程式碼時提供程式碼補全說明。

在以下範例中,輸入 DataprocSparkSession。將游標懸停在這個類別名稱上,即可顯示程式碼完成功能和說明文件說明。

程式碼文件和程式碼完成提示範例。

BigQuery Studio 筆記本 PySpark 範例

本節提供 BigQuery Studio Python 筆記本範例,其中包含 PySpark 程式碼,可執行下列工作:

  • 對公開的莎士比亞資料集執行字詞計數。
  • 建立含有 BigLake Metastore 中繼資料的中繼資料表。

Wordcount

以下 Pyspark 範例會建立 Spark 工作階段,然後計算公開 bigquery-public-data.samples.shakespeare 資料集中的字詞出現次數。

# Basic wordcount example
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Run a wordcount on the public Shakespeare dataset.
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
word_counts_df.show()

更改下列內容:

  • APP_NAME:可選的會話名稱。

輸出內容:

儲存格輸出內容會列出 wordcount 輸出內容的範例。如要在 Google Cloud 主控台中查看工作階段詳細資料,請按一下「互動式工作階段詳細資料檢視畫面」連結。如要監控 Spark 工作階段,請在工作階段詳細資料頁面上按一下「View Spark UI」

在控制台的工作階段詳細資料頁面中查看 Spark UI 按鈕
Interactive Session Detail View: LINK
+------------+-----+
|        word|count|
+------------+-----+
|           '|   42|
|       ''All|    1|
|     ''Among|    1|
|       ''And|    1|
|       ''But|    1|
|    ''Gamut'|    1|
|       ''How|    1|
|        ''Lo|    1|
|      ''Look|    1|
|        ''My|    1|
|       ''Now|    1|
|         ''O|    1|
|      ''Od's|    1|
|       ''The|    1|
|       ''Tis|    4|
|      ''When|    1|
|       ''tis|    1|
|      ''twas|    1|
|          'A|   10|
|'ARTEMIDORUS|    1|
+------------+-----+
only showing top 20 rows

Iceberg 資料表

執行 PySpark 程式碼,使用 BigLake Metastore 中繼資料建立 Iceberg 資料表

下列範例程式碼會建立 sample_iceberg_table,其中包含儲存在 BigLake Metastore 中的資料表中繼資料,然後查詢資料表。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
# Create the Dataproc Serverless session.
session = Session()
# Set the session configuration for BigQuery Metastore with the Iceberg environment.
project = "PROJECT"
region = "REGION"
subnet_name = "SUBNET_NAME"
location = "LOCATION"
session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
catalog = "CATALOG_NAME"
namespace = "NAMESPACE"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Create the namespace in BigQuery.
spark.sql(f"USE `{catalog}`;")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
spark.sql(f"USE `{namespace}`;")
# Create the Iceberg table.
spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
spark.sql("DESCRIBE sample_iceberg_table;")
# Insert table data and query the table.
spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
# Alter table, then query and display table data and schema.
spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
spark.sql("DESCRIBE sample_iceberg_table;")
df = spark.sql("SELECT * FROM sample_iceberg_table")
df.show()
df.printSchema()

注意:

  • PROJECT:您的專案 ID,位於 Google Cloud 控制台資訊主頁的「Project info」專案資訊部分。
  • REGIONSUBNET_NAME:指定 Compute Engine 區域和工作階段區域中的子網路名稱。Dataproc Serverless 會在指定子網路中啟用私人 Google 存取權 (PGA)
  • LOCATION:預設的 BigQuery_metastore_config.locationspark.sql.catalog.{catalog}.gcp_locationUS,但您可以選擇任何支援的 BigQuery 位置
  • BUCKETWAREHOUSE_DIRECTORY:用於 Iceberg 倉儲目錄的 Cloud Storage 值區和資料夾。
  • CATALOG_NAMENAMESPACE:Iceberg 目錄名稱和命名空間結合,用於識別 Iceberg 資料表 (catalog.namespace.table_name)。
  • APP_NAME:可選的會話名稱。

儲存格輸出內容會列出 sample_iceberg_table 和新增的資料欄,並顯示 Google Cloud 控制台中「互動工作階段詳細資料」頁面的連結。您可以按一下工作階段詳細資料頁面中的「View Spark UI」,監控 Spark 工作階段。

Interactive Session Detail View: LINK
+---+---------+------------+
| id|     data|newDoubleCol|
+---+---------+------------+
|  1|first row|        NULL|
+---+---------+------------+

root
 |-- id: integer (nullable = true)
 |-- data: string (nullable = true)
 |-- newDoubleCol: double (nullable = true)

在 BigQuery 中查看資料表詳細資料

如要在 BigQuery 中查看 Iceberg 資料表詳細資料,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在專案資源窗格中,按一下您的專案,然後點選您的命名空間,即可列出 sample_iceberg_table 資料表。按一下「Details」資料表,查看「Open Catalog Table Configuration」資訊。

    輸入和輸出格式是 Iceberg 使用的標準 Hadoop InputFormatOutputFormat 類別格式。

    BigQuery UI 中列出的 Iceberg 資料表中繼資料

其他範例

從 Pandas DataFrame (df) 建立 Spark DataFrame (sdf)。

sdf = spark.createDataFrame(df)
sdf.show()

在 Spark DataFrames 上執行匯總作業。

from pyspark.sql import functions as F

sdf.groupby("segment").agg(
   F.mean("total_spend_per_user").alias("avg_order_value"),
   F.approx_count_distinct("user_id").alias("unique_customers")
).show()

使用 Spark-BigQuery 連接器讀取 BigQuery。

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","my-bigquery-dataset")

sdf = spark.read.format('bigquery') \
 .load(query)

使用 Gemini Code Assist 編寫 Spark 程式碼

您可以要求 Gemini Code Assist 在筆記本中產生 PySpark 程式碼。Gemini Code Assist 會擷取並使用相關的 BigQuery 和 Dataproc Metastore 資料表及其結構定義,產生程式碼回應。

如要在筆記本中產生 Gemini Code Assist 程式碼,請按照下列步驟操作:

  1. 按一下工具列中的「+ 程式碼」,即可插入新的程式碼儲存格。新的程式碼儲存格會顯示 Start coding or generate with AI。按一下「產生」

  2. 在「產生」編輯器中輸入自然語言提示,然後按一下 enter請務必在提示中加入關鍵字 sparkpyspark

    提示範例:

    create a spark dataframe from order_items and filter to orders created in 2024
    

    輸出內容範例:

    spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
    df = spark.sql("SELECT * FROM order_items")
    

Gemini Code Assist 程式碼產生功能的訣竅

  • 如要讓 Gemini Code Assist 擷取相關的資料表和結構定義,請為 Dataproc Metastore 執行個體啟用 Data Catalog 同步處理

  • 請確認您的使用者帳戶有權存取資料目錄的查詢表。方法是指派 DataCatalog.Viewer 角色

結束 Spark 工作階段

您可以採取下列任一行動,在 BigQuery Studio 筆記本中停止 Spark Connect 工作階段:

  • 在筆記本儲存格中執行 spark.stop()
  • 在筆記本中終止執行階段:
    1. 按一下執行階段選取器,然後點選「管理工作階段」
      管理工作階段選項
    2. 在「Active sessions」對話方塊中,按一下終止圖示,然後點選「Terminate」
      在「Active sessions」對話方塊中終止工作階段選項

協調 BigQuery Studio 筆記本程式碼

您可以透過下列方式編排 BigQuery Studio 筆記本程式碼:

透過 Google Cloud 主控台安排筆記本程式碼

您可以透過下列方式排定程式碼筆記:

以 Dataproc Serverless 批次工作負載形式執行筆記本程式碼

請完成下列步驟,以 Dataproc Serverless 批次工作負載的形式執行 BigQuery Studio 筆記本程式碼。

  1. 在本機終端機或 Cloud Shell 中,將 Notebook 程式碼下載至檔案。

    1. 在 Google Cloud 控制台的「BigQuery Studio頁面上,透過「Explorer」面板開啟筆記本。

    2. 如要下載筆記本程式碼,請依序選取「檔案」選單中的「下載」,然後選擇 Download .py

      在探索工具頁面中依序點選「檔案」>「下載」選單。
  2. 產生 requirements.txt

    1. 在儲存 .py 檔案的目錄中安裝 pipreqs
      pip install pipreqs
      
    2. 執行 pipreqs 即可產生 requirements.txt

      pipreqs filename.py
      

    3. 使用 gsutil 工具將本機 requirements.txt 檔案複製到 Cloud Storage 中的值區。

      gsutil cp requirements.txt gs://BUCKET/
      
  3. 編輯下載的 .py 檔案,更新 Spark 工作階段程式碼。

    1. 移除或註解排除任何 Shell 指令碼指令。

    2. 移除設定 Spark 工作階段的程式碼,然後將設定參數指定為批次工作負載提交參數。(請參閱「提交 Spark 批次工作負載」)。

      範例:

      • 從程式碼中移除下列會話子網路設定行:

        session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
        

      • 執行批次工作負載時,請使用 --subnet 標記指定子網路。

        gcloud dataproc batches submit pyspark \
        --subnet=SUBNET_NAME
        
    3. 使用簡單的工作階段建立程式碼片段。

      • 下載的筆記本程式碼範例 (精簡前)。

        from google.cloud.dataproc_spark_connect import DataprocSparkSession
        from google.cloud.dataproc_v1 import Session
        

        session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

      • 簡化後的批次工作負載程式碼。

        from pyspark.sql import SparkSession
        

        spark = SparkSession \ .builder \ .getOrCreate()

  4. 執行批次工作負載。

    1. 如需操作說明,請參閱「提交 Spark 批次工作負載」。

      • 請務必加入 --deps-bucket 標記,指向包含 requirements.txt 檔案的 Cloud Storage 值區。

        範例:

      gcloud dataproc batches submit pyspark FILENAME.py \
          --region=REGION \
          --deps-bucket=BUCKET \
          --version=2.3 
      

      注意:

      • FILENAME:已下載及編輯的 Notebook 程式碼檔案名稱。
      • REGION:叢集所在的 Compute Engine 地區
      • BUCKET 包含 requirements.txt 檔案的 Cloud Storage 值區名稱。
      • --version:系統會選取Spark Runtime 2.3 版來執行批次工作負載。
  5. 提交程式碼。

    1. 測試批次工作負載程式碼後,您可以使用 git 用戶端 (例如 GitHub、GitLab 或 Bitbucket) 將 .ipynb.py 檔案提交至存放區,做為 CI/CD 管道的一部分。
  6. 使用 Cloud Composer 排定批次工作負載。

    1. 如需操作說明,請參閱「使用 Cloud Composer 執行 Dataproc Serverless 工作負載」一文。

排解筆記本錯誤

如果含有 Spark 程式碼的儲存格發生錯誤,您可以按一下儲存格輸出內容中的「Interactive Session Detail View」連結,排解錯誤 (請參閱 Wordcount 和 Iceberg 表格範例)。

已知問題和解決方法

錯誤:使用 Python 版本 3.10 建立的筆記本執行階段,在嘗試連線至 Spark 工作階段時,可能會導致 PYTHON_VERSION_MISMATCH 錯誤。

解決方案:使用 Python 版本 3.11 重新建立執行階段。

後續步驟