Utilizzare le stored procedure per Apache Spark
Questo documento è destinato a data engineer, data scientist e analisti di dati per creare e chiamare stored procedure per Spark in BigQuery.
Utilizzando BigQuery, puoi creare ed eseguire stored procedure Spark scritte in Python, Java e Scala. Puoi quindi eseguire queste stored procedure in BigQuery utilizzando una query GoogleSQL, in modo simile all'esecuzione di stored procedure SQL.
Prima di iniziare
Per creare una stored procedure per Spark, chiedi all'amministratore di creare una connessione Spark e condividerla con te. L'amministratore deve inoltre concedere all'account di servizio associato alla connessione le autorizzazioni Identity and Access Management (IAM) richieste.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per eseguire le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:
-
Crea una stored procedure per Spark:
-
Editor dati BigQuery (
roles/bigquery.dataEditor
) sul set di dati in cui crei la stored procedure -
Amministratore connessione BigQuery (
roles/bigquery.connectionAdmin
) sulla connessione utilizzata dalla stored procedure -
Utente job BigQuery (
roles/bigquery.jobUser
) nel tuo progetto
-
Editor dati BigQuery (
-
Chiama una stored procedure per Spark:
-
Visualizzatore metadati BigQuery (
roles/bigquery.metadataViewer
) sul set di dati in cui è archiviata la stored procedure -
Utente connessione BigQuery (
roles/bigquery.connectionUser
) sulla connessione -
Utente job BigQuery (
roles/bigquery.jobUser
) nel tuo progetto
-
Visualizzatore metadati BigQuery (
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questi ruoli predefiniti contengono le autorizzazioni necessarie per eseguire le attività descritte in questo documento. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per eseguire le attività descritte in questo documento sono necessarie le seguenti autorizzazioni:
-
Crea una connessione:
-
bigquery.connections.create
-
bigquery.connections.list
-
-
Crea una stored procedure per Spark:
-
bigquery.routines.create
-
bigquery.connections.delegate
-
bigquery.jobs.create
-
-
Chiama una stored procedure per Spark:
-
bigquery.routines.get
-
bigquery.connections.use
-
bigquery.jobs.create
-
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Considerazione della posizione
Devi creare una stored procedure per Spark nella stessa posizione della connessione perché la stored procedure viene eseguita nella stessa posizione della connessione. Ad esempio, per creare una stored procedure nella multi-regione degli Stati Uniti, utilizzi una connessione che si trova nella multi-regione degli Stati Uniti.
Prezzi
Gli addebiti per l'esecuzione di procedure Spark su BigQuery sono simili a quelli per l'esecuzione di procedure Spark su Dataproc Serverless. Per ulteriori informazioni, consulta i prezzi di Dataproc Serverless.
Le stored procedure Spark possono essere utilizzate con il modello di determinazione dei prezzi on demand e con qualsiasi delle versioni di BigQuery. Le procedure Spark vengono addebitate utilizzando il modello di pagamento a consumo di BigQuery Enterprise in tutti i casi, indipendentemente dal modello di prezzi di calcolo utilizzato nel tuo progetto.
Le stored procedure Spark per BigQuery non supportano l'utilizzo di prenotazioni o impegni. Le prenotazioni e gli impegni esistenti continuano a essere utilizzati per altre query e procedure supportate. I costi per l'utilizzo delle stored procedure Spark vengono aggiunti alla fattura al costo della versione Enterprise a consumo. Gli sconti della tua organizzazione vengono applicati, se applicabili.
Sebbene le stored procedure Spark utilizzino un motore di esecuzione Spark, non vedrai addebiti separati per l'esecuzione di Spark. Come indicato, i costi corrispondenti vengono segnalati come SKU pay-as-you-go di BigQuery Enterprise.
Le stored procedure Spark non offrono un livello gratuito.
Crea una stored procedure per Spark
Devi creare la stored procedure nella stessa posizione della connessione che utilizzi.
Se il corpo della stored procedure supera 1 MB, ti consigliamo di inserire la stored procedure in un file in un bucket Cloud Storage anziché utilizzare il codice inline. BigQuery fornisce due metodi per creare una stored procedure per Spark utilizzando Python:
- Se vuoi utilizzare l'istruzione
CREATE PROCEDURE
, utilizza l'editor di query SQL. - Se vuoi digitare direttamente il codice Python, utilizza l'editor PySpark. Puoi salvare il codice come stored procedure.
Utilizzare l'editor di query SQL
Per creare una stored procedure per Spark nell'editor di query SQL:
Vai alla pagina BigQuery.
Nell'editor delle query, aggiungi il codice campione per l'istruzione
CREATE PROCEDURE
che viene visualizzata.In alternativa, nel riquadro Explorer, fai clic sulla connessione nel progetto che hai utilizzato per creare la risorsa di connessione. Quindi, per creare una stored procedure per Spark, fai clic su
Crea stored procedure.Python
Per creare una stored procedure per Spark in Python, utilizza il seguente codice campione:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Java o Scala
Per creare una stored procedure per Spark in Java o Scala con l'opzione
main_file_uri
, utilizza il seguentecodice campioneo:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_JAR_URI"]); LANGUAGE JAVA|SCALA
Per creare una stored procedure per Spark in Java o Scala con le opzioni
main_class
ejar_uris
, utilizza il seguentecodice campioneo:CREATE [OR REPLACE] PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_class=["CLASS_NAME"], jar_uris=["URI"]); LANGUAGE JAVA|SCALA
Sostituisci quanto segue:
PROJECT_ID
: il progetto in cui vuoi creare la stored procedure, ad esempiomyproject
.DATASET
: il set di dati in cui vuoi creare la stored procedure, ad esempiomydataset
.PROCEDURE_NAME
: il nome della stored procedure che vuoi eseguire in BigQuery, ad esempiomysparkprocedure
.PROCEDURE_ARGUMENT
: un parametro per inserire gli argomenti di input.In questo parametro, specifica i seguenti campi:
ARGUMENT_MODE
: la modalità dell'argomento.I valori validi includono
IN
,OUT
eINOUT
. Per impostazione predefinita, il valore èIN
.ARGUMENT_NAME
: il nome dell'argomento.ARGUMENT_TYPE
: il tipo di argomento.
Ad esempio:
myproject.mydataset.mysparkproc(num INT64)
.Per saperne di più, consulta Trasmettere un valore come parametro
IN
o i parametriOUT
eINOUT
in questo documento.CONNECTION_PROJECT_ID
: il progetto che contiene la connessione per eseguire la procedura Spark.CONNECTION_REGION
: la regione che contiene la connessione per eseguire la procedura Spark, ad esempious
.CONNECTION_ID
: l'ID della connessione, ad esempiomyconnection
.Quando visualizzi i dettagli della connessione nella console Google Cloud , l'ID connessione è il valore nell'ultima sezione dell'ID connessione completo visualizzato in ID connessione, ad esempio
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: la versione del runtime di Spark, ad esempio2.2
.MAIN_PYTHON_FILE_URI
: il percorso di un file PySpark, ad esempiogs://mybucket/mypysparkmain.py
.In alternativa, se vuoi aggiungere il corpo della stored procedure nell'istruzione
CREATE PROCEDURE
, aggiungiPYSPARK_CODE
dopoLANGUAGE PYTHON AS
, come mostrato nell'esempio in Utilizzare il codice inline in questo documento.PYSPARK_CODE
: la definizione di un'applicazione PySpark nell'istruzioneCREATE PROCEDURE
se vuoi passare il corpo della procedura in linea.Il valore è un valore letterale stringa. Se il codice include virgolette e barre rovesciate, queste devono essere sottoposte a escape o rappresentate come stringa non elaborata. Ad esempio, il codice di ritorno
"\n";
può essere rappresentato come uno dei seguenti:- Stringa tra virgolette:
"return \"\\n\";"
. Vengono inseriti i caratteri di escape per virgolette e barre rovesciate. - Stringa tra virgolette triple:
"""return "\\n";"""
. Le barre rovesciate vengono sottoposte all'escape, mentre i segni di virgolette no. - Stringa non elaborata:
r"""return "\n";"""
. Non è necessario eseguire l'escape.
- Stringa tra virgolette:
MAIN_JAR_URI
: il percorso del file JAR che contiene la classemain
, ad esempiogs://mybucket/my_main.jar
.CLASS_NAME
: il nome completo di una classe in un insieme JAR con l'opzionejar_uris
, ad esempiocom.example.wordcount
.URI
: il percorso del file JAR che contiene la classe specificata nella classemain
, ad esempiogs://mybucket/mypysparkmain.jar
.
Per altre opzioni che puoi specificare in
OPTIONS
, consulta l'elenco delle opzioni della procedura.
Utilizzare l'editor PySpark
Quando crei una procedura utilizzando l'editor PySpark, non devi utilizzare l'istruzione
CREATE PROCEDURE
. Aggiungi invece il codice Python direttamente nell'editor Pyspark e salva o esegui il codice.
Per creare una stored procedure per Spark nell'editor PySpark, segui questi passaggi:
Vai alla pagina BigQuery.
Se vuoi digitare direttamente il codice PySpark, apri l'editor PySpark. Per aprire l'editor PySpark, fai clic sul menu
accanto a Crea query SQL, quindi seleziona Crea procedura PySpark.Per impostare le opzioni, fai clic su Altro > Opzioni PySpark, quindi procedi nel seguente modo:
Specifica la posizione in cui vuoi eseguire il codice PySpark.
Nel campo Connessione, specifica la connessione Spark.
Nella sezione Chiamata di stored procedure, specifica il set di dati in cui vuoi archiviare le stored procedure temporanee generate. Puoi impostare un set di dati specifico o consentire l'utilizzo di un set di dati temporaneo per richiamare il codice PySpark.
Il set di dati temporaneo viene generato con la località specificata nel passaggio precedente. Se viene specificato un nome del set di dati, assicurati che il set di dati e la connessione Spark si trovino nella stessa posizione.
Nella sezione Parametri, definisci i parametri per la stored procedure. Il valore del parametro viene utilizzato solo durante le esecuzioni in sessione del codice PySpark, ma la dichiarazione stessa viene memorizzata nella procedura.
Nella sezione Opzioni avanzate, specifica le opzioni della procedura. Per un elenco dettagliato delle opzioni della procedura, consulta l'elenco delle opzioni della procedura.
Nella sezione Proprietà, aggiungi le coppie chiave-valore per configurare il job. Puoi utilizzare una qualsiasi delle coppie chiave-valore delle proprietà Spark di Dataproc Serverless.
In Impostazioni dell'account di servizio, specifica l'account di servizio personalizzato, CMEK, il set di dati di staging e la cartella Cloud Storage di staging da utilizzare durante le esecuzioni in sessione del codice PySpark.
Fai clic su Salva.
Salva una stored procedure per Spark
Dopo aver creato la stored procedure utilizzando l'editor PySpark, puoi salvarla. Per farlo, segui questi passaggi:
Nella console Google Cloud , vai alla pagina BigQuery.
Nell'editor di query, crea una stored procedure per Spark utilizzando Python con l'editor PySpark.
Fai clic su
Salva > Salva procedura.
Nella finestra di dialogo Salva stored procedure, specifica il nome del set di dati in cui vuoi archiviare la stored procedure e il nome della stored procedure.
Fai clic su Salva.
Se vuoi solo eseguire il codice PySpark anziché salvarlo come stored procedure, puoi fare clic su Esegui anziché su Salva.
Utilizzo di container personalizzati
Il container personalizzato fornisce l'ambiente di runtime per i processi del driver e dell'executor del carico di lavoro. Per utilizzare container personalizzati, utilizza il seguente codice campione:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", container_image="CONTAINER_IMAGE", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE]
Sostituisci quanto segue:
PROJECT_ID
: il progetto in cui vuoi creare la stored procedure, ad esempiomyproject
.DATASET
: il set di dati in cui vuoi creare la stored procedure, ad esempiomydataset
.PROCEDURE_NAME
: il nome della stored procedure che vuoi eseguire in BigQuery, ad esempiomysparkprocedure
.PROCEDURE_ARGUMENT
: un parametro per inserire gli argomenti di input.In questo parametro, specifica i seguenti campi:
ARGUMENT_MODE
: la modalità dell'argomento.I valori validi includono
IN
,OUT
eINOUT
. Per impostazione predefinita, il valore èIN
.ARGUMENT_NAME
: il nome dell'argomento.ARGUMENT_TYPE
: il tipo di argomento.
Ad esempio:
myproject.mydataset.mysparkproc(num INT64)
.Per saperne di più, consulta Trasmettere un valore come parametro
IN
o i parametriOUT
eINOUT
in questo documento.CONNECTION_PROJECT_ID
: il progetto che contiene la connessione per eseguire la procedura Spark.CONNECTION_REGION
: la regione che contiene la connessione per eseguire la procedura Spark, ad esempious
.CONNECTION_ID
: l'ID connessione, ad esempiomyconnection
.Quando visualizzi i dettagli della connessione nella console Google Cloud , l'ID connessione è il valore nell'ultima sezione dell'ID connessione completo visualizzato in ID connessione, ad esempio
projects/myproject/locations/connection_location/connections/myconnection
.RUNTIME_VERSION
: la versione del runtime di Spark, ad esempio2.2
.MAIN_PYTHON_FILE_URI
: il percorso di un file PySpark, ad esempio,gs://mybucket/mypysparkmain.py
.In alternativa, se vuoi aggiungere il corpo della stored procedure nell'istruzione
CREATE PROCEDURE
, aggiungiPYSPARK_CODE
dopoLANGUAGE PYTHON AS
, come mostrato nell'esempio in Utilizzare il codice inline in questo documento.PYSPARK_CODE
: la definizione di un'applicazione PySpark nell'istruzioneCREATE PROCEDURE
se vuoi passare il corpo della procedura inline.Il valore è un valore letterale stringa. Se il codice include virgolette e barre rovesciate, queste devono essere sottoposte a escape o rappresentate come stringa non elaborata. Ad esempio, il codice di ritorno
"\n";
può essere rappresentato come uno dei seguenti:- Stringa tra virgolette:
"return \"\\n\";"
. Vengono inseriti i caratteri di escape sia per le virgolette che per le barre rovesciate. - Stringa tra virgolette triple:
"""return "\\n";"""
. Le barre rovesciate vengono sottoposte a escape, mentre i segni di virgolette no. - Stringa non elaborata:
r"""return "\n";"""
. Non è necessario eseguire l'escape.
- Stringa tra virgolette:
CONTAINER_IMAGE
: percorso dell'immagine nel registro degli artefatti. Deve contenere solo le librerie da utilizzare nella procedura. Se non specificata, viene utilizzata l'immagine container predefinita del sistema associata alla versione runtime.
Per saperne di più su come creare un'immagine container personalizzata con Spark, vedi Creare un'immagine container personalizzata.
Chiama una stored procedure per Spark
Dopo aver creato una stored procedure, puoi chiamarla utilizzando una delle seguenti opzioni:
Console
Vai alla pagina BigQuery.
Nel riquadro Explorer, espandi il progetto e seleziona la stored procedure per Spark che vuoi eseguire.
Nella finestra Informazioni stored procedure, fai clic su Richiama stored procedure. In alternativa, puoi espandere l'opzione Visualizza azioni e fare clic su Richiama.
Fai clic su Esegui.
Nella sezione Tutti i risultati, fai clic su Visualizza risultati.
(Facoltativo) Nella sezione Risultati delle query, segui questi passaggi:
Se vuoi visualizzare i log del driver Spark, fai clic su Dettagli esecuzione.
Se vuoi visualizzare i log in Cloud Logging, fai clic su Informazioni sul job, quindi nel campo Log, fai clic su log.
Se vuoi ottenere l'endpoint del server di cronologia Spark, fai clic su Informazioni sul job e poi su Server di cronologia Spark.
SQL
Per chiamare una stored procedure, utilizza l'istruzione CALL PROCEDURE
:
Nella console Google Cloud , vai alla pagina BigQuery.
Nell'editor di query, inserisci la seguente istruzione:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Fai clic su
Esegui.
Per maggiori informazioni su come eseguire le query, consulta Eseguire una query interattiva.
Utilizzo di un account di servizio personalizzato
Anziché utilizzare l'identità di servizio della connessione Spark per l'accesso ai dati, puoi utilizzare un account di servizio personalizzato per accedere ai dati all'interno del codice Spark.
Per utilizzare un account di servizio personalizzato, specifica la modalità di sicurezza INVOKER
(utilizzando l'istruzione EXTERNAL SECURITY INVOKER
) quando crei una stored procedure Spark e specifica il account di servizio quando richiami la stored procedure.
Quando esegui la stored procedure Spark con l'account di servizio personalizzato per la prima volta, BigQuery crea un agente di servizio Spark e concede all'agente di servizio le autorizzazioni richieste. Assicurati di non modificare questa concessione prima di richiamare la stored procedure Spark. Per saperne di più, vedi Agente di servizio BigQuery Spark.
Se vuoi accedere e utilizzare il codice Spark da Cloud Storage, devi concedere le autorizzazioni necessarie all'identità di servizio della connessione Spark. Devi concedere all'account di servizio della connessione l'autorizzazione IAM storage.objects.get
o il ruolo IAM storage.objectViewer
.
(Facoltativo) Puoi concedere al account di servizio della connessione l'accesso a Dataproc Metastore e al server di cronologia permanente Dataproc se li hai specificati nella connessione. Per saperne di più, vedi Concedere l'accesso al service account.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT) EXTERNAL SECURITY INVOKER WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID` OPTIONS ( engine="SPARK", runtime_version="RUNTIME_VERSION", main_file_uri=["MAIN_PYTHON_FILE_URI"]); LANGUAGE PYTHON [AS PYSPARK_CODE] SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Se vuoi, puoi aggiungere i seguenti argomenti al codice precedente:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME'; SET @@spark_proc_properties.staging_dataset_id='DATASET';
Sostituisci quanto segue:
CUSTOM_SERVICE_ACCOUNT
: obbligatorio. Un account di servizio personalizzato fornito da te.BUCKET_NAME
: (Facoltativo). Il bucket Cloud Storage utilizzato come file system predefinito dell'applicazione Spark. Se non viene fornito, viene creato un bucket Cloud Storage predefinito nel progetto e il bucket viene condiviso da tutti i job in esecuzione nello stesso progetto.DATASET
: (Facoltativo). Il set di dati in cui archiviare i dati temporanei prodotti richiamando la procedura. I dati vengono puliti al termine del job. Se non viene fornito, per il job viene creato un set di dati temporaneo predefinito.
Il account di servizio personalizzato deve disporre delle seguenti autorizzazioni:
Per leggere e scrivere nel bucket di staging utilizzato come file system predefinito dell'applicazione Spark:
storage.objects.*
o il ruolo IAMroles/storage.objectAdmin
nel bucket di staging che specifichi.- Inoltre, le autorizzazioni
storage.buckets.*
o il ruolo IAMroles/storage.Admin
sul progetto se il bucket di staging non è specificato.
(Facoltativo) Per leggere e scrivere dati da e verso BigQuery:
bigquery.tables.*
sulle tue tabelle BigQuery.bigquery.readsessions.*
sul tuo progetto.- Il ruolo IAM
roles/bigquery.admin
include le autorizzazioni precedenti.
(Facoltativo) Per leggere e scrivere dati da e su Cloud Storage:
- autorizzazioni
storage.objects.*
o il ruolo IAMroles/storage.objectAdmin
sugli oggetti Cloud Storage.
- autorizzazioni
(Facoltativo) Per leggere e scrivere nel set di dati di staging utilizzato per i parametri
INOUT/OUT
:bigquery.tables.*
oroles/bigquery.dataEditor
sul set di dati di staging che specifichi.- Inoltre, l'autorizzazione
bigquery.datasets.create
o il ruolo IAMroles/bigquery.dataEditor
sul progetto se il set di dati di staging non è specificato.
Esempi di stored procedure per Spark
Questa sezione mostra esempi di come creare una stored procedure per Apache Spark.
Utilizzare un file PySpark o JAR in Cloud Storage
L'esempio seguente mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection
e un file PySpark o JAR archiviato in un bucket Cloud Storage:
Python
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py") LANGUAGE PYTHON
Java o Scala
Utilizza main_file_uri
per creare una stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_wtih_main_jar() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar") LANGUAGE SCALA
Utilizza main_class
per creare una stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2", main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"]) LANGUAGE SCALA
Utilizzare il codice in linea
Il seguente esempio mostra come creare una stored procedure per Spark utilizzando la connessione my-project-id.us.my-connection
e il codice PySpark incorporato:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() # Load data from BigQuery. words = spark.read.format("bigquery") \ .option("table", "bigquery-public-data:samples.shakespeare") \ .load() words.createOrReplaceTempView("words") # Perform word count. word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count") word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("wordcount_dataset.wordcount_output") """
Passare un valore come parametro di input
I seguenti esempi mostrano i due metodi per passare un valore come parametro di input in Python:
Metodo 1: utilizza le variabili di ambiente
Nel codice PySpark, puoi ottenere i parametri di input della stored procedure
per Spark tramite le variabili di ambiente nel driver Spark e
negli executor. Il nome della variabile di ambiente ha il formato
BIGQUERY_PROC_PARAM.PARAMETER_NAME
,
dove PARAMETER_NAME
è il nome del parametro di input. Ad esempio, se il nome del parametro di input è var
, il nome della variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var
. I parametri
di input sono codificati in JSON.
Nel codice PySpark, puoi ottenere il valore parametro di input in una stringa JSON dalla variabile di ambiente e decodificarlo in una variabile Python.
Il seguente esempio mostra come ottenere il valore di un parametro di input di tipo
INT64
nel codice PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession import os import json spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc = spark.sparkContext # Get the input parameter num in JSON string and convert to a Python variable num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"])) """
Metodo 2: utilizza una libreria integrata
Nel codice PySpark, puoi semplicemente importare una libreria integrata e utilizzarla per
compilare tutti i tipi di parametri. Per passare i parametri agli esecutori, inserisci
i parametri in un driver Spark come variabili Python e passa i valori agli
esecutori. La libreria integrata supporta la maggior parte dei tipi di dati BigQuery, ad eccezione di INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
.
Tipo di dati BigQuery | Tipo di dati Python |
---|---|
BOOL
|
bool
|
STRING
|
str
|
FLOAT64
|
float
|
INT64
|
int
|
BYTES
|
bytes
|
DATE
|
datetime.date
|
TIMESTAMP
|
datetime.datetime
|
TIME
|
datetime.time
|
DATETIME
|
datetime.datetime
|
Array
|
Array
|
Struct
|
Struct
|
JSON
|
Object
|
NUMERIC
|
Non supportato |
BIGNUMERIC
|
Non supportato |
INTERVAL
|
Non supportato |
GEOGRAPHY
|
Non supportato |
L'esempio seguente mostra come importare la libreria integrata e utilizzarla per compilare un parametro di input di tipo INT64 e un parametro di input di tipo ARRAY<STRUCT<a INT64, b STRING>> nel codice PySpark:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>) WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession from bigquery.spark.procedure import SparkProcParamContext def check_in_param(x, num): return x['a'] + num def main(): spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate() sc=spark.sparkContext spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Get the input parameter num of type INT64 num = spark_proc_param_context.num # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>> info = spark_proc_param_context.info # Pass the parameter to executors df = sc.parallelize(info) value = df.map(lambda x : check_in_param(x, num)).sum() main() """
Nel codice Java o Scala, puoi ottenere i parametri di input della stored procedure per Spark tramite le variabili di ambiente nel driver e negli executor Spark. Il nome della variabile di ambiente ha il formato
BIGQUERY_PROC_PARAM.PARAMETER_NAME
, dove PARAMETER_NAME
è il nome del
parametro di input. Ad esempio, se il nome del parametro di input è var, il nome della variabile di ambiente corrispondente è BIGQUERY_PROC_PARAM.var
.
Nel codice Java o Scala, puoi ottenere il valore parametro di input dalla variabile di ambiente.
Il seguente esempio mostra come ottenere il valore di un parametro di input dalle variabili di ambiente nel codice Scala:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
Il seguente esempio mostra come ottenere i parametri di input dalle variabili di ambiente nel codice Java:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
Passare i valori come parametri OUT
e INOUT
I parametri di output restituiscono il valore della procedura Spark, mentre il parametro INOUT
accetta un valore per la procedura e restituisce un valore dalla procedura.
Per utilizzare i parametri OUT
e INOUT
, aggiungi la parola chiave OUT
o INOUT
prima del nome del parametro quando crei la procedura Spark. Nel codice PySpark, utilizzi la libreria integrata per restituire un valore come parametro OUT
o INOUT
. Come i parametri di input, la libreria integrata supporta la maggior parte dei tipi di dati BigQuery, ad eccezione di INTERVAL
, GEOGRAPHY
, NUMERIC
e BIGNUMERIC
. I valori di tipo TIME
e DATETIME
vengono convertiti nel fuso orario UTC
quando vengono restituiti come parametri OUT
o INOUT
.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME,OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON) WITH CONNECTION `my_bq_project.my_dataset.my_connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql.session import SparkSession import datetime from bigquery.spark.procedure import SparkProcParamContext spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate() spark_proc_param_context = SparkProcParamContext.getOrCreate(spark) # Reading the IN and INOUT parameter values. int = spark_proc_param_context.int dt = spark_proc_param_context.datetime print("IN parameter value: ", int, ", INOUT parameter value: ", dt) # Returning the value of the OUT and INOUT parameters. spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.b = True spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}] spark_proc_param_context.time = datetime.time(23, 20, 50, 520000) spark_proc_param_context.f = 20.23 spark_proc_param_context.bs = b"hello" spark_proc_param_context.date = datetime.date(1985, 4, 12) spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc) spark_proc_param_context.js = {"name": "Alice", "age": 30} """;
Leggi da una tabella Hive Metastore e scrivi i risultati in BigQuery
L'esempio seguente mostra come trasformare una tabella Hive Metastore e scrivere i risultati in BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc() WITH CONNECTION `my-project-id.us.my-connection` OPTIONS(engine="SPARK", runtime_version="2.2") LANGUAGE PYTHON AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \ .enableHiveSupport() \ .getOrCreate() spark.sql("CREATE DATABASE IF NOT EXISTS records") spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)") spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)") df = spark.sql("SELECT * FROM records.student") df.write.format("bigquery") \ .option("writeMethod", "direct") \ .save("records_dataset.student") """
Visualizzare i filtri dei log
Dopo aver chiamato una stored procedure per Spark, puoi
visualizzare le informazioni del log. Per ottenere le informazioni sul filtro Cloud Logging
e l'endpoint del cluster Spark History, utilizza il comando bq
show
.
Le informazioni sul filtro sono disponibili nel campo SparkStatistics
del job secondario. Per ottenere i filtri dei log:
Vai alla pagina BigQuery.
Nell'editor di query, elenca i job secondari del job script della stored procedure:
bq ls -j --parent_job_id=$parent_job_id
Per scoprire come ottenere l'ID job, vedi Visualizzare i dettagli del job.
L'output è simile al seguente:
jobId Job Type State Start Time Duration ---------------------------------------------- --------- --------- --------------- ---------------- script_job_90fb26c32329679c139befcc638a7e71_0 query SUCCESS 07 Sep 18:00:27 0:05:15.052000
Identifica
jobId
per la tua stored procedure e utilizza il comandobq show
per visualizzare i dettagli del job:bq show --format=prettyjson --job $child_job_id
Copia il campo
sparkStatistics
perché ti servirà in un altro passaggio.L'output è simile al seguente:
{ "configuration": {...} … "statistics": { … "query": { "sparkStatistics": { "loggingInfo": { "projectId": "myproject", "resourceType": "myresource" }, "sparkJobId": "script-job-90f0", "sparkJobLocation": "us-central1" }, … } } }
Per la registrazione, genera filtri di log con i campi
SparkStatistics
:resource.type = sparkStatistics.loggingInfo.resourceType resource.labels.resource_container=sparkStatistics.loggingInfo.projectId resource.labels.spark_job_id=sparkStatistics.sparkJobId resource.labels.location=sparkStatistics.sparkJobLocation
I log vengono scritti nella risorsa
bigquery.googleapis.com/SparkJob
monitorata. I log sono etichettati in base ai componentiINFO
,DRIVER
eEXECUTOR
. Per filtrare i log dal driver Spark, aggiungi il componentelabels.component = "DRIVER"
ai filtri dei log. Per filtrare i log dall'executor Spark, aggiungi il componentelabels.component = "EXECUTOR"
ai filtri dei log.
Utilizzare la chiave di crittografia gestita dal cliente
La procedura BigQuery Spark utilizza la chiave di crittografia gestita dal cliente (CMEK) per proteggere i tuoi contenuti, insieme alla crittografia predefinita fornita da BigQuery. Per utilizzare la chiave CMEK nella procedura Spark, attiva prima la creazione del account di servizio di crittografia BigQuery e concedi le autorizzazioni richieste. La procedura Spark supporta anche i criteri dell'organizzazione CMEK se vengono applicati al tuo progetto.
Se la tua stored procedure utilizza la modalità di sicurezza INVOKER
, la CMEK deve essere specificata tramite la variabile di sistema SQL quando viene chiamata la procedura. In caso contrario, la tua CMEK può essere specificata tramite la connessione associata alla stored procedure.
Per specificare la CMEK tramite la connessione quando crei una stored procedure Spark, utilizza il seguente codice campione:
bq mk --connection --connection_type='SPARK' \ --properties='{"kms_key_name"="projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \ --project_id=PROJECT_ID \ --location=LOCATION \ CONNECTION_NAME
Per specificare CMEK tramite la variabile di sistema SQL quando chiami la procedura, utilizza il seguentecodice campioneo:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT'; SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME; CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Utilizzare i Controlli di servizio VPC
I Controlli di servizio VPC ti consentono di configurare un perimetro sicuro per proteggerti dall'esfiltrazione di dati. Per utilizzare Controlli di servizio VPC con una procedura Spark per una maggiore sicurezza, crea prima un perimetro di servizio.
Per proteggere completamente i job della procedura Spark, aggiungi le seguenti API al perimetro di servizio:
- API BigQuery (
bigquery.googleapis.com
) - API Cloud Logging (
logging.googleapis.com
) - API Cloud Storage (
storage.googleapis.com
), se utilizzi Cloud Storage - API Artifact Registry (
artifactregistry.googleapis.com
) o API Container Registry (containerregistry.googleapis.com
), se utilizzi un container personalizzato - API Dataproc Metastore (
metastore.googleapis.com
) e API Cloud Run Admin (run.googleapis.com
), se utilizzi Dataproc Metastore
Aggiungi il progetto di query della procedura Spark al perimetro. Aggiungi al perimetro altri progetti che ospitano il codice o i dati Spark.
Best practice
Quando utilizzi una connessione nel tuo progetto per la prima volta, il provisioning richiede circa un minuto in più. Per risparmiare tempo, puoi riutilizzare una connessione Spark esistente quando crei una stored procedure per Spark.
Quando crei una procedura Spark per l'utilizzo in produzione, Google consiglia di specificare una versione del runtime. Per un elenco delle versioni del runtime supportate, consulta Versioni del runtime di Dataproc Serverless. Ti consigliamo di utilizzare la versione Long-Time-Support (LTS).
Quando specifichi un container personalizzato in una procedura Spark, ti consigliamo di utilizzare Artifact Registry e lo streaming delle immagini.
Per un rendimento migliore, puoi specificare le proprietà di allocazione delle risorse nella procedura Spark. Le stored procedure Spark supportano un elenco di proprietà di allocazione delle risorse identico a Dataproc Serverless.
Limitazioni
- Puoi utilizzare solo il protocollo dell'endpoint gRPC per connetterti a Dataproc Metastore. Altri tipi di Hive Metastore non sono ancora supportati.
- Le chiavi di crittografia gestite dal cliente (CMEK) sono disponibili solo quando i clienti
creano procedure Spark a una sola regione. Le chiavi CMEK della regione globale e le chiavi CMEK multiregionali, ad esempio
EU
oUS
, non sono supportate. - Il passaggio dei parametri di output è supportato solo per PySpark.
- Se il set di dati associato alla stored procedure per Spark viene replicato in una regione di destinazione tramite la replica dei set di dati tra regioni, la stored procedure può essere interrogata solo nella regione in cui è stata creata.
- Spark non supporta l'accesso agli endpoint HTTP nella tua rete Controlli di servizio VPC privata.
Quote e limiti
Per informazioni su quote e limiti, consulta procedure archiviate per quote e limiti di Spark.
Passaggi successivi
- Scopri come visualizzare una stored procedure.
- Scopri come eliminare una stored procedure.
- Scopri come utilizzare una stored procedure SQL.