Apache Spark ストアド プロシージャを操作する
このドキュメントは、BigQuery で Spark ストアド プロシージャを作成して呼び出すデータ エンジニア、データ サイエンティスト、データ アナリストを対象としています。
BigQuery では、Python、Java、Scala で記述された Spark ストアド プロシージャを作成できます。これらのストアド プロシージャは、SQL ストアド プロシージャと同様に、GoogleSQL クエリを使用して BigQuery で実行できます。
準備
Spark のストアド プロシージャを作成するには、管理者に Spark 接続の作成と共有を依頼します。また、管理者は接続に関連付けされたサービス アカウントに必要な Identity and Access Management(IAM)権限を付与する必要があります。
必要なロール
このドキュメントのタスクの実行に必要な権限を取得するには、管理者に次の IAM のロールを付与するよう依頼してください。
-
Spark ストアド プロシージャを作成する:
- ストアド プロシージャを作成するデータセットに対する BigQuery データ編集者(
roles/bigquery.dataEditor
) - ストアド プロシージャが使用する接続に対する BigQuery Connection 管理者(
roles/bigquery.connectionAdmin
) -
プロジェクトに対する BigQuery ジョブユーザー (
roles/bigquery.jobUser
)
- ストアド プロシージャを作成するデータセットに対する BigQuery データ編集者(
-
Spark ストアド プロシージャを呼び出す:
- ストアド プロシージャが格納されているデータセットに対する BigQuery メタデータ閲覧者(
roles/bigquery.metadataViewer
) - 接続に対する BigQuery Connection ユーザー(
roles/bigquery.connectionUser
) -
プロジェクトに対する BigQuery ジョブユーザー (
roles/bigquery.jobUser
)
- ストアド プロシージャが格納されているデータセットに対する BigQuery メタデータ閲覧者(
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
これらの事前定義ロールには、このドキュメントのタスクを実行するために必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
このドキュメントのタスクを実行するには、次の権限が必要です。
-
接続を作成する:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Spark ストアド プロシージャを作成する:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Spark ストアド プロシージャを呼び出す:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
ロケーションに関する考慮事項
ストアド プロシージャは接続と同じロケーションで実行されるため、接続と同じロケーションに Spark 用ストアド プロシージャを作成する必要があります。たとえば、US マルチリージョンにストアド プロシージャを作成するには、US マルチリージョンにある接続を使用します。
料金
BigQuery で Spark プロシージャを実行する場合の料金は、Dataproc Serverless で Spark プロシージャを実行する場合の料金と同じです。詳細については、Dataproc Serverless の料金をご覧ください。
Spark ストアド プロシージャは、オンデマンド料金モデルと BigQuery エディションのいずれでも使用できます。Spark プロシージャの課金は、プロジェクトで使用されているコンピューティング料金モデルに関係なく、常に BigQuery Enterprise エディションの従量課金モデルに基づいて行われます。
BigQuery 用の Spark ストアド プロシージャは、予約やコミットメントの使用をサポートしていません。既存の予約とコミットメントは、サポートされている他のクエリとプロシージャで引き続き使用されます。Spark ストアド プロシージャの使用料金は、Enterprise エディションの従量課金制の料金で請求されます。組織の割引が適用されます(該当する場合)。
Spark ストアド プロシージャは Spark 実行エンジンを使用しますが、Spark の実行に対しては別途料金は発生しません。前述のように、対応する料金は BigQuery Enterprise エディションの従量課金制 SKU として報告されます。
Spark ストアド プロシージャに無料枠はありません。
Spark ストアド プロシージャを作成する
ストアド プロシージャは、使用する接続と同じロケーションに作成する必要があります。
ストアド プロシージャの本文が 1 MB を超える場合は、インライン コードを使用する代わりに、Cloud Storage バケット内のファイルにストアド プロシージャを配置することをおすすめします。BigQuery には、Python を使用して Spark ストアド プロシージャを作成するための 2 つの方法が用意されています。
CREATE PROCEDURE
ステートメントを使用する場合は、SQL クエリエディタを使用します。- Python コードを直接入力する場合は、PySpark エディタを使用します。コードをストアド プロシージャとして保存できます。
SQL クエリエディタを使用する
SQL クエリエディタで Spark ストアド プロシージャを作成するには、次の操作を行います。
[BigQuery] ページに移動します。
クエリエディタで、表示された
CREATE PROCEDURE
ステートメントのサンプルコードを追加します。または、[エクスプローラ] ペインで、接続リソースの作成に使用したプロジェクトの接続をクリックします。[
ストアド プロシージャを作成] をクリックして、Spark ストアド プロシージャを作成します。Python
Python で Spark ストアド プロシージャを作成するには、次のサンプルを使用します。
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java または Scala
main_file_uri
オプションを使用して Java または Scala で Spark ストアド プロシージャを作成するには、次のサンプルコードを使用します。CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
main_class
オプションとjar_uris
オプションを使用して Java または Scala で Spark ストアド プロシージャを作成するには、次のサンプルコードを使用します。CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
次のように置き換えます。
PROJECT_ID
: ストアド プロシージャを作成するプロジェクト。例:myproject
DATASET
: ストアド プロシージャを作成するデータセット。例:mydataset
PROCEDURE_NAME
: BigQuery で実行するストアド プロシージャの名前。例:mysparkprocedure
PROCEDURE_ARGUMENT
: 入力引数を入力するパラメータ。このパラメータで、次のフィールドを指定します。
ARGUMENT_MODE
: 引数のモード。有効な値は、
IN
、OUT
、INOUT
です。デフォルトの値はIN
です。ARGUMENT_NAME
: 引数の名前。ARGUMENT_TYPE
: 引数のタイプ。
例:
myproject.mydataset.mysparkproc(num INT64)
。詳細については、このドキュメントの「
IN
パラメータとして値を渡す」または「OUT
パラメータとINOUT
パラメータとして値を渡す」をご覧ください。CONNECTION_PROJECT_ID
: Spark プロシージャを実行するための接続を含むプロジェクト。CONNECTION_REGION
: Spark プロシージャを実行するための接続を含むリージョン。例:us
CONNECTION_ID
: 接続 ID。例:myconnection
Google Cloud コンソールで接続の詳細を表示する場合、接続 ID は接続 ID に表示される完全修飾接続 ID の最後のセクションの値です。例:
projects/myproject/locations/connection_location/connections/myconnection
RUNTIME_VERSION
: Spark のランタイム バージョン。例:1.1
MAIN_PYTHON_FILE_URI
: PySpark ファイルのパス。例:gs://mybucket/mypysparkmain.py
また、
CREATE PROCEDURE
ステートメントでストアド プロシージャの本文を追加する場合は、このドキュメントのインライン コードを使用するの例に示すように、LANGUAGE PYTHON AS
の後にPYSPARK_CODE
を追加します。PYSPARK_CODE
: プロシージャの本文をインラインで渡す場合のCREATE PROCEDURE
ステートメント内の PySpark アプリケーションの定義値は文字列リテラルです。コードに引用符やバックスラッシュが含まれる場合は、エスケープするか、元の文字列として表す必要があります。たとえば、
"\n";
を返すコードは次のいずれかで表されます。- 引用符付き文字列:
"return \"\\n\";"
。引用符とバックスラッシュはエスケープされます。 - 三重引用符付き文字列:
"""return "\\n";"""
。バックスラッシュはエスケープされますが、引用符はエスケープされません。 - 元の文字列:
r"""return "\n";"""
。エスケープは不要です。
- 引用符付き文字列:
MAIN_JAR_URI
:main
クラスを含む JAR ファイルのパス。例:gs://mybucket/my_main.jar
CLASS_NAME
:jar_uris
オプションが設定された JAR セットのクラスの完全修飾名。例:com.example.wordcount
URI
:main
クラスで指定されたクラスを含む JAR ファイルのパス。例:gs://mybucket/mypysparkmain.jar
OPTIONS
で指定できるその他のオプションについては、プロシージャ オプションのリストをご覧ください。
PySpark エディタを使用する
PySpark エディタを使用してプロシージャを作成する場合は、CREATE PROCEDURE
ステートメントを使用する必要はありません。代わりに、Pyspark エディタで直接 Python コードを追加し、コードを保存または実行します。
PySpark エディタで Spark ストアド プロシージャを作成するには、次の操作を行います。
[BigQuery] ページに移動します。
PySpark コードを直接入力する場合は、PySpark エディタを開きます。PySpark エディタを開くには、[
SQL クエリを作成] の横にある メニューをクリックし、[PySpark プロシージャを作成] を選択します。オプションを設定するには、[詳細] > [PySpark オプション] をクリックして、次の操作を行います。
PySpark コードを実行する場所を指定します。
[接続] フィールドで、Spark 接続を指定します。
[ストアド プロシージャの呼び出し] セクションで、生成される一時ストアド プロシージャを保存するデータセットを指定します。PySpark コードを呼び出すために、特定のデータセットを設定することも、一時的なデータセットを使用することもできます。
前の手順で指定したロケーションで一時データセットが生成されます。データセット名を指定する場合は、データセットと Spark 接続が同じロケーションにある必要があります。
[パラメータ] セクションで、ストアド プロシージャのパラメータを定義します。パラメータの値は、セッション中に PySpark コードが実行されるときにのみ使用されますが、宣言自体はプロシージャに格納されます。
[詳細オプション] セクションで、プロシージャのオプションを指定します。プロシージャのオプションの詳細については、プロシージャのオプションのリストをご覧ください。
[プロパティ] セクションで、Key-Value ペアを追加してジョブを構成します。Dataproc Serverless Spark プロパティの任意の Key-Value ペアを使用できます。
[サービス アカウントの設定] で、セッション内での PySpark コードの実行中に使用するカスタム サービス アカウント、CMEK、ステージング データセット、ステージング Cloud Storage フォルダを指定します。
[保存] をクリックします。
Spark 用ストアド プロシージャを保存する
PySpark エディタを使用してストアド プロシージャを作成した後、ストアド プロシージャを保存できます。方法は次のとおりです。
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで、Python と PySpark エディタを使用して Spark 用ストアド プロシージャを作成します。
[保存] > [プロシージャを保存] をクリックします。
[ストアド プロシージャの保存] ダイアログで、ストアド プロシージャを保存するデータセット名とストアド プロシージャの名前を指定します。
[保存] をクリックします。
PySpark コードをストアド プロシージャとして保存するのではなく、実行するだけの場合は、[保存] ではなく [実行] をクリックします。
カスタム コンテナを使用する
カスタム コンテナは、ワークロードのドライバとエグゼキュータのプロセス用のランタイム環境を提供します。カスタム コンテナを使用するには、次のサンプルコードを使用します。
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
次のように置き換えます。
PROJECT_ID
: ストアド プロシージャを作成するプロジェクト。例:myproject
DATASET
: ストアド プロシージャを作成するデータセット。例:mydataset
PROCEDURE_NAME
: BigQuery で実行するストアド プロシージャの名前。例:mysparkprocedure
PROCEDURE_ARGUMENT
: 入力引数を入力するパラメータ。このパラメータで、次のフィールドを指定します。
ARGUMENT_MODE
: 引数のモード。有効な値は、
IN
、OUT
、INOUT
です。デフォルトの値はIN
です。ARGUMENT_NAME
: 引数の名前。ARGUMENT_TYPE
: 引数のタイプ。
例:
myproject.mydataset.mysparkproc(num INT64)
。詳細については、このドキュメントの「
IN
パラメータとして値を渡す」または「OUT
パラメータとINOUT
パラメータとして値を渡す」をご覧ください。CONNECTION_PROJECT_ID
: Spark プロシージャを実行するための接続を含むプロジェクト。CONNECTION_REGION
: Spark プロシージャを実行するための接続を含むリージョン。例:us
CONNECTION_ID
: 接続 ID。例:myconnection
Google Cloud コンソールで接続の詳細を表示する場合、接続 ID は接続 ID に表示される完全修飾接続 ID の最後のセクションの値です。例:
projects/myproject/locations/connection_location/connections/myconnection
RUNTIME_VERSION
: Spark のランタイム バージョン。例:1.1
MAIN_PYTHON_FILE_URI
: PySpark ファイルのパス。例:gs://mybucket/mypysparkmain.py
また、
CREATE PROCEDURE
ステートメントでストアド プロシージャの本文を追加する場合は、このドキュメントのインライン コードを使用するの例に示すように、LANGUAGE PYTHON AS
の後にPYSPARK_CODE
を追加します。PYSPARK_CODE
: プロシージャの本文をインラインで渡す場合のCREATE PROCEDURE
ステートメント内の PySpark アプリケーションの定義値は文字列リテラルです。コードに引用符やバックスラッシュが含まれる場合は、エスケープするか、元の文字列として表す必要があります。たとえば、コード
"\n";
は次のいずれかで表されます。- 引用符付き文字列:
"return \"\\n\";"
。引用符とバックスラッシュはエスケープされます。 - 三重引用符付き文字列:
"""return "\\n";"""
。バックスラッシュはエスケープされますが、引用符はエスケープされません。 - 元の文字列:
r"""return "\n";"""
。エスケープは不要です。
- 引用符付き文字列:
CONTAINER_IMAGE
: Artifact Registry 内のイメージのパス。プロシージャで使用するライブラリのみを含める必要があります。指定しない場合、ランタイム バージョンに関連付けられたシステムのデフォルトのコンテナ イメージが使用されます。
Spark を使用してカスタム コンテナ イメージをビルドする方法については、カスタム コンテナ イメージをビルドするをご覧ください。
Spark ストアド プロシージャを呼び出す
ストアド プロシージャを作成したら、次のいずれかの方法で呼び出すことができます。
コンソール
BigQuery ページに移動します。
[エクスプローラ] ペインでプロジェクトを開き、実行する Spark のストアド プロシージャを選択します。
[ストアド プロシージャ情報] ウィンドウで、[ストアド プロシージャを呼び出す] をクリックします。あるいは、[アクションを表示] オプションを展開して [呼び出す] をクリックします。
[実行] をクリックします。
[すべての結果] セクションで、[結果を表示] をクリックします。
省略可: [クエリ結果] セクションで、次の操作を行います。
Spark ドライバのログを表示するには、[実行の詳細] をクリックします。
Cloud Logging でログを表示する場合、[ジョブ情報] をクリックし、[ログ] フィールドで [ログ] をクリックします。
Spark History Server エンドポイントを取得する場合は、[ジョブ情報]、[Spark History Server] の順にクリックします。
SQL
ストアド プロシージャを呼び出すには、CALL PROCEDURE
ステートメントを使用します。
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで次のステートメントを入力します。
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
[
実行] をクリックします。
クエリの実行方法については、インタラクティブ クエリを実行するをご覧ください。
カスタム サービス アカウントを使用する
Spark コード内のデータにアクセスする場合、データアクセスに Spark 接続のサービス ID を使用する代わりに、カスタム サービス アカウントを使用できます。
カスタム サービス アカウントを使用するには、Spark ストアド プロシージャの作成時に INVOKER
セキュリティ モードを指定します(EXTERNAL SECURITY INVOKER
ステートメントを使用)。また、ストアド プロシージャを呼び出すときに、サービス アカウントを指定します。
Cloud Storage から Spark コードにアクセスして使用するには、Spark 接続のサービス ID に必要な権限を付与する必要があります。接続のサービス アカウントに storage.objects.get
IAM 権限または storage.objectViewer
IAM ロールを付与する必要があります。
接続で Dataproc Metastore と Dataproc Persistent History Server を指定した場合、これらに対するアクセス権を接続のサービス アカウントに付与できます。詳細については、サービス アカウントへのアクセス権を付与するをご覧ください。
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
必要に応じて、次のコードに次の引数を追加できます。
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
次のように置き換えます。
CUSTOM_SERVICE_ACCOUNT
: 必須。ユーザーが提供するカスタム サービス アカウント。BUCKET_NAME
: 省略可。デフォルトの Spark アプリケーション ファイル システムとして使用される Cloud Storage バケット。指定しない場合、プロジェクトにデフォルトの Cloud Storage バケットが作成され、同じプロジェクトで実行されるすべてのジョブでバケットが共有されます。DATASET
: 省略可。プロシージャの呼び出しによって生成される一時データを格納するデータセット。ジョブが完了すると、データはクリーンアップされます。指定しなかった場合、ジョブ用にデフォルトの一時データセットが作成されます。
カスタム サービス アカウントには、次の権限が必要です。
デフォルトの Spark アプリケーション ファイル システムとして使用されるステージング バケットに読み書きするには:
- 指定したステージング バケットに対する
storage.objects.*
権限またはroles/storage.objectAdmin
IAM ロール。 - また、ステージング バケットが指定されていない場合は、プロジェクトに対する
storage.buckets.*
権限またはroles/storage.Admin
IAM ロール。
- 指定したステージング バケットに対する
(省略可)BigQuery との間でデータを読み書きするには:
- BigQuery テーブルに対する
bigquery.tables.*
。 - プロジェクトに対する
bigquery.readsessions.*
。 roles/bigquery.admin
IAM ロールには、以前の権限が含まれています。
- BigQuery テーブルに対する
(省略可)Cloud Storage との間でデータの読み取りと書き込みを行うには:
- Cloud Storage オブジェクトに対する
storage.objects.*
権限またはroles/storage.objectAdmin
IAM ロール。
- Cloud Storage オブジェクトに対する
(省略可)
INOUT/OUT
パラメータに使用されるステージング データセットの読み取りと書き込みを行うには:- 指定したステージング データセットに対する
bigquery.tables.*
またはroles/bigquery.dataEditor
IAM ロール。 - また、ステージング データセットが指定されていない場合は、プロジェクトに対する
bigquery.datasets.create
権限またはroles/bigquery.dataEditor
IAM ロール。
- 指定したステージング データセットに対する
Spark ストアド プロシージャの例
このセクションでは、Apache Spark に対するストアド プロシージャの作成方法の例を示します。
Cloud Storage で PySpark または JAR ファイルを使用する
次の例は、my-project-id.us.my-connection
接続と、Cloud Storage バケットに保存されている PySpark または JAR ファイルを使用して、Spark 用ストアド プロシージャを作成する方法を示しています。
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java または Scala
main_file_uri
を使用してストアド プロシージャを作成します。
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
main_class
を使用してストアド プロシージャを作成します。
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
インライン コードを使用する
次の例は、接続 my-project-id.us.my-connection
とインライン PySpark コードを使用して Spark に対するストアド プロシージャを作成する方法を示しています。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
入力パラメータとして値を渡す
次の例では、Python で入力パラメータとして値を渡す 2 つの方法を示しています。
方法 1: 環境変数を使用する
PySpark コードでは、Spark ドライバとエグゼキュータで環境変数を使用して、Spark に対するストアド プロシージャの入力パラメータを取得できます。環境変数の名前の形式は BIGQUERY_PROC_PARAM.PARAMETER_NAME
です。ここで、PARAMETER_NAME
は入力パラメータの名前です。たとえば、入力パラメータの名前が var
の場合、対応する環境変数の名前は BIGQUERY_PROC_PARAM.var
です。入力パラメータは JSON でエンコードされています。PySpark コードで、環境変数から JSON 文字列の入力パラメータ値を取得し、Python 変数にデコードできます。
次の例は、INT64
型の入力パラメータの値を PySpark コードに取得する方法を示しています。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
方法 2: 組み込みライブラリを使用する
PySpark コードで組み込みライブラリをインポートして、すべてのタイプのパラメータを自動的に入力します。パラメータをエグゼキュータに渡すには、Spark ドライバのパラメータを Python 変数として設定し、その値をエグゼキュータに渡します。組み込みライブラリは、INTERVAL
、GEOGRAPHY
、NUMERIC
、BIGNUMERIC
を除く、ほとんどの BigQuery データ型をサポートしています。
BigQuery のデータ型 | Python のデータ型 |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
サポート対象外 |
BIGNUMERIC
|
サポート対象外 |
INTERVAL
|
サポート対象外 |
GEOGRAPHY
|
サポート対象外 |
次の例は、組み込みライブラリをインポートして、INT64 型の入力パラメータと ARRAY<STRUCT<a INT64, b STRING>> 型の入力パラメータを PySpark コードに設定する方法を示しています。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Java または Scala のコードでは、Spark ドライバとエグゼキュータの環境変数を使用して、Spark 用ストアド プロシージャの入力パラメータを取得できます。環境変数の名前の形式は BIGQUERY_PROC_PARAM.PARAMETER_NAME
です。ここで、PARAMETER_NAME
は入力パラメータの名前です。たとえば、入力パラメータの名前が var の場合、対応する環境変数の名前は BIGQUERY_PROC_PARAM.var
です。Java または Scala のコードでは、環境変数から入力パラメータ値を取得できます。
次の例は、環境変数から Scala コードへの入力パラメータの値を取得する方法を示しています。
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
次の例は、環境変数から Java コードへの入力パラメータを取得する方法を示しています。
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
OUT
パラメータと INOUT
パラメータとして値を渡す
出力パラメータは Spark プロシージャからの値を返します。INOUT
パラメータは、プロシージャの値を受け取って、プロシージャからの値を返します。OUT
パラメータと INOUT
パラメータを使用するには、Spark プロシージャを作成するときにパラメータ名の前に OUT
キーワードまたは INOUT
キーワードを追加します。PySpark コードでは、組み込みライブラリを使用して値を OUT
または INOUT
パラメータとして返します。入力パラメータと同様に、組み込みライブラリは INTERVAL
、GEOGRAPHY
、NUMERIC
、BIGNUMERIC
を除くほとんどの BigQuery データ型をサポートしています。TIME
型と DATETIME
型の値は、OUT
または INOUT
パラメータとして返されるときに UTC タイムゾーンに変換されます。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Hive メタストア テーブルから読み取って結果を BigQuery に書き込む
次の例は、Hive メタストア テーブルを変換し、結果を BigQuery に書き込む方法を示しています。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="1.1") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
ログフィルタを表示する
Spark に対するストアド プロシージャを呼び出すと、ログ情報を表示できます。Cloud Logging のフィルタ情報と Spark History Cluster エンドポイントを取得するには、bq
show
コマンドを使用します。フィルタ情報は、子ジョブの SparkStatistics
フィールドで確認できます。ログフィルタを取得するには、次の操作を行います。
BigQuery ページに移動します。
クエリエディタで、ストアド プロシージャのスクリプト ジョブの子ジョブを一覧表示します。
bq ls -j --parent_job_id=$parent_job_id
ジョブ ID を取得する方法については、ジョブの詳細を表示するをご覧ください。
出力は次のようになります。
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
ストアド プロシージャに対する
jobId
を特定し、bq show
コマンドを使用して、ジョブの詳細を表示します。bq show --format=prettyjson --job $child_job_id
別の手順で必要になるため、
sparkStatistics
フィールドをコピーします。出力は次のようになります。
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Logging 用に、
SparkStatistics
フィールドを使用してログフィルタを生成します。resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
ログは、
bigquery.googleapis.com/SparkJob
モニタリング対象リソースに書き込まれます。ログには、INFO
、DRIVER
、EXECUTOR
コンポーネントのラベルが付けられます。Spark ドライバからログをフィルタするには、ログフィルタにlabels.component = "DRIVER"
コンポーネントを追加します。Spark エグゼキュータからログをフィルタするには、ログフィルタにlabels.component = "EXECUTOR"
コンポーネントを追加します。
顧客管理の暗号鍵を使用する
BigQuery Spark プロシージャは、顧客管理の暗号鍵(CMEK)と BigQuery のデフォルトの暗号化を使用してコンテンツを保護します。Spark プロシージャで CMEK を使用するには、まず BigQuery の暗号化サービス アカウントの作成をトリガーし、必要な権限を付与します。プロジェクトに適用されている場合、Spark プロシージャは CMEK の組織のポリシーもサポートします。
ストアド プロシージャが INVOKER
セキュリティ モードを使用している場合は、プロシージャを呼び出すときに SQL システム変数を使用して CMEK を指定する必要があります。それ以外の場合は、ストアド プロシージャに関連付けられた接続を介して CMEK を指定できます。
Spark ストアド プロシージャの作成時に接続を介して CMEK を指定するには、次のサンプルコードを使用します。
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
プロシージャを呼び出すときに SQL システム変数を使用して CMEK を指定するには、次のサンプルコードを使用します。
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
VPC Service Controls を使用する
VPC Service Controls を使用すると、データ漏洩を防ぐためのセキュアな境界を設定できます。セキュリティを強化するために、Spark プロシージャで VPC Service Controls を使用するには、まずサービス境界を作成します。
Spark プロシージャ ジョブを完全に保護するには、次の API をサービス境界に追加します。
- BigQuery API(
bigquery.googleapis.com
) - Cloud Logging API(
logging.googleapis.com
) - Cloud Storage を使用している場合: Cloud Storage API(
storage.googleapis.com
) - カスタム コンテナを使用する場合: Artifact Registry API(
artifactregistry.googleapis.com
)または Container Registry API(containerregistry.googleapis.com
) - Dataproc Metastore を使用する場合: Dataproc Metastore API(
metastore.googleapis.com
)、Cloud Run Admin API(run.googleapis.com
)
Spark プロシージャのクエリ プロジェクトを境界に追加します。Spark コードやデータをホストする他のプロジェクトを境界に追加します。
ベスト プラクティス
プロジェクトで初めて接続を使用する場合は、プロビジョニングにさらに 1 分ほどかかります。Spark ストアド プロシージャを作成するときに既存の Spark 接続を再利用すると時間を短縮できます。
本番環境で使用するために Spark プロシージャを作成する場合は、ランタイム バージョンを指定することをおすすめします。サポートされているランタイム バージョンのリストについては、Dataproc Serverless ランタイム バージョンをご覧ください。長期サポート(LTS)バージョンの使用をおすすめします。
Spark プロシージャでカスタム コンテナを指定する場合は、Artifact Registry とイメージ ストリーミングを使用することをおすすめします。
パフォーマンスを向上させるには、Spark プロシージャでリソース割り当てプロパティを指定します。Spark ストアド プロシージャは、Dataproc Serverless と同じリソース割り当てプロパティのリストをサポートしています。
制限事項
- Dataproc Metastore に接続できるのは、gRPC エンドポイント プロトコルのみです。他のタイプの Hive メタストアはまだサポートされていません。
- 顧客管理の暗号鍵(CMEK)は、顧客が単一リージョンの Spark プロシージャを作成する場合にのみ使用できます。グローバル リージョンの CMEK 鍵とマルチリージョンの CMEK 鍵(
EU
、US
など)はサポートされていません。 - 出力パラメータの引き渡しは PySpark でのみサポートされています。
- Spark のストアド プロシージャに関連付けられたデータセットが、クロスリージョン データセット レプリケーションによって宛先リージョンに複製される場合、ストアド プロシージャは、作成されたリージョン内でのみクエリされます。
- Spark は、プライベート VPC Service Controls ネットワーク内の HTTP エンドポイントへのアクセスをサポートしていません。
割り当てと上限
割り当てと上限の詳細については、Spark 用ストアド プロシージャの割り当てと上限をご覧ください。
次のステップ
- ストアド プロシージャを表示する方法を学習する。
- ストアド プロシージャを削除する方法を確認する。
- SQL ストアド プロシージャを操作する方法を学習する。