BigQuery Studio ノートブックで PySpark コードを実行する

このドキュメントでは、BigQuery Python ノートブックで PySpark コードを実行する方法について説明します。

始める前に

Google Cloud プロジェクトと Cloud Storage バケットをまだ作成していない場合は、作成します。

  1. プロジェクトを設定する

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. 使用できる Cloud Storage バケットがない場合は、プロジェクトに Cloud Storage バケットを作成します。

    7. ノートブックを設定する

      1. ノートブック認証情報: デフォルトでは、ノートブック セッションでユーザー認証情報を使用します。セッションに対してサービス アカウント認証情報を指定する場合は、Dataproc ワーカー(roles/dataproc.worker ロール)が必要です。詳細については、Dataproc Serverless サービス アカウントをご覧ください。
      2. ノートブックのランタイム: 別のランタイムを選択しない限り、ノートブックはデフォルトの Vertex ランタイムを使用します。独自のランタイムを定義する場合は、 Google Cloud コンソールの [ランタイム] ページからランタイムを作成します。
    8. 料金

      料金については、BigQuery のノートブック ランタイムの料金をご覧ください。

      BigQuery Studio Python ノートブックを開く

      1. Google Cloud コンソールで、[BigQuery] ページに移動します。

        [BigQuery] に移動

      2. 詳細ペインのタブバーで、+ 記号の横にある 矢印をクリックし、[ノートブック] をクリックします。

      BigQuery Studio ノートブックで Spark セッションを作成する

      BigQuery Studio Python ノートブックを使用して、Spark Connect インタラクティブ セッションを作成できます。各 BigQuery Studio ノートブックには、アクティブな Dataproc Serverless セッションを 1 つだけ関連付けることが可能です。

      BigQuery Studio Python ノートブックで Spark セッションを作成するには、次の方法があります。

      • ノートブックで単一のセッションを構成して作成します。
      • Dataproc Serverless for Spark インタラクティブ セッション テンプレートで Spark セッションを構成し、テンプレートを使用してノートブックでセッションを構成して作成します。BigQuery には、[Templated Spark session] タブで説明されているように、テンプレート化されたセッションのコーディングを開始するのに役立つ Query using Spark 機能があります。

      1 回のみ

      新しいノートブックで Spark セッションを作成するには、次の操作を行います。

      1. エディタペインのタブバーで、+ 記号の横にある 矢印プルダウンをクリックし、[ノートブック] をクリックします。

        新しいノートブックを作成するための [+] ボタンがある BigQuery インターフェースを示すスクリーンショット。
      2. 次のコードをノートブック セルにコピーして実行し、基本的な Spark セッションを構成して作成します。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      次のように置き換えます。

      • APP_NAME: セッションのオプションの名前。
      • オプションのセッション設定: Dataproc API の Session 設定を追加して、セッションをカスタマイズできます。次に例を示します。
        • RuntimeConfig:
          session.runtime.config オプションを示すコードヘルプ。
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          session-environment-config-execution-config オプションを示すコードヘルプ。
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      テンプレート化された Spark セッション

      ノートブック セルにコードを入力して実行し、既存の Dataproc Serverless セッション テンプレートに基づいて Spark セッションを作成できます。ノートブック コードで指定した session 構成設定は、セッション テンプレートで設定されている同じ設定をオーバーライドします。

      すぐに始めるには、Query using Spark テンプレートを使用して、Spark セッション テンプレート コードでノートブックを事前入力します。

      1. エディタペインのタブバーで、+ 記号の横にある 矢印プルダウンをクリックし、[ノートブック] をクリックします。
        新しいノートブックを作成するための [+] ボタンがある BigQuery インターフェースを示すスクリーンショット。
      2. [テンプレートを使って開始] で [Spark を使用したクエリ]、[テンプレートを使用] の順にクリックして、ノートブックにコードを挿入します。
        テンプレートで開始するための BigQuery UI の選択
      3. に記載されているように変数を指定します。
      4. ノートブックに挿入された追加のサンプルコード セルは削除できます。
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      次のように置き換えます。

      BigQuery Studio ノートブックで PySpark コードを記述して実行する

      ノートブックで Spark セッションを作成したら、そのセッションを使用してノートブックで Spark ノートブック コードを実行します。

      Spark Connect PySpark API のサポート: Spark Connect ノートブック セッションは、DataFrameFunctionsColumn など、ほとんどの PySpark API をサポートしていますが、SparkContextRDD、その他の PySpark API はサポートしていません。詳細については、Spark 3.5 でサポートされている内容をご覧ください。

      Dataproc 固有の API: Dataproc は、addArtifacts メソッドを拡張することで、PyPI パッケージを Spark セッションに動的に追加する処理を簡素化します。このリストは、version-scheme 形式(pip install に似ている)で指定できます。これにより、Spark Connect サーバーに、すべてのクラスタノードにパッケージとその依存関係をインストールするよう指示し、UDF のワーカーで使用できるようにします。

      指定された textdistance バージョンと、互換性のある最新の random2 ライブラリをクラスタにインストールして、textdistancerandom2 を使用する UDF がワーカーノードで実行できるようにする例。

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      ノートブックのコードヘルプ: BigQuery Studio ノートブックでは、クラス名またはメソッド名にポインタを合わせるとコードヘルプが表示され、コードを入力するときにコード補完ヘルプが表示されます。

      次の例では、DataprocSparkSession を入力しています。このクラス名にポインタを合わせると、コード補完とドキュメントのヘルプが表示されます。

      コード ドキュメントとコード補完のヒントの例。

      BigQuery Studio ノートブックの PySpark の例

      このセクションでは、次のタスクを実行する PySpark コードを含む BigQuery Studio Python ノートブックの例を示します。

      • 一般公開の Shakespeare データセットに対して WordCount を実行します。
      • BigLake metastore に保存されたメタデータを使用して Iceberg テーブルを作成します。

      ワード数

      次の Pyspark の例では、Spark セッションを作成し、一般公開の bigquery-public-data.samples.shakespeare データセット内の単語の出現回数をカウントします。

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      次のように置き換えます。

      • APP_NAME: セッションのオプションの名前。

      出力:

      セル出力には、ワードカウント出力のサンプルが一覧表示されます。 Google Cloud コンソールでセッションの詳細を表示するには、[Interactive Session Detail View] リンクをクリックします。Spark セッションをモニタリングするには、セッションの詳細ページで [Spark UI を表示] をクリックします。

      コンソールのセッションの詳細ページにある [Spark UI を表示] ボタン
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Iceberg テーブル

      PySpark コードを実行して、BigLake metastore メタデータを含む Iceberg テーブルを作成する

      次のサンプルコードでは、BigLake metastore にテーブル メタデータを保存する sample_iceberg_table を作成し、テーブルをクエリします。

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      注:

      • PROJECT: プロジェクト ID は、Google Cloud コンソール ダッシュボードの [プロジェクト情報] セクションに表示されます。
      • REGIONSUBNET_NAME: Compute Engine リージョンと、セッション リージョンのサブネットの名前を指定します。Dataproc Serverless は、指定されたサブネットでプライベート Google アクセス(PGA)を有効にします。
      • LOCATION: デフォルトの BigQuery_metastore_config.locationspark.sql.catalog.{catalog}.gcp_locationUS ですが、サポートされている BigQuery のロケーションを選択できます。
      • BUCKETWAREHOUSE_DIRECTORY: Iceberg ウェアハウス ディレクトリに使用する Cloud Storage バケットとフォルダ。
      • CATALOG_NAMENAMESPACE: Iceberg テーブルを識別するための Iceberg カタログ名と名前空間の組み合わせ(catalog.namespace.table_name)。
      • APP_NAME: セッションのオプションの名前。

      セルの出力には、追加された列を含む sample_iceberg_table が一覧表示され、 Google Cloud コンソールの [インタラクティブ セッションの詳細] ページへのリンクが表示されます。セッションの詳細ページで [Spark UI を表示] をクリックすると、Spark セッションをモニタリングできます。

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      BigQuery でテーブルの詳細を表示する

      BigQuery で Iceberg テーブルの詳細を確認する手順は次のとおりです。

      1. Google Cloud コンソールで、[BigQuery] ページに移動します。

        [BigQuery] に移動

      2. プロジェクト リソース ペインで、プロジェクトをクリックし、名前空間をクリックして sample_iceberg_table テーブルを一覧表示します。[詳細] テーブルをクリックして、オープン カタログ テーブルの構成情報を表示します。

        入力形式と出力形式は、Iceberg が使用する標準の Hadoop InputFormat クラス形式と OutputFormat クラス形式です。

        BigQuery UI に表示される Iceberg テーブルのメタデータ

      その他の例

      Pandas DataFrame(df)から Spark DataFramesdf)を作成します。

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Spark DataFrames で集計を実行します。

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Spark-BigQuery コネクタを使用して BigQuery から読み取ります。

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Gemini Code Assist を使用して Spark コードを記述する

      Gemini Code Assist に、ノートブックで PySpark コードを生成するよう依頼できます。Gemini Code Assist は、関連する BigQuery テーブルと Dataproc Metastore テーブルとそのスキーマを取得して使用し、コード レスポンスを生成します。

      ノートブックで Gemini Code Assist コードを生成する手順は次のとおりです。

      1. ツールバーで [+ コード] をクリックして、新しいコードセルを挿入します。新しいコードセルに Start coding or generate with AI が表示されます。[生成] をクリックします。

      2. 生成エディタで、自然言語プロンプトを入力して enter をクリックします。プロンプトには、キーワード spark または pyspark を含めてください。

        プロンプトの例:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        出力例:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Gemini Code Assist のコード生成に関するヒント

      • Gemini Code Assist で関連するテーブルとスキーマを取得できるようにするには、Dataproc Metastore インスタンスの Data Catalog の同期を有効にします。

      • ユーザー アカウントに、Data Catalog のクエリテーブルへのアクセス権があることを確認します。そのためには、DataCatalog.Viewer ロールを割り当てます。

      Spark セッションを終了する

      BigQuery Studio ノートブックで Spark Connect セッションを停止するには、次のいずれかを行います。

      • ノートブックのセルで spark.stop() を実行します。
      • ノートブックでランタイムを終了します。
        1. ランタイム セレクタをクリックし、[セッションの管理] をクリックします。
          セッションの選択を管理する
        2. [アクティブなセッション] ダイアログで、終了アイコンをクリックし、[終了] をクリックします。
          [アクティブなセッション] ダイアログでセッションの選択を終了する

      BigQuery Studio ノートブック コードをオーケストレートする

      BigQuery Studio ノートブック コードをオーケストレートするには、次の方法があります。

      • Google Cloud コンソールからノートブック コードをスケジュールします(ノートブックの料金が適用されます)。

      • ノートブック コードを Dataproc Serverless バッチ ワークロードとして実行します(Dataproc Serverless の料金が適用されます)。

      Google Cloud コンソールからノートブック コードのスケジュールを設定する

      ノートブック コードは次の方法でスケジュールできます。

      ノートブック コードを Dataproc Serverless バッチ ワークロードとして実行する

      BigQuery Studio ノートブック コードを Dataproc Serverless バッチ ワークロードとして実行するには、次の操作を行います。

      1. ノートブック コードをローカル ターミナルまたは Cloud Shell のファイルにダウンロードします。

        1. Google Cloud コンソールの BigQuery Studio ページの [エクスプローラ] パネルでノートブックを開きます。

        2. [ファイル] メニューから [ダウンロード] を選択し、Download .py を選択してノートブック コードをダウンロードします。

          [エクスプローラ] ページの [ファイル] > [ダウンロード] メニュー。
      2. requirements.txt を生成します。

        1. .py ファイルを保存したディレクトリに pipreqs をインストールします。
          pip install pipreqs
          
        2. pipreqs を実行して requirements.txt を生成します。

          pipreqs filename.py
          

        3. Google Cloud CLI を使用して、ローカルの requirements.txt ファイルを Cloud Storage のバケットにコピーします。

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. ダウンロードした .py ファイルを編集して、Spark セッション コードを更新します。

        1. シェル スクリプト コマンドを削除するか、コメントアウトします。

        2. Spark セッションを構成するコードを削除し、構成パラメータをバッチ ワークロード送信パラメータとして指定します。(Spark バッチ ワークロードを送信するをご覧ください)。

          例:

          • コードから次のセッション サブネット構成行を削除します。

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • バッチ ワークロードを実行する場合は、--subnet フラグを使用してサブネットを指定します。

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. 単純なセッション作成コード スニペットを使用する。

          • 簡素化前のダウンロードされたノートブック コードのサンプル。

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • 簡素化後のバッチ ワークロード コード。

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. バッチ ワークロードを実行する。

        1. 手順については、Spark バッチ ワークロードを送信するをご覧ください。

          • --deps-bucket フラグを含めて、requirements.txt ファイルを含む Cloud Storage バケットを指定してください。

            例:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          注:

          • FILENAME: ダウンロードして編集したノートブック コード ファイルの名前。
          • REGION: クラスタが配置されている Compute Engine リージョン
          • BUCKET: requirements.txt ファイルを含む Cloud Storage バケットの名前。
          • --version: バッチ ワークロードを実行するために Spark ランタイム バージョン 2.3 が選択されています。
      5. コードを commit します。

        1. バッチ ワークロード コードをテストしたら、CI/CD パイプラインの一部として、GitHub、GitLab、Bitbucket などの git クライアントを使用して、.ipynb ファイルまたは .py ファイルをリポジトリに commit できます。
      6. Cloud Composer でバッチ ワークロードのスケジュールを設定します。

        1. 手順については、Cloud Composer で Dataproc Serverless ワークロードを実行するをご覧ください。

      ノートブック エラーのトラブルシューティング

      Spark コードを含むセルでエラーが発生した場合は、セル出力の [Interactive Session Detail View] リンクをクリックして、エラーのトラブルシューティングを行えます(Wordcount と Iceberg テーブルの例をご覧ください)。

      既知の問題と解決策

      エラー: Python バージョン 3.10 で作成されたノートブック ランタイムが Spark セッションに接続しようとすると、PYTHON_VERSION_MISMATCH エラーを引き起こすことがあります。

      解決策: Python バージョン 3.11 でランタイムを再作成します。

      次のステップ