Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus BigQuery zu lesen und zu schreiben. Der Connector nutzt beim Lesen von Daten aus BigQuery die BigQuery Storage API.
In dieser Anleitung finden Sie Informationen zur Verfügbarkeit des vorinstallierten Connectors und erfahren, wie Sie eine bestimmte Connectorversion für Spark-Jobs verfügbar machen. Beispielcode zeigt, wie Sie den Spark BigQuery-Connector in einer Spark-Anwendung verwenden.
Vorinstallierten Connector verwenden
Der Spark BigQuery-Connector ist auf Spark-Jobs vorinstalliert und für sie verfügbar, die auf Dataproc-Clustern ausgeführt werden, die mit Image-Versionen 2.1
und höher erstellt wurden. Die vorinstallierte Connector-Version ist auf der jeweiligen Release-Seite der Image-Version aufgeführt. In der Zeile BigQuery Connector auf der Seite 2.2.x-Image-Releaseversionen wird beispielsweise die Connectorversion angezeigt, die in den neuesten 2.2-Image-Releases installiert ist.
Bestimmte Connectorversion für Spark-Jobs verfügbar machen
Wenn Sie eine Connector-Version verwenden möchten, die sich von einer vorinstallierten Version in einem Cluster mit der Image-Version 2.1
oder höher unterscheidet, oder wenn Sie den Connector in einem Cluster mit einer Image-Version vor 2.1
installieren möchten, folgen Sie der Anleitung in diesem Abschnitt.
Wichtig:Die spark-bigquery-connector
-Version muss mit der Image-Version des Dataproc-Clusters kompatibel sein. Weitere Informationen finden Sie in der Kompatibilitätsmatrix für den Connector zu Dataproc-Images.
Cluster mit Image-Version 2.1
und höher
Wenn Sie einen Dataproc-Cluster mit der Image-Version 2.1
oder höher erstellen, geben Sie die Connectorversion als Clustermetadaten an.
Beispiel für die gcloud-Befehlszeile:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Hinweise:
SPARK_BQ_CONNECTOR_VERSION: Geben Sie eine Connectorversion an. Spark BigQuery-Connector-Versionen sind auf der Seite spark-bigquery-connector/releases auf GitHub aufgeführt.
Beispiel:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: Geben Sie eine URL an, die auf das JAR-File in Cloud Storage verweist. Sie können die URL eines Connectors angeben, der in der Spalte Link im Abschnitt Connector herunterladen und verwenden in GitHub aufgeführt ist, oder den Pfad zu einem Cloud Storage-Speicherort, an dem Sie ein benutzerdefiniertes Connector-JAR abgelegt haben.
Beispiele:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
Cluster mit der Image-Version 2.0
und älter
Sie haben folgende Möglichkeiten, den Spark BigQuery-Connector für Ihre Anwendung verfügbar zu machen:
Installieren Sie den Spark-bigquery-Connector im Spark-JAR-Verzeichnis jedes Knotens. Verwenden Sie dazu beim Erstellen des Clusters die Initialisierungsaktion für Dataproc-Connectors.
Geben Sie die JAR-URL des Connectors an, wenn Sie den Job über die Google Cloud Console, die gcloud-Befehlszeile oder die Dataproc API an den Cluster senden.
Console
Verwenden Sie den Spark-Job-Artikel Jar-Dateien auf der Dataproc-Seite Job senden.
gcloud
Verwenden Sie das Flag
gcloud dataproc jobs submit spark --jars
.API
Verwenden Sie das Feld
SparkJob.jarFileUris
.Connector-JAR-Datei angeben, wenn Spark-Jobs in Clustern mit einer Image-Version vor 2.0 ausgeführt werden
- Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Scala- und Connector-Version im folgenden URI-String ersetzen:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Scala
2.12
mit Dataproc-Image-Versionen1.5+
verwenden Beispiel für die gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Scala
2.11
mit Dataproc-Image-Versionen1.4
und niedriger verwenden: Beispiel für die gcloud CLI:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Scala- und Connector-Version im folgenden URI-String ersetzen:
Schließen Sie die JAR-Datei des Connectors in Ihre Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Für den Connector kompilieren).
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Daten von und nach BigQuery lesen und schreiben
In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.
Der Connector schreibt die Daten zuerst in BigQuery, indem er zuerst alle Daten in eine temporäre Tabelle von Cloud Storage zwischenspeichert. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen.
Wenn der Job fehlschlägt, entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Abrechnung konfigurieren
Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Code ausführen
Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen „wordcount_dataset“ oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in IhremGoogle Cloud -Projekt.
Verwenden Sie den bq-Befehl zum Erstellen des wordcount_dataset
:
bq mk wordcount_dataset
Verwenden Sie den Befehl der Google Cloud CLI, um einen Cloud Storage-Bucket zu erstellen, der für den Export nach BigQuery verwendet wird:
gcloud storage buckets create gs://[bucket]
Scala
- Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Code in einem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite > Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Erstellen Sie
wordcount.scala
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den Scala-Code aus der Scala-Code-Liste ein.nano wordcount.scala
- Starten Sie die
spark-shell
-REPL.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Führen Sie wordcount.scala mit dem
:load wordcount.scala
-Befehl aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie dann auf Vorschau.
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
PySpark
- Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Code in Ihrem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Erstellen Sie
wordcount.py
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.nano wordcount.py
- Führen Sie Wordcount mit
spark-submit
aus, um die BigQuery-wordcount_output
-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die SeiteBigQuery
, wählen Sie die Tabellewordcount_output
aus und klicken Sie dann auf Vorschau.
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
Weitere Informationen
- BigQuery Storage und Spark SQL – Python
- Tabellendefinitionsdatei für eine externe Datenquelle erstellen
- Extern partitionierte Daten abfragen
- Tipps zur Abstimmung von Spark-Jobs