BigQuery-Connector mit Spark verwenden

Der spark-bigquery-connector wird mit Apache Spark verwendet, um Daten aus BigQuery zu lesen und zu schreiben. Diese Anleitung enthält Beispielcode, der den spark-bigquery-connector in einer Spark-Anwendung verwendet. Eine Anleitung zum Erstellen eines Clusters finden Sie in den Dataproc-Kurzanleitungen.

Connector für Ihre Anwendung verfügbar machen

Sie haben folgende Möglichkeiten, den spark-bigquery-Connector für Ihre Anwendung verfügbar zu machen:

  1. Installieren Sie den Spark-bigquery-Connector im Spark-JAR-Verzeichnis jedes Knotens. Verwenden Sie dazu beim Erstellen des Clusters die Initialisierungsaktion für Dataproc-Connectors.

  2. Geben Sie den Connector-URI an, wenn Sie den Job einreichen:

    1. Google Cloud Console:Verwenden Sie das Element „Spark-Job“ Jars files auf der Dataproc-Seite Job senden.
    2. gcloud CLI:Verwenden Sie das Flag gcloud dataproc jobs submit spark --jars.
    3. Dataproc API:Verwenden Sie das Feld SparkJob.jarFileUris.
  3. Schließen Sie die JAR-Datei in Ihre Scala- oder Java Spark-Anwendung als Abhängigkeit ein (siehe Für den Connector kompilieren).

JAR-URI des Connectors angeben

Spark-BigQuery-Connector-Versionen sind im GitHub-Repository GoogleCloudDataproc/spark-bigquery-connector aufgeführt.

Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Scala- und Connector-Version im folgenden URI-String ersetzen:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Scala 2.12 mit Dataproc-Image-Versionen 1.5+ verwenden

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Beispiel für die gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Scala 2.11 mit Dataproc-Image-Versionen 1.4 und niedriger verwenden:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Beispiel für die gcloud CLI:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

Kosten berechnen

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Lese-/Schreibvorgänge in BigQuery

In diesem Beispiel werden Daten aus BigQuery in einen Spark-DataFrame eingelesen und dann mit der Standard-Datenquellen-API einer Wortzählung unterzogen.

Der Connector schreibt die Daten zuerst in BigQuery, indem er zuerst alle Daten in eine temporäre Tabelle von Cloud Storage zwischenspeichert. Anschließend werden alle Daten in einem Vorgang in BigQuery kopiert. Sobald der BigQuery-Ladevorgang erfolgreich war, versucht der Connector, die temporären Dateien zu löschen. Wenn der Job fehlschlägt, entfernen Sie alle verbleibenden temporären Cloud Storage-Dateien. In der Regel befinden sich temporäre BigQuery-Dateien in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Abrechnung konfigurieren

Standardmäßig wird das Projekt, das mit den Anmeldeinformationen oder dem Servicekonto verbunden ist, für die API-Nutzung abgerechnet. Wenn Sie ein anderes Projekt in Rechnung stellen möchten, legen Sie die folgende Konfiguration fest: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Sie kann auch folgendermaßen einem Lese-/Schreibvorgang hinzugefügt werden: .option("parentProject", "<BILLED-GCP-PROJECT>").

Code ausführen

Bevor Sie dieses Beispiel ausführen, erstellen Sie ein Dataset mit dem Namen "wordcount_dataset" oder ändern Sie das Ausgabe-Dataset im Code in ein vorhandenes BigQuery-Dataset in Ihrem Google Cloud-Projekt.

Verwenden Sie den bq-Befehl zum Erstellen des wordcount_dataset:

bq mk wordcount_dataset

Verwenden Sie den Befehl der Google Cloud CLI, um einen Cloud Storage-Bucket zu erstellen, der für den Export nach BigQuery verwendet wird:

gcloud storage buckets create gs://[bucket]

Scala

  1. Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Code in einem Cluster ausführen
    1. SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
      1. Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
        Seite „Dataproc-Cluster“ in der Cloud Console
      2. Wählen Sie auf der Seite > Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
        Seite mit den Details zum Dataproc-Cluster in der Cloud Console

        Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie wordcount.scala mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den Scala-Code aus der Scala-Code-Liste ein.
      nano wordcount.scala
        
    3. Starten Sie die spark-shell-REPL.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Führen Sie wordcount.scala mit dem :load wordcount.scala-Befehl aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die Seite BigQuery, wählen Sie die Tabelle wordcount_output aus und klicken Sie dann auf Vorschau.
      Tabelle in der BigQuery Explorer-Seite in der Cloud Console in der Vorschau ansehen

PySpark

  1. Untersuchen Sie den Code und ersetzen Sie den Platzhalter [bucket] durch den zuvor erstellten Cloud Storage-Bucket.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Code in Ihrem Cluster ausführen
    1. SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
      1. Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
        Seite „Cluster“ in der Cloud Console
      2. Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf SSH.
        Wählen Sie in der Cloud Console auf der Seite „Clusterdetails“ in der Zeile „Clustername“ die Option „SSH“ aus.

        Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Erstellen Sie wordcount.py mit dem vorinstallierten Texteditor vi, vim oder nano und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.
      nano wordcount.py
      
    3. Führen Sie Wordcount mit spark-submit aus, um die BigQuery-wordcount_output-Tabelle zu erstellen. Die Ausgabeliste zeigt 20 Zeilen von der Wordcount-Ausgabe an.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Wenn Sie eine Vorschau der Ausgabetabelle aufrufen möchten, öffnen Sie die Seite BigQuery, wählen Sie die Tabelle wordcount_output aus und klicken Sie dann auf Vorschau.
      Tabelle in der BigQuery Explorer-Seite in der Cloud Console in der Vorschau ansehen

Weitere Informationen