搭配使用 spark-bigquery-connector 和 Apache Spark,即可在 BigQuery 中讀取及寫入資料。本教學課程將示範使用 spark-bigquery-connector
的 PySpark 應用程式。
搭配工作負載使用 BigQuery 連接器
請參閱 Dataproc Serverless for Spark 執行階段版本,瞭解在批次工作負載執行階段版本中安裝的 BigQuery 連接器版本。如果沒有列出連接器,請參閱下一節的操作說明,瞭解如何讓應用程式使用連接器。
如何搭配使用連接器和 Spark 執行階段 2.0 版
BigQuery 連接器並未安裝在 Spark 執行階段 2.0 版中。使用 Spark 2.0 以上版本時,您可以透過下列其中一種方式,讓應用程式使用連接器:
- 提交 Dataproc Serverless for Spark 批次工作負載時,請使用
jars
參數指向連接器 jar 檔案。以下範例會指定連接器 jar 檔案 (如要查看可用的連接器 jar 檔案清單,請參閱 GitHub 上的 GoogleCloudDataproc/spark-bigquery-connector 存放區)。- Google Cloud CLI 範例:
gcloud dataproc batches submit pyspark \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \ ... other args
- Google Cloud CLI 範例:
- 在 Spark 應用程式中加入連接器 JAR 檔案做為依附元件 (請參閱「針對連接器進行編譯」)
計算費用
本教學課程使用 Google Cloud的計費元件,包括:
- Dataproc Serverless
- BigQuery
- Cloud Storage
使用 Pricing Calculator 可根據您的預測使用量來產生預估費用。
BigQuery I/O
此範例會將 BigQuery 中的資料讀取到 Spark DataFrame,以便使用標準資料來源 API 執行字數計算。
連接器會將 wordcount 輸出內容寫入 BigQuery,如下所示:
將資料緩衝至 Cloud Storage 值區中的暫存檔案
透過單一作業,將資料從 Cloud Storage 值區複製到 BigQuery
在 BigQuery 載入作業完成後刪除 Cloud Storage 中的臨時檔案 (Spark 應用程式終止後也會刪除臨時檔案)。如果刪除作業失敗,您必須刪除所有不需要的臨時 Cloud Storage 檔案,這些檔案通常會放在
gs://YOUR_BUCKET/.spark-bigquery-JOB_ID-UUID
中。
設定帳單
根據預設,系統會針對與憑證或服務帳戶相關聯的專案收取 API 使用費。如要對其他專案收費,請設定以下設定:spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
。
您也可以新增至讀取或寫入作業,如下所示:.option("parentProject", "<BILLED-GCP-PROJECT>")
。
提交 PySpark 字詞計數批次工作負載
執行 Spark 批次工作負載,計算公開資料集中的字詞數量。
- 開啟本機終端機或 Cloud Shell
- 在本機終端機或 Cloud Shell 中,使用 bq 指令列工具建立
wordcount_dataset
。bq mk wordcount_dataset
- 使用 Google Cloud CLI 建立 Cloud Storage 值區。
將gcloud storage buckets create gs://YOUR_BUCKET
YOUR_BUCKET
替換為您建立的 Cloud Storage 值區名稱。 - 在文字編輯器中複製下列 PySpark 程式碼,以在本機建立
wordcount.py
檔案。#!/usr/bin/python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "YOUR_BUCKET" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Saving the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- 提交 PySpark 批次工作負載:
終端機輸出內容範例:gcloud dataproc batches submit pyspark wordcount.py \ --region=REGION \ --deps-bucket=YOUR_BUCKET
... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
如要預覽 Google Cloud 控制台中的輸出資料表,請開啟專案的 BigQuery 頁面,選取wordcount_output
資料表,然後按一下「Preview」(預覽)。