使用 Apache Spark 的預存程序
本文件適用於資料工程師、數據資料學家和資料分析師,可用於在 BigQuery 中建立及呼叫 Spark 的已儲存程序。
您可以使用 BigQuery 建立及執行以 Python、Java 和 Scala 編寫的 Spark 儲存程序。接著,您可以使用 GoogleSQL 查詢,在 BigQuery 中執行這些儲存的程序,原理類似執行 SQL 儲存程序。
事前準備
如要為 Spark 建立儲存程序,請管理員建立 Spark 連線並與您分享。管理員也必須為與連線相關聯的服務帳戶授予必要的 Identity and Access Management (IAM) 權限。
必要的角色
如要取得執行本文件中任務所需的權限,請要求管理員授予您下列 IAM 角色:
-
建立 Spark 預存程序:
-
在建立預存程序的資料集中使用 BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) -
在預存程序使用的連線中,將 BigQuery 連線管理員 (
roles/bigquery.connectionAdmin
) 設為「允許」 -
專案中的 BigQuery 工作使用者 (
roles/bigquery.jobUser
)
-
在建立預存程序的資料集中使用 BigQuery 資料編輯器 (
-
呼叫 Spark 的預存程序:
-
儲存程序儲存的資料集的 BigQuery 中繼資料檢視器 (
roles/bigquery.metadataViewer
) -
連線上的 BigQuery 連線使用者 (
roles/bigquery.connectionUser
) -
專案中的 BigQuery 工作使用者 (
roles/bigquery.jobUser
)
-
儲存程序儲存的資料集的 BigQuery 中繼資料檢視器 (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
這些預先定義的角色包含執行本文件中工作所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:
所需權限
如要執行本文中的任務,您必須具備下列權限:
-
建立連線:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
建立 Spark 預存程序:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
呼叫 Spark 的預存程序:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
位置考量
您必須在連線所在位置建立 Spark 的儲存程序,因為儲存程序會在與連線相同的位置執行。舉例來說,如要在美國多區域建立儲存程序,請使用位於美國多區域的連線。
定價
在 BigQuery 上執行 Spark 程序的費用與在 Dataproc Serverless 上執行 Spark 程序的費用類似。詳情請參閱「Dataproc Serverless 定價」。
Spark 預存程序可搭配以量計價的收費模式和任何 BigQuery 版本使用。無論專案使用哪種運算定價模式,Spark 程序的費用一律會以 BigQuery Enterprise 版即付即用模式計費。
適用於 BigQuery 的 Spark 預存程序不支援使用預留或承諾。現有的保留項目和承諾會繼續用於其他支援的查詢和程序。使用 Spark 儲存程序的費用會以 Enterprise 版的即付即用費率計費,並加計至帳單。系統會套用貴機構的折扣 (如適用)。
雖然 Spark 預存程序會使用 Spark 執行引擎,但您不會看到 Spark 執行作業的單獨費用。如前所述,相應的費用會以 BigQuery Enterprise 版「付費即用」SKU 的形式記錄。
Spark 預存程序不提供免費方案。
建立 Spark 預存程序
您必須在與所用連線相同的位置建立儲存程序。
如果儲存程序的內容超過 1 MB,建議您將儲存程序放入 Cloud Storage 值區中的檔案中,而非使用內嵌程式碼。BigQuery 提供兩種方法,可使用 Python 為 Spark 建立預存程序:
- 如要使用
CREATE PROCEDURE
陳述式,請使用 SQL 查詢編輯器。 - 如果您想直接輸入 Python 程式碼,請使用 PySpark 編輯器。您可以將程式碼儲存為預存程序。
使用 SQL 查詢編輯器
如要在 SQL 查詢編輯器中為 Spark 建立儲存程序,請按照下列步驟操作:
前往「BigQuery」頁面
在查詢編輯器中,為顯示的
CREATE PROCEDURE
陳述式新增程式碼範例。或者,您也可以在「Explorer」窗格中,點選用來建立連線資源的專案中的連線。接著,如要為 Spark 建立已儲存的程序,請按一下
「Create stored procedure」。Python
如要在 Python 中建立 Spark 的預存程序,請使用下列程式碼範例:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java 或 Scala
如要在 Java 或 Scala 中使用
main_file_uri
選項為 Spark 建立儲存程序,請使用下列程式碼範例:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
如要在 Java 或 Scala 中使用
main_class
和jar_uris
選項為 Spark 建立儲存程序,請使用以下程式碼範例:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
請依指示取代下列項目:
PROJECT_ID
:您要建立儲存程序的專案,例如myproject
。DATASET
:您要建立儲存程序的資料集,例如mydataset
。PROCEDURE_NAME
:您要在 BigQuery 中執行的已儲存程序名稱,例如mysparkprocedure
。PROCEDURE_ARGUMENT
:輸入輸入引數的參數。在這個參數中,指定下列欄位:
ARGUMENT_MODE
:引數的模式。有效值包括
IN
、OUT
和INOUT
。預設值為IN
。ARGUMENT_NAME
:引數的名稱。ARGUMENT_TYPE
:引數的類型。
例如
myproject.mydataset.mysparkproc(num INT64)
。如需更多資訊,請參閱本文件中的「將值做為
IN
參數」或「OUT
和INOUT
參數」一節。CONNECTION_PROJECT_ID
:包含執行 Spark 程序的連線專案。CONNECTION_REGION
:包含執行 Spark 程序的連線的區域,例如us
。CONNECTION_ID
:連線 ID,例如myconnection
。在 Google Cloud 控制台查看連線詳細資料時,連線 ID 是「連線 ID」中顯示的完整連線 ID 最後一個部分的值,例如
projects/myproject/locations/connection_location/connections/myconnection
。RUNTIME_VERSION
:Spark 的執行階段版本,例如2.2
。MAIN_PYTHON_FILE_URI
:PySpark 檔案的路徑,例如gs://mybucket/mypysparkmain.py
。或者,如果您想在
CREATE PROCEDURE
陳述式中新增儲存程序的內文,請在LANGUAGE PYTHON AS
後方新增PYSPARK_CODE
,如本文件「使用內嵌程式碼」一節的範例所示。PYSPARK_CODE
:如果您想在內文中傳遞程序的內容,請在CREATE PROCEDURE
陳述式中定義 PySpark 應用程式。這個值為字串常值。如果程式碼包含引號和反斜線,則必須進行轉義,或以原始字串的形式表示。舉例來說,程式碼傳回的
"\n";
可以表示為下列其中一個:- 加引號的字串:
"return \"\\n\";"
。半形引號和反斜線都會逸出。 - 加三引號的字串:
"""return "\\n";"""
。反斜線會加上反斜線,而引號則不會。 - 原始字串:
r"""return "\n";"""
。不需要逸出。
- 加引號的字串:
MAIN_JAR_URI
:包含main
類別的 JAR 檔案路徑,例如gs://mybucket/my_main.jar
。CLASS_NAME
:使用jar_uris
選項的 JAR 集合中,類別的完整名稱,例如com.example.wordcount
。URI
:包含main
類別中指定的類別的 JAR 檔案路徑,例如gs://mybucket/mypysparkmain.jar
。
如要瞭解可在
OPTIONS
中指定的其他選項,請參閱程序選項清單。
使用 PySpark 編輯器
使用 PySpark 編輯器建立程序時,您不需要使用 CREATE PROCEDURE
陳述式。請改為直接在 Pyspark 編輯器中新增 Python 程式碼,然後儲存或執行程式碼。
如要在 PySpark 編輯器中建立 Spark 的已儲存程序,請按照下列步驟操作:
前往「BigQuery」頁面
如要直接輸入 PySpark 程式碼,請開啟 PySpark 編輯器。如要開啟 PySpark 編輯器,請按一下
「Create SQL query」旁的 選單,然後選取「Create PySpark Procedure」。如要設定選項,請依序按一下「More」>「PySpark Options」,然後執行下列操作:
指定要執行 PySpark 程式碼的位置。
在「連線」欄位中指定 Spark 連線。
在「預存程序叫用」部分,指定要儲存產生的臨時預存程序的資料集。您可以設定特定資料集,也可以使用暫時資料集來叫用 PySpark 程式碼。
系統會根據先前步驟中指定的位置產生臨時資料集。如果指定資料集名稱,請確認資料集和 Spark 連線必須位於相同位置。
在「Parameters」部分中,定義儲存程序的參數。參數的值僅會在 PySpark 程式碼的工作階段執行期間使用,但宣告本身會儲存在程序中。
在「Advanced options」部分中,指定程序選項。如需詳細的程序選項清單,請參閱程序選項清單。
在「屬性」部分中,新增鍵/值組合來設定工作。您可以使用 Dataproc Serverless Spark 屬性中的任何鍵/值組合。
在「服務帳戶設定」中,指定在 PySpark 程式碼的會話內執行期間要使用的自訂服務帳戶、CMEK、暫存資料集和暫存 Cloud Storage 資料夾。
按一下 [儲存]。
儲存 Spark 預存程序
使用 PySpark 編輯器建立已儲存的程序後,您可以儲存該程序。如要這樣做,請按照下列步驟操作:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,使用 Python 和 PySpark 編輯器為 Spark 建立預存程序。
依序點選
「儲存」。
在「Save Stored Procedure」對話方塊中,指定要儲存預存程序的資料集名稱和預存程序名稱。
按一下 [儲存]。
如果您只想執行 PySpark 程式碼,而非將其儲存為預存程序,請按一下「Run」,而非「Save」。
使用自訂容器
自訂容器可為工作負載的驅動程式和執行程序提供執行階段環境。如要使用自訂容器,請使用下列範例程式碼:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
請依指示取代下列項目:
PROJECT_ID
:您要建立儲存程序的專案,例如myproject
。DATASET
:您要建立儲存程序的資料集,例如mydataset
。PROCEDURE_NAME
:您要在 BigQuery 中執行的已儲存程序名稱,例如mysparkprocedure
。PROCEDURE_ARGUMENT
:輸入引數的參數。在這個參數中,指定下列欄位:
ARGUMENT_MODE
:引數的模式。有效值包括
IN
、OUT
和INOUT
。預設值為IN
。ARGUMENT_NAME
:引數的名稱。ARGUMENT_TYPE
:引數的類型。
例如
myproject.mydataset.mysparkproc(num INT64)
。詳情請參閱本文件中的「將值做為
IN
參數」或「OUT
和INOUT
參數」一節。CONNECTION_PROJECT_ID
:包含執行 Spark 程序的連線專案。CONNECTION_REGION
:包含執行 Spark 程序的連線的區域,例如us
。CONNECTION_ID
:連線 ID,例如myconnection
。在 Google Cloud 控制台查看連線詳細資料時,連線 ID 是「連線 ID」中顯示的完整連線 ID 最後一節的值,例如
projects/myproject/locations/connection_location/connections/myconnection
。RUNTIME_VERSION
:Spark 的執行階段版本,例如2.2
。MAIN_PYTHON_FILE_URI
:PySpark 檔案的路徑,例如gs://mybucket/mypysparkmain.py
。或者,如果您想在
CREATE PROCEDURE
陳述式中新增儲存程序的內文,請在LANGUAGE PYTHON AS
後方新增PYSPARK_CODE
,如本文件「使用內嵌程式碼」一節的範例所示。PYSPARK_CODE
:如果您想在內嵌方式傳遞程序的內容,請在CREATE PROCEDURE
陳述式中定義 PySpark 應用程式。這個值為字串常值。如果程式碼包含引號和反斜線,則必須逸出這些字元,或以原始字串表示。舉例來說,傳回
"\n";
的程式碼可表示為下列任一項目:- 加引號的字串:
"return \"\\n\";"
。半形引號和反斜線都會逸出。 - 加三引號的字串:
"""return "\\n";"""
。反斜線會加上反斜線,而引號則不會。 - 原始字串:
r"""return "\n";"""
。不需要逸出。
- 加引號的字串:
CONTAINER_IMAGE
:構件登錄表中的圖片路徑。這個檔案只能包含在程序中使用的程式庫。如未指定,系統會使用與執行階段版本相關聯的預設容器映像檔。
如要進一步瞭解如何使用 Spark 建構自訂容器映像檔,請參閱「建構自訂容器映像檔」一文。
呼叫 Spark 的預存程序
建立儲存程序後,您可以使用下列任一選項呼叫儲存程序:
主控台
前往「BigQuery」頁面
在「Explorer」窗格中展開專案,然後選取要執行的 Spark 儲存程序。
在「預存程序資訊」視窗中,按一下「叫用預存程序」。或者,您也可以展開「View actions」選項,然後點選「Invoke」。
按一下「執行」。
在「所有結果」部分中,按一下「查看結果」。
選用步驟:在「查詢結果」部分,按照下列步驟操作:
如要查看 Spark 驅動程式記錄,請按一下「執行詳細資料」。
如要在 Cloud Logging 中查看記錄,請按一下「Job information」(工作資訊),然後在「Log」欄位中按一下「log」。
如要取得 Spark 記錄伺服器端點,請依序點選「工作資訊」和「Spark 記錄伺服器」。
SQL
如要呼叫預存程序,請使用 CALL PROCEDURE
陳述式:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中輸入以下陳述式:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
按一下
「Run」。
如要進一步瞭解如何執行查詢,請參閱「執行互動式查詢」一文。
使用自訂服務帳戶
您可以使用自訂服務帳戶存取 Spark 程式碼中的資料,而非使用 Spark 連線的服務身分。
如要使用自訂服務帳戶,請在建立 Spark 儲存程序時指定 INVOKER
安全模式 (使用 EXTERNAL SECURITY INVOKER
陳述式),並在叫用儲存程序時指定服務帳戶。
當您首次使用自訂服務帳戶執行 Spark 預存程序時,BigQuery 會建立 Spark 服務代理人,並授予服務代理人所需的權限。請務必在叫用 Spark 儲存程序之前,不要修改這項授權。如需更多詳細資訊,請參閱 BigQuery Spark Service Agent。
如果您想從 Cloud Storage 存取及使用 Spark 程式碼,就必須將必要權限授予 Spark 連線的服務 ID。您必須將 storage.objects.get
IAM 權限或 storage.objectViewer
IAM 角色授予連線的服務帳戶。
您可以選擇授予連線的服務帳戶存取 Dataproc Metastore 和 Dataproc 永久記錄伺服器的權限 (如果您已在連線中指定這些項目)。詳情請參閱「授予服務帳戶存取權」。
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
您可以選擇在上述程式碼中加入下列引數:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
更改下列內容:
CUSTOM_SERVICE_ACCOUNT
:必填。您提供的自訂服務帳戶。BUCKET_NAME
:選用。用於做為預設 Spark 應用程式檔案系統的 Cloud Storage 值區。如果未提供這個值,系統會在專案中建立預設的 Cloud Storage 值區,並由在同一專案下執行的所有工作共用該值區。DATASET
:選用。資料集,用於儲存叫用程序所產生的暫時性資料。工作完成後,系統會清理資料。如果未提供,系統會為工作建立預設的暫存資料集。
自訂服務帳戶必須具備下列權限:
如要讀取及寫入用於做為預設 Spark 應用程式檔案系統的暫存值區:
storage.objects.*
權限或您指定的roles/storage.objectAdmin
IAM 角色。- 此外,如果未指定測試值區,則專案上的
storage.buckets.*
權限或roles/storage.Admin
IAM 角色。
(選用) 如要讀取及寫入 BigQuery 中的資料,請按照下列步驟操作:
- 在 BigQuery 資料表上使用
bigquery.tables.*
。 bigquery.readsessions.*
專案。roles/bigquery.admin
IAM 角色包含先前的權限。
- 在 BigQuery 資料表上使用
(選用) 如要讀取及寫入 Cloud Storage 中的資料,請按照下列步驟操作:
storage.objects.*
權限或 Cloud Storage 物件的roles/storage.objectAdmin
IAM 角色。
(選用) 如要讀取及寫入用於
INOUT/OUT
參數的暫存資料集:bigquery.tables.*
或roles/bigquery.dataEditor
IAM 角色,適用於您指定的暫存資料集。- 此外,如果未指定測試資料集,則需要專案的
bigquery.datasets.create
權限或roles/bigquery.dataEditor
IAM 角色。
Spark 預存程序範例
本節將舉例說明如何為 Apache Spark 建立已儲存的程序。
在 Cloud Storage 中使用 PySpark 或 JAR 檔案
下列範例說明如何使用 my-project-id.us.my-connection
連線和儲存在 Cloud Storage 值區中的 PySpark 或 JAR 檔案,為 Spark 建立已儲存的程序:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java 或 Scala
使用 main_file_uri
建立預存程序:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
使用 main_class
建立預存程序:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
使用內嵌程式碼
以下範例說明如何使用連線 my-project-id.us.my-connection
和內嵌 PySpark 程式碼,為 Spark 建立儲存程序:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
將值做為輸入參數傳遞
以下範例顯示兩種方法,可在 Python 中將值做為輸入參數傳遞:
方法 1:使用環境變數
在 PySpark 程式碼中,您可以透過 Spark 驅動程式和執行緒中的環境變數,取得 Spark 已儲存程序的輸入參數。環境變數的名稱格式為 BIGQUERY_PROC_PARAM.PARAMETER_NAME
,其中 PARAMETER_NAME
是輸入參數的名稱。舉例來說,如果輸入參數的名稱為 var
,則對應的環境變數名稱為 BIGQUERY_PROC_PARAM.var
。輸入參數採用 JSON 編碼。在 PySpark 程式碼中,您可以從環境變數取得 JSON 字串中的輸入參數值,並將其解碼為 Python 變數。
以下範例說明如何取得 INT64
類型輸入參數的值,並將其放入 PySpark 程式碼中:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
方法 2:使用內建程式庫
在 PySpark 程式碼中,您只需匯入內建程式庫,然後使用該程式庫填入所有類型的參數。如要將參數傳遞給執行緒,請在 Spark 驅動程式中將參數填入為 Python 變數,並將值傳遞給執行緒。內建程式庫支援大部分的 BigQuery 資料類型,但 INTERVAL
、GEOGRAPHY
、NUMERIC
和 BIGNUMERIC
除外。
BigQuery 資料類型 | Python 資料類型 |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
不支援 |
BIGNUMERIC
|
不支援 |
INTERVAL
|
不支援 |
GEOGRAPHY
|
不支援 |
以下範例說明如何匯入內建程式庫,並使用該程式庫在 PySpark 程式碼中填入 INT64 類型的輸入參數,以及 ARRAY<STRUCT<a INT64, b STRING>> 類型的輸入參數:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
在 Java 或 Scala 程式碼中,您可以透過 Spark 驅動程式和執行緒中的環境變數,取得 Spark 已儲存程序的輸入參數。環境變數的名稱格式為 BIGQUERY_PROC_PARAM.PARAMETER_NAME
,其中 PARAMETER_NAME
是輸入參數的名稱。舉例來說,如果輸入參數的名稱為 var,則對應的環境變數名稱為 BIGQUERY_PROC_PARAM.var
。在 Java 或 Scala 程式碼中,您可以從環境變數取得輸入參數值。
以下範例說明如何從環境變數取得輸入參數的值,並將其帶入 Scala 程式碼:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
以下範例說明如何從環境變數取得輸入參數,並將其納入 Java 程式碼:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
將值傳遞為 OUT
和 INOUT
參數
輸出參數會傳回 Spark 程序的值,而 INOUT
參數會接受程序的值,並傳回程序的值。如要使用 OUT
和 INOUT
參數,請在建立 Spark 程序時,在參數名稱前方加上 OUT
或 INOUT
關鍵字。在 PySpark 程式碼中,您可以使用內建程式庫將值做為 OUT
或 INOUT
參數傳回。與輸入參數相同,內建程式庫支援大部分的 BigQuery 資料類型,但 INTERVAL
、GEOGRAPHY
、NUMERIC
和 BIGNUMERIC
除外。當 TIME
和 DATETIME
類型值以 OUT
或 INOUT
參數傳回時,會轉換為世界標準時間時區。
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
從 Hive Metastore 資料表讀取資料,並將結果寫入 BigQuery
以下範例說明如何轉換 Hive Metastore 資料表,並將結果寫入 BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
查看記錄檔篩選器
呼叫 Spark 的預存程序後,您可以查看記錄資訊。如要取得 Cloud Logging 篩選器資訊和 Spark 記錄叢集端點,請使用 bq
show
指令。篩選器資訊位於子工作中的 SparkStatistics
欄位下方。如要取得記錄篩選器,請按照下列步驟操作:
前往「BigQuery」頁面
在查詢編輯器中,列出儲存程序的腳本工作子項工作:
bq ls -j --parent_job_id=$parent_job_id
如要瞭解如何取得工作 ID,請參閱「查看工作詳細資料」。
輸出結果會與下列內容相似:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
找出儲存程序的
jobId
,然後使用bq show
指令查看工作詳細資料:bq show --format=prettyjson --job $child_job_id
複製
sparkStatistics
欄位,因為您在其他步驟中會用到。輸出結果會與下列內容相似:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
針對記錄,請使用
SparkStatistics
欄位產生記錄篩選器:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
記錄會寫入
bigquery.googleapis.com/SparkJob
監控資源。記錄會標示為INFO
、DRIVER
和EXECUTOR
元件。如要篩選 Spark 驅動程式的記錄,請將labels.component = "DRIVER"
元件新增至記錄篩選器。如要篩選 Spark 執行緒的記錄,請將labels.component = "EXECUTOR"
元件新增至記錄篩選器。
使用客戶自行管理的加密金鑰
BigQuery Spark 程序會使用客戶管理的加密金鑰 (CMEK) 和 BigQuery 提供的預設加密技術來保護您的內容。如要在 Spark 程序中使用 CMEK,請先觸發建立 BigQuery 加密服務帳戶並授予必要權限。如果您的專案套用了 CMEK 機構政策,Spark 程序也支援這些政策。
如果儲存程序使用 INVOKER
安全模式,請在呼叫程序時透過 SQL 系統變數指定 CMEK。否則,您可以透過與儲存程序相關聯的連線指定 CMEK。
如要在建立 Spark 預存程序時透過連線指定 CMEK,請使用下列程式碼範例:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
如要在呼叫程序時透過 SQL 系統變數指定 CMEK,請使用以下程式碼範例:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
使用 VPC Service Controls
您可以使用 VPC Service Controls 設定安全範圍,防範資料外洩。如要使用 VPC Service Controls 搭配 Spark 程序來強化安全性,請先建立服務範圍。
如要完全保護 Spark 程序工作,請將下列 API 新增至服務範圍:
- BigQuery API (
bigquery.googleapis.com
) - Cloud Logging API (
logging.googleapis.com
) - Cloud Storage API (
storage.googleapis.com
) (如果您使用 Cloud Storage) - Artifact Registry API (
artifactregistry.googleapis.com
) 或 Container Registry API (containerregistry.googleapis.com
) (如果您使用自訂容器) - Dataproc Metastore API (
metastore.googleapis.com
) 和 Cloud Run Admin API (run.googleapis.com
) (如果您使用 Dataproc Metastore)
將 Spark 程序的查詢專案新增至安全範圍。將代管 Spark 程式碼或資料的其他專案新增至範圍。
最佳做法
初次在專案中使用連線時,系統大約需要額外一分鐘才能佈建。為節省時間,您可以在建立 Spark 的已儲存程序時,重複使用現有的 Spark 連線。
建立實際工作環境用途的 Spark 程序時,Google 建議您指定執行階段版本。如需支援的執行階段版本清單,請參閱「Dataproc Serverless 執行階段版本」。建議您使用長期支援版 (LTS)。
在 Spark 程序中指定自訂容器時,建議您使用 Artifact Registry 和圖片串流。
為提升效能,您可以在 Spark 程序中指定資源分配屬性。Spark 儲存程序支援資源分配屬性清單,這與 Dataproc Serverless 相同。
限制
- 您只能使用 gRPC 端點通訊協定連線至 Dataproc Metastore。其他類型的 Hive Metastore 尚不支援。
- 客戶自行管理的加密金鑰 (CMEK) 僅適用於客戶建立單一區域的 Spark 程序。不支援全域區域的 CMEK 金鑰和多區域 CMEK 金鑰,例如
EU
或US
。 - 只有 PySpark 支援傳遞輸出參數。
- 如果與 Spark 的已儲存程序相關聯的資料集是透過跨區域資料集複製複製到目的地區域,則只能在建立已儲存程序的區域中查詢該程序。
- Spark 不支援存取私人 VPC Service Controls 網路中的 HTTP 端點。
配額與限制
如要進一步瞭解配額和限制,請參閱Spark 配額和限制的已儲存程序。
後續步驟
- 瞭解如何查看預存程序。
- 瞭解如何刪除儲存程序。
- 瞭解如何使用 SQL 預存程序。