Menggunakan konektor BigQuery dengan Google Cloud Serverless untuk Apache Spark

Gunakan spark-bigquery-connector dengan Apache Spark untuk membaca dan menulis data dari dan ke BigQuery. Tutorial ini menunjukkan aplikasi PySpark yang menggunakan spark-bigquery-connector.

Menggunakan konektor BigQuery dengan beban kerja Anda

Lihat Rilis runtime Serverless untuk Apache Spark untuk menentukan versi konektor BigQuery yang diinstal di versi runtime workload batch Anda. Jika konektor tidak tercantum, lihat bagian berikutnya untuk mengetahui petunjuk tentang cara membuat konektor tersedia untuk aplikasi.

Cara menggunakan konektor dengan runtime Spark versi 2.0

Konektor BigQuery tidak diinstal di Spark runtime versi 2.0. Saat menggunakan Spark runtime versi 2.0, Anda dapat menyediakan konektor untuk aplikasi dengan salah satu cara berikut:

  • Gunakan parameter jars untuk mengarah ke file jar konektor saat Anda mengirimkan workload batch Serverless untuk Apache Spark. Contoh berikut menentukan file jar konektor (lihat repositori GoogleCloudDataproc/spark-bigquery-connector di GitHub untuk mengetahui daftar file jar konektor yang tersedia). Google Cloud
    • Contoh Google Cloud CLI:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Sertakan file jar konektor dalam aplikasi Spark Anda sebagai dependensi (lihat Mengompilasi terhadap konektor)

Menghitung biaya

Tutorial ini menggunakan komponen Google Cloudyang dapat ditagih, termasuk:

  • Serverless untuk Apache Spark
  • BigQuery
  • Cloud Storage

Gunakan Kalkulator Harga untuk membuat perkiraan biaya berdasarkan penggunaan yang Anda proyeksikan.

Pengguna baru Cloud Platform mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

I/O BigQuery

Contoh ini membaca data dari BigQuery ke dalam DataFrame Spark untuk melakukan penghitungan kata menggunakan API sumber data standar.

Konektor menulis output wordcount ke BigQuery sebagai berikut:

  1. Melakukan buffering data ke file sementara di bucket Cloud Storage Anda

  2. Menyalin data dalam satu operasi dari bucket Cloud Storage Anda ke BigQuery

  3. Menghapus file sementara di Cloud Storage setelah operasi pemuatan BigQuery selesai (file sementara juga dihapus setelah aplikasi Spark dihentikan). Jika penghapusan gagal, Anda harus menghapus file Cloud Storage sementara yang tidak diinginkan, yang biasanya ditempatkan di gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID.

Mengonfigurasi penagihan

Secara default, project yang terkait dengan kredensial atau akun layanan akan ditagih untuk penggunaan API. Untuk menagih project lain, tetapkan konfigurasi berikut: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Anda juga dapat menambahkan ke operasi baca atau tulis, sebagai berikut: .option("parentProject", "<BILLED-GCP-PROJECT>").

Mengirimkan workload batch wordcount PySpark

Jalankan beban kerja batch Spark yang menghitung jumlah kata dalam set data publik.

  1. Buka terminal lokal atau Cloud Shell
  2. Buat wordcount_dataset dengan alat command line bq di terminal lokal atau di Cloud Shell.
    bq mk wordcount_dataset
    
  3. Buat bucket Cloud Storage dengan Google Cloud CLI.
    gcloud storage buckets create gs://YOUR_BUCKET
    
    Ganti YOUR_BUCKET dengan nama bucket Cloud Storage yang Anda buat.
  4. Buat file wordcount.py secara lokal di editor teks dengan menyalin kode PySpark berikut.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. Kirimkan workload batch PySpark:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    Contoh output terminal:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    Untuk melihat pratinjau tabel output di konsol Google Cloud , buka halaman BigQuery project Anda, pilih tabel wordcount_output, lalu klik Preview.

Untuk informasi selengkapnya