將 BigLake Metastore 與 Dataproc Serverless 搭配使用

本文說明如何搭配 Dataproc Serverless 使用 BigLake metastore。

事前準備

  1. 為 Google Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能
  2. 啟用 BigQuery 和 Dataproc API。

    啟用 API

  3. 選用:瞭解 BigLake 中繼資料存放區的運作方式,以及使用該存放區的原因。

必要的角色

如要取得使用 Spark 和 Dataproc Serverless 時,以 BigLake metastore 做為中繼資料存放區所需的權限,請要求管理員授予您下列 IAM 角色:

  • 在 Spark 中建立 BigLake Metastore 資料表:
    • Dataproc 工作站 (roles/dataproc.worker) 專案中 Dataproc Serverless 服務帳戶的存取權
    • 專案中 Dataproc Serverless 服務帳戶的「BigQuery 資料編輯者」 (roles/bigquery.dataEditor) 角色
    • 專案中 Dataproc Serverless 服務帳戶的儲存空間物件管理員 (roles/storage.objectAdmin)
  • 在 BigQuery 中查詢 BigLake 中繼存放區資料表:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

一般工作流程

如要搭配使用 BigQuery 與 Dataproc Serverless,請按照下列一般步驟操作:

  1. 建立檔案,其中包含要在 BigLake Metastore 中執行的指令。
  2. 連結至您選擇的開放原始碼軟體引擎。
  3. 使用您選擇的方法 (例如 Spark SQL 或 PySpark) 提交批次工作。

將 BigLake Metastore 連接至 Spark

下列操作說明詳述如何將 Dataproc Serverless 連線至 BigLake Metastore:

SparkSQL

如要提交 Spark SQL 批次工作,請完成下列步驟。

  1. 建立 SQL 檔案,其中包含要在 BigLake metastore 中執行的 Spark SQL 指令。舉例來說,這個指令會建立命名空間和資料表。

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    更改下列內容:

    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:Cloud Storage 資料夾的 URI,用於儲存資料倉儲。
  2. 執行下列 gcloud dataproc batches submit spark-sql gcloud CLI 指令,提交 Spark SQL 批次工作:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
    --project=PROJECT_ID \
    --region=REGION \
    --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
    --deps-bucket=BUCKET_PATH \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    更改下列內容:

    • SQL_SCRIPT_PATH:批次工作使用的 SQL 檔案路徑。
    • PROJECT_ID:要執行批次作業的 Google Cloud 專案 ID。
    • REGION:執行工作負載的區域。
    • SUBNET_NAME:選用:REGION 中虛擬私有雲子網路的名稱,該子網路已啟用私人 Google 存取權,且符合其他工作階段子網路需求
    • LOCATION:執行批次作業的位置。
    • BUCKET_PATH:用來上傳工作負載依附元件的 Cloud Storage 值區位置。WAREHOUSE_FOLDER位於這個值區中。 不需要提供 bucket 的 gs:// URI 前置字串。您可以指定值區路徑或值區名稱,例如 mybucketname1

    如要進一步瞭解如何提交 Spark 批次工作,請參閱「執行 Spark 批次工作負載」。

PySpark

如要提交 PySpark 批次作業,請完成下列步驟。

  1. 建立 Python 檔案,其中包含要在 BigLake 中繼資料存放區執行的 PySpark 指令。

    舉例來說,下列指令會設定 Spark 環境,與儲存在 BigLake 中繼存放區的 Iceberg 資料表互動。接著,指令會在該命名空間中建立新的命名空間和 Iceberg 資料表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    更改下列內容:

    • PROJECT_ID:要執行批次作業的 Google Cloud 專案 ID。
    • :BigQuery 資源所在的位置LOCATION
    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:Cloud Storage 資料夾的 URI,用於儲存資料倉儲。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
  2. 使用下列 gcloud dataproc batches submit pyspark 指令提交批次工作。

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
     --version=2.2 \
     --project=PROJECT_ID \
     --region=REGION \
     --deps-bucket=BUCKET_PATH \
     --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    更改下列內容:

    • PYTHON_SCRIPT_PATH:批次作業使用的 Python 指令碼路徑。
    • PROJECT_ID:要執行批次作業的 Google Cloud 專案 ID。
    • REGION:執行工作負載的區域。
    • BUCKET_PATH:用來上傳工作負載依附元件的 Cloud Storage 值區位置。不需要提供 bucket 的 gs:// URI 前置字串。您可以指定值區路徑或值區名稱,例如 mybucketname1

    如要進一步瞭解如何提交 PySpark 批次工作,請參閱 PySpark gcloud 參考資料

後續步驟