spark-bigquery-connector viene utilizzato con Apache Spark per leggere e scrivere dati da e in BigQuery. Il connettore sfrutta l'API BigQuery Storage durante la lettura dei dati da BigQuery.
Questo tutorial fornisce informazioni sulla disponibilità del connettore preinstallato e mostra come rendere disponibile una versione specifica del connettore per i job Spark. Il codice di esempio mostra come utilizzare il connettore Spark BigQuery all'interno di un'applicazione Spark.
Utilizzare il connettore preinstallato
Il connettore Spark BigQuery è preinstallato ed è disponibile per i job Spark eseguiti su cluster Dataproc creati con versioni dell'immagine 2.1
e successive. La versione del connettore preinstallato è elencata nella pagina di rilascio di ogni versione dell'immagine. Ad esempio, la riga Connettore BigQuery nella pagina
Versioni di rilascio delle immagini 2.2.x
mostra la versione del connettore installata nelle ultime
versioni delle immagini 2.2.
Rendere disponibile una versione specifica del connettore per i job Spark
Se vuoi utilizzare una versione del connettore diversa da una versione preinstallata in un cluster di versioni immagine 2.1
o successive oppure se vuoi installare il connettore in un cluster di versioni immagine precedenti a 2.1
, segui le istruzioni riportate in questa sezione.
Importante:la versione spark-bigquery-connector
deve essere compatibile con
la versione dell'immagine del cluster Dataproc. Consulta la
matrice di compatibilità del connettore con l'immagine Dataproc.
Cluster con versione dell'immagine 2.1
e successive
Quando crei un cluster Dataproc
con una versione immagine 2.1
o successiva, specifica la
versione del connettore come metadati del cluster.
Esempio dell'interfaccia a riga della gcloud CLI:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Note:
SPARK_BQ_CONNECTOR_VERSION: specifica una versione del connettore. Le versioni del connettore Spark BigQuery sono elencate nella pagina spark-bigquery-connector/releases su GitHub.
Esempio:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: specifica un URL che rimanda al file JAR in Cloud Storage. Puoi specificare l'URL di un connettore elencato nella colonna link in Download e utilizzo del connettore in GitHub o il percorso di una posizione Cloud Storage in cui hai inserito un file JAR del connettore personalizzato.
Esempi:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
2.0
e cluster di versioni precedenti dell'immagine
Puoi rendere disponibile il connettore Spark BigQuery per la tua applicazione in uno dei seguenti modi:
Installa spark-bigquery-connector nella directory dei file JAR di Spark di ogni nodo utilizzando l'azione di inizializzazione dei connettori Dataproc quando crei il cluster.
Fornisci l'URL del file JAR del connettore quando invii il job al cluster utilizzando la console Google Cloud , gcloud CLI o l'API Dataproc.
Console
Utilizza l'elemento File JAR del job Spark nella pagina Invia un job di Dataproc.
gcloud
Utilizza il flag
gcloud dataproc jobs submit spark --jars
.API
Utilizza il campo
SparkJob.jarFileUris
.Come specificare il file JAR del connettore quando esegui job Spark su cluster con versioni immagine precedenti alla 2.0
- Specifica il file jar del connettore sostituendo le informazioni sulla versione di Scala e del connettore
nella seguente stringa URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Utilizzare Scala
2.12
con le versioni immagine di Dataproc1.5+
Esempio gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Utilizza Scala
2.11
con le versioni immagine di Dataproc1.4
e precedenti: Esempio gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Specifica il file jar del connettore sostituendo le informazioni sulla versione di Scala e del connettore
nella seguente stringa URI:
Includi il file JAR del connettore nell'applicazione Spark Scala o Java come dipendenza (vedi Compilazione rispetto al connettore).
Calcola i costi
In questo documento, utilizzi i seguenti componenti fatturabili di Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Per generare una stima dei costi in base all'utilizzo previsto,
utilizza il calcolatore prezzi.
Leggere e scrivere dati da e verso BigQuery
Questo esempio legge i dati da BigQuery in un DataFrame Spark per eseguire un conteggio delle parole utilizzando l'API dell'origine dati standard.
Il connettore scrive i dati in BigQuery eseguendo prima il buffering di tutti i dati in una tabella temporanea di Cloud Storage. Quindi, copia tutti i dati in BigQuery in un'unica operazione. Il
connettore tenta di eliminare i file temporanei una volta che l'operazione di caricamento di BigQuery
è riuscita e di nuovo quando l'applicazione Spark termina.
Se il job non va a buon fine, rimuovi tutti i file temporanei rimanenti di Cloud Storage. In genere, i file BigQuery temporanei
si trovano in gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Configura la fatturazione
Per impostazione predefinita, il progetto associato alle credenziali o al account di servizio viene fatturato per l'utilizzo dell'API. Per fatturare un progetto diverso, imposta la seguente
configurazione: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Può anche essere aggiunto a un'operazione di lettura o scrittura, come segue:
.option("parentProject", "<BILLED-GCP-PROJECT>")
.
Esegui il codice
Prima di eseguire questo esempio, crea un set di dati denominato "wordcount_dataset" o modifica il set di dati di output nel codice in un set di dati BigQuery esistente nel tuo progettoGoogle Cloud .
Utilizza il
comando bq per creare
wordcount_dataset
:
bq mk wordcount_dataset
Utilizza il comando Google Cloud CLI per creare un bucket Cloud Storage, che verrà utilizzato per l'esportazione in BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Esegui il codice sul tuo cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina
Cluster Dataproc
nella console Google Cloud , quindi fai clic sul nome del cluster
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Poi, fai clic su
SSH
a destra del nome del nodo master del cluster>
Si apre una finestra del browser nella tua home directory sul nodo masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Vai alla pagina
Cluster Dataproc
nella console Google Cloud , quindi fai clic sul nome del cluster
- Crea
wordcount.scala
con l'editor di testo preinstallatovi
,vim
onano
, quindi incolla il codice Scala dall'elenco di codice Scala.nano wordcount.scala
- Avvia il REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Esegui wordcount.scala con il comando
:load wordcount.scala
per creare la tabella BigQuerywordcount_output
. L'elenco di output mostra 20 righe dell'output del conteggio parole.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output, apri la paginaBigQuery
, seleziona la tabellawordcount_output
e poi fai clic su Anteprima.
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
PySpark
- Esamina il codice e sostituisci il segnaposto [bucket] con il bucket Cloud Storage che hai creato in precedenza.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Esegui il codice sul cluster
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
- Vai alla pagina
Cluster Dataproc
nella console Google Cloud , quindi fai clic sul nome del cluster
- Nella pagina Dettagli cluster, seleziona la scheda Istanze VM. Quindi, fai clic su
SSH
a destra del nome del nodo master del cluster
Si apre una finestra del browser nella tua home directory sul nodo masterConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Vai alla pagina
Cluster Dataproc
nella console Google Cloud , quindi fai clic sul nome del cluster
- Crea
wordcount.py
con l'editor di testovi
,vim
onano
preinstallato, poi incolla il codice PySpark dall'elenco di codici PySpark.nano wordcount.py
- Esegui wordcount con
spark-submit
per creare la tabella BigQuerywordcount_output
. L'elenco di output mostra 20 righe dell'output del conteggio parole.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Per visualizzare l'anteprima della tabella di output, apri la paginaBigQuery
, seleziona la tabellawordcount_output
e poi fai clic su Anteprima.
- Utilizza SSH per connetterti al nodo master del cluster Dataproc
Passaggi successivi
- Consulta BigQuery Storage e Spark SQL - Python.
- Scopri come creare un file di definizione della tabella per un'origine dati esterna.
- Scopri come eseguire query sui dati partizionati esternamente.
- Vedi Suggerimenti per l'ottimizzazione del job Spark.