搭配使用 BigQuery 連接器和 Dataproc Serverless for Spark

搭配使用 spark-bigquery-connectorApache Spark,即可在 BigQuery 中讀取及寫入資料。本教學課程將示範使用 spark-bigquery-connector 的 PySpark 應用程式。

搭配工作負載使用 BigQuery 連接器

請參閱 Dataproc Serverless for Spark 執行階段版本,瞭解在批次工作負載執行階段版本中安裝的 BigQuery 連接器版本。如果沒有列出連接器,請參閱下一節的操作說明,瞭解如何讓應用程式使用連接器。

如何搭配使用連接器和 Spark 執行階段 2.0 版

BigQuery 連接器並未安裝在 Spark 執行階段 2.0 版中。使用 Spark 2.0 以上版本時,您可以透過下列其中一種方式,讓應用程式使用連接器:

  • 提交 Dataproc Serverless for Spark 批次工作負載時,請使用 jars 參數指向連接器 jar 檔案。以下範例會指定連接器 jar 檔案 (如要查看可用的連接器 jar 檔案清單,請參閱 GitHub 上的 GoogleCloudDataproc/spark-bigquery-connector 存放區)。
    • Google Cloud CLI 範例:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • 在 Spark 應用程式中加入連接器 JAR 檔案做為依附元件 (請參閱「針對連接器進行編譯」)

計算費用

本教學課程使用 Google Cloud的計費元件,包括:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

使用 Pricing Calculator 可根據您的預測使用量來產生預估費用。

新的 Cloud Platform 使用者可能符合申請免費試用的資格。

BigQuery I/O

此範例會將 BigQuery 中的資料讀取到 Spark DataFrame,以便使用標準資料來源 API 執行字數計算。

連接器會將 wordcount 輸出內容寫入 BigQuery,如下所示:

  1. 將資料緩衝至 Cloud Storage 值區中的暫存檔案

  2. 透過單一作業,將資料從 Cloud Storage 值區複製到 BigQuery

  3. 在 BigQuery 載入作業完成後刪除 Cloud Storage 中的臨時檔案 (Spark 應用程式終止後也會刪除臨時檔案)。如果刪除作業失敗,您必須刪除所有不需要的臨時 Cloud Storage 檔案,這些檔案通常會放在 gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID 中。

設定帳單

根據預設,系統會針對與憑證或服務帳戶相關聯的專案收取 API 使用費。如要對其他專案收費,請設定以下設定:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")

您也可以新增至讀取或寫入作業,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")

提交 PySpark 字詞計數批次工作負載

執行 Spark 批次工作負載,計算公開資料集中的字詞數量。

  1. 開啟本機終端機或 Cloud Shell
  2. 在本機終端機或 Cloud Shell 中,使用 bq 指令列工具建立 wordcount_dataset
    bq mk wordcount_dataset
    
  3. 使用 Google Cloud CLI 建立 Cloud Storage 值區。
    gcloud storage buckets create gs://YOUR_BUCKET
    
    YOUR_BUCKET 替換為您建立的 Cloud Storage 值區名稱。
  4. 在文字編輯器中複製下列 PySpark 程式碼,以在本機建立 wordcount.py 檔案。
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "YOUR_BUCKET"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  5. 提交 PySpark 批次工作負載:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=REGION \
        --deps-bucket=YOUR_BUCKET
    
    終端機輸出內容範例:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    如要預覽 Google Cloud 控制台中的輸出資料表,請開啟專案的 BigQuery 頁面,選取 wordcount_output 資料表,然後按一下「Preview」(預覽)

瞭解詳情