在 BigQuery Studio 笔记本中运行 PySpark 代码

本文档介绍了如何在 BigQuery Python 笔记本中运行 PySpark 代码。

准备工作

如果您尚未创建项目和 Cloud Storage 存储桶,请先创建这些资源。 Google Cloud

  1. 设置您的项目

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. 如果您没有可用的 Cloud Storage 存储桶,请在项目中创建 Cloud Storage 存储桶

    7. 设置笔记本

      1. 笔记本凭证:默认情况下,笔记本会话会使用您的用户凭证。如果您想为会话指定服务账号凭证,该服务账号必须具有 Dataproc Worker(roles/dataproc.worker 角色)。如需了解详情,请参阅 Dataproc Serverless 服务账号
      2. 笔记本运行时:除非您选择其他运行时,否则笔记本会使用默认的 Vertex 运行时。如果您想定义自己的运行时,请在 Google Cloud 控制台的运行时页面中创建运行时。
    8. 价格

      如需了解价格信息,请参阅 BigQuery 笔记本运行时价格

      打开 BigQuery Studio Python 笔记本

      1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

        转到 BigQuery

      2. 在详细信息窗格的标签页栏中,点击 + 号旁边的 箭头,然后点击笔记本

      在 BigQuery Studio 笔记本中创建 Spark 会话

      您可以使用 BigQuery Studio Python 笔记本创建 Spark Connect 交互式会话。每个 BigQuery Studio 笔记本只能有一个与之关联的活跃 Dataproc Serverless 会话。

      您可以通过以下方式在 BigQuery Studio Python 笔记本中创建 Spark 会话:

      • 在笔记本中配置和创建单个会话。
      • Dataproc Serverless for Spark 交互式会话模板中配置 Spark 会话,然后使用该模板在笔记本中配置和创建会话。BigQuery 提供了一个 Query using Spark 功能,可帮助您按照模板化 Spark 会话标签页下所述开始编写模板化会话的代码。

      单个会话

      如需在新笔记本中创建 Spark 会话,请执行以下操作:

      1. 在编辑器窗格的标签页栏中,点击 + 号旁边的 下拉箭头,然后点击笔记本

        显示 BigQuery 界面的屏幕截图,其中包含用于创建新笔记本的“+”按钮。
      2. 在笔记本单元中复制并运行以下代码,以配置和创建基本 Spark 会话。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      替换以下内容:

      • APP_NAME:会话的可选名称。
      • 可选的会话设置:您可以添加 Dataproc API Session 设置来自定义会话。下面是一些示例:
        • RuntimeConfig
          显示 session.runtime.config 选项的代码帮助。
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig
          显示 session-environment-config-execution-config 选项的代码帮助。
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      模板化 Spark 会话

      您可以在笔记本单元中输入并运行代码,以根据现有的 Dataproc Serverless 会话模板创建 Spark 会话。您在笔记本代码中提供的任何 session 配置设置都会替换会话模板中设置的任何相同设置。

      如需快速上手,请使用 Query using Spark 模板,使用 Spark 会话模板代码预先填充笔记本:

      1. 在编辑器窗格的标签页栏中,点击 + 号旁边的 下拉箭头,然后点击笔记本
        显示 BigQuery 界面的屏幕截图,其中包含用于创建新笔记本的“+”按钮。
      2. 从模板开始下方,点击使用 Spark 查询,然后点击使用模板以将代码插入笔记本中。
        使用模板开始 BigQuery 界面选择
      3. 按照备注中所述指定变量。
      4. 您可以删除在笔记本中插入的任何其他示例代码单元。
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      请替换以下内容:

      • PROJECT:您的项目 ID,列在 Google Cloud 控制台信息中心项目信息部分中。
      • LOCATION:笔记本会话将在其中运行的 Compute Engine 区域。如果未提供,则默认位置为创建笔记本的虚拟机所在的区域。
      • SESSION_TEMPLATE:现有 Dataproc Serverless 交互式会话模板的名称。会话配置设置是从模板中获取的。该模板还必须指定以下设置:

        • 运行时版本 2.3+
        • 笔记本类型:Spark Connect

          示例:

          显示 Spark Connect 所需设置的屏幕截图。
      • APP_NAME:会话的可选名称。

      在 BigQuery Studio 笔记本中编写和运行 PySpark 代码

      在笔记本中创建 Spark 会话后,使用该会话在笔记本中运行 Spark 笔记本代码。

      Spark Connect PySpark API 支持:Spark Connect 笔记本会话支持大多数 PySpark API,包括 DataFrameFunctionsColumn,但不支持 SparkContextRDD 和其他 PySpark API。如需了解详情,请参阅 Spark 3.5 支持的内容

      Dataproc 特有的 API:Dataproc 通过扩展 addArtifacts 方法,简化了向 Spark 会话动态添加 PyPI 软件包的操作。您可以使用 version-scheme 格式(类似于 pip install)指定列表。这会指示 Spark Connect 服务器在所有集群节点上安装软件包及其依赖项,以便可供工作器用于 UDF。

      示例:在集群上安装指定的 textdistance 版本和最新的兼容 random2 库,以允许使用 textdistancerandom2 的 UDF 在 工作器节点上运行。

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      笔记本代码帮助:当您将指针悬停在类名称或方法名称上时,BigQuery Studio 笔记本会提供代码帮助;当您输入代码时,会提供代码补全帮助。

      在以下示例中,输入 DataprocSparkSession,然后将指针悬停在该类名称上,系统会显示代码补全和文档帮助。

      代码文档和代码补全提示示例。

      BigQuery Studio 笔记本 PySpark 示例

      本部分提供了包含 PySpark 代码的 BigQuery Studio Python 笔记本示例,用于执行以下任务:

      • 对公开的莎士比亚数据集运行字数统计。
      • 创建一个包含保存在 BigLake metastore 中的元数据的 Iceberg 表。

      字数统计

      以下 Pyspark 示例创建一个 Spark 会话,然后统计公开 bigquery-public-data.samples.shakespeare 数据集中的字词出现次数。

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      替换以下内容:

      • APP_NAME:会话的可选名称。

      输出:

      单元输出列出了字数统计输出的示例。如需在 Google Cloud 控制台中查看会话详情,请点击交互式会话详情视图链接。如需监控 Spark 会话,请点击会话详情页面上的查看 Spark 界面

      控制台的会话详情页面中的“查看 Spark 界面”按钮
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Iceberg 表

      运行 PySpark 代码以创建包含 BigLake metastore 元数据的 Iceberg 表

      以下示例代码会创建一个 sample_iceberg_table,其中包含存储在 BigLake metastore 中的表元数据,然后查询该表。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigQuery Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      注意:

      • PROJECT:您的项目 ID,列在 Google Cloud 控制台信息中心项目信息部分中。
      • REGIONSUBNET_NAME:指定 Compute Engine 区域和会话区域中子网的名称。Dataproc Serverless 会在指定的子网中启用专用 Google 访问通道 (PGA)
      • LOCATION:默认的 BigQuery_metastore_config.locationspark.sql.catalog.{catalog}.gcp_locationUS,但您可以选择任何受支持的 BigQuery 位置
      • BUCKETWAREHOUSE_DIRECTORY:用于 Iceberg 数据仓库目录的 Cloud Storage 存储桶和文件夹。
      • CATALOG_NAMENAMESPACE:Iceberg 目录名称和命名空间组合起来,用于标识 Iceberg 表 (catalog.namespace.table_name)。
      • APP_NAME:会话的可选名称。

      单元输出列出了 sample_iceberg_table 以及添加的列,并显示了指向控制台 Google Cloud 中交互式会话详情页面的链接。您可以点击会话详情页面上的查看 Spark 界面,以监控 Spark 会话。

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      在 BigQuery 中查看表详情

      如需在 BigQuery 中查看 Iceberg 表详情,请执行以下步骤:

      1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

        转到 BigQuery

      2. 在项目资源窗格中,点击您的项目,然后点击您的命名空间以列出 sample_iceberg_table 表。点击详细信息表以查看打开目录表配置信息。

        输入和输出格式是 Iceberg 使用的标准 Hadoop InputFormatOutputFormat 类格式。

        BigQuery 界面中列出的 Iceberg 表元数据

      其他示例

      从 Pandas DataFrame (df) 创建 Spark DataFrame (sdf)。

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      在 Spark DataFrames 上运行聚合。

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      使用 Spark-BigQuery 连接器从 BigQuery 读取数据。

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      使用 Gemini Code Assist 编写 Spark 代码

      您可以让 Gemini Code Assist 在笔记本中生成 PySpark 代码。Gemini Code Assist 会提取并使用相关的 BigQuery 和 Dataproc Metastore 表及其架构来生成代码响应。

      如需在笔记本中生成 Gemini Code Assist 代码,请执行以下操作:

      1. 点击工具栏中的 + 代码,插入新的代码单元。新的代码单元会显示 Start coding or generate with AI。点击生成

      2. 在生成编辑器中,输入自然语言提示,然后点击 enter请务必在提示中添加关键字 sparkpyspark

        示例提示:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        示例输出:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Gemini Code Assist 代码生成提示

      • 如需让 Gemini Code Assist 提取相关表和架构,请为 Dataproc Metastore 实例启用 Data Catalog 同步

      • 确保您的用户账号有权访问 Data Catalog 查询表。为此,请分配 DataCatalog.Viewer 角色

      结束 Spark 会议

      您可以在 BigQuery Studio 笔记本中执行以下任一操作来停止 Spark Connect 会话:

      • 在笔记本单元中运行 spark.stop()
      • 在笔记本中终止运行时:
        1. 点击运行时选择器,然后点击管理会话
          管理会话选择
        2. 活跃会话对话框中,点击终止图标,然后点击终止
          “活跃会话”对话框中的“终止会话”选项

      编排 BigQuery Studio 笔记本代码

      您可以通过以下方式编排 BigQuery Studio 笔记本代码:

      从 Google Cloud 控制台安排笔记本代码

      您可以通过以下方式安排笔记本代码:

      • 安排笔记本
      • 如果笔记本代码执行是工作流的一部分,请安排笔记本作为流水线的一部分。

      将笔记本代码作为 Dataproc Serverless 批量工作负载运行

      如需将 BigQuery Studio 笔记本代码作为 Dataproc Serverless 批量工作负载运行,请完成以下步骤。

      1. 在本地终端或 Cloud Shell 中,将笔记本代码下载到文件中。

        1. 在 Google Cloud 控制台的 BigQuery Studio 页面上的探索器面板中打开笔记本。

        2. 文件菜单中选择下载来下载笔记本代码,然后选择 Download .py

          “探索器”页面上的“文件”>“下载”菜单。
      2. 生成 requirements.txt

        1. pipreqs 安装到您保存 .py 文件的目录中。
          pip install pipreqs
          
        2. 运行 pipreqs 以生成 requirements.txt

          pipreqs filename.py
          

        3. 使用 gsutil 工具将本地 requirements.txt 文件复制到 Cloud Storage 中的存储桶。

          gsutil cp requirements.txt gs://BUCKET/
          
      3. 通过修改下载的 .py 文件来更新 Spark 会话代码。

        1. 移除或注释掉所有 Shell 脚本命令。

        2. 移除用于配置 Spark 会话的代码,然后将配置参数指定为批量工作负载提交参数。(请参阅提交 Spark 批量工作负载)。

          示例:

          • 从代码中移除以下会话子网配置行:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • 运行批量工作负载时,请使用 --subnet 标志指定子网。

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. 使用简单的会话创建代码段。

          • 简化前的下载笔记本代码示例。

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • 简化后的批量工作负载代码。

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. 运行批量工作负载。

        1. 如需查看相关说明,请参阅提交 Spark 批量工作负载

          • 请务必添加 --deps-bucket 标志,以指向包含您的 requirements.txt 文件的 Cloud Storage 存储桶。

            示例:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          注意:

          • FILENAME:您下载并修改的笔记本代码文件的名称。
          • REGION:您的集群所在的 Compute Engine 区域
          • BUCKET 包含 requirements.txt 文件的 Cloud Storage 存储桶的名称。
          • --version:选择 Spark 运行时版本 2.3 来运行批量工作负载。
      5. 提交代码。

        1. 测试批量工作负载代码后,您可以使用 git 客户端(例如 GitHub、GitLab 或 Bitbucket)将 .ipynb.py 文件提交到代码库,作为 CI/CD 流水线的一部分。
      6. 使用 Cloud Composer 安排批量工作负载。

        1. 如需查看相关说明,请参阅使用 Cloud Composer 运行 Dataproc Serverless 工作负载

      排查笔记本错误

      如果包含 Spark 代码的单元发生故障,您可以通过点击单元输出中的交互式会话详情视图链接来排查错误(请参阅 Wordcount 和 Iceberg 表示例)。

      已知问题和解决方案

      错误:使用 Python 版本 3.10 创建的笔记本运行时在尝试连接到 Spark 会话时可能会导致 PYTHON_VERSION_MISMATCH 错误。

      解决方案:使用 Python 版本 3.11 重新创建运行时。

      后续步骤