搭配 Dataproc 使用 BigLake 中繼存放區
本文說明如何搭配使用 BigLake Metastore 與 Dataproc on Compute Engine。這個連線提供單一共用中繼資料存放區,可與 Apache Spark 或 Apache Flink 等開放原始碼軟體引擎搭配使用。
事前準備
- 為 Google Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
啟用 BigQuery 和 Dataproc API。
選用:瞭解 BigLake 中繼資料存放區的運作方式,以及使用該存放區的原因。
必要的角色
如要取得使用 Spark 或 Flink 和 Dataproc 所需的權限,並將 BigLake Metastore 做為中繼資料存放區,請要求管理員授予下列 IAM 角色:
-
建立 Dataproc 叢集:
專案中 Compute Engine 預設服務帳戶的 Dataproc 工作站 (
roles/dataproc.worker
) -
在 Spark 或 Flink 中建立 BigLake metastore 資料表:
-
Dataproc 工作站 (
roles/dataproc.worker
) 專案中的 Dataproc VM 服務帳戶 -
專案中 Dataproc VM 服務帳戶的 BigQuery 資料編輯者 (
roles/bigquery.dataEditor
) 角色 -
專案中 Dataproc VM 服務帳戶的儲存空間物件管理員 (
roles/storage.objectAdmin
)
-
Dataproc 工作站 (
-
在 BigQuery 中查詢 BigLake 中繼存放區資料表:
-
專案的「BigQuery 資料檢視者」 (
roles/bigquery.dataViewer
) 角色 -
BigQuery 使用者 (
roles/bigquery.user
) 專案 -
專案的Storage 物件檢視者 (
roles/storage.objectViewer
)
-
專案的「BigQuery 資料檢視者」 (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
一般工作流程
如要搭配 BigLake 中繼存放區使用 Dataproc on Compute Engine,請按照下列一般步驟操作:
- 建立 Dataproc 叢集或設定現有叢集。
- 連線至偏好的開放原始碼軟體引擎,例如 Spark 或 Flink。
- 使用 JAR 檔案在叢集上安裝 Apache Iceberg 目錄外掛程式。
- 視您使用的開放原始碼軟體引擎而定,建立及管理 BigLake metastore 資源。
- 在 BigQuery 中存取及使用 BigLake Metastore 資源。
將 BigLake Metastore 連線至 Spark
下列操作說明將示範如何使用互動式 Spark SQL,將 Dataproc 連線至 BigLake Metastore。
下載 Iceberg 目錄外掛程式
如要將 BigLake Metastore 連接至 Dataproc 和 Spark,您必須使用 BigLake Metastore Iceberg 目錄外掛程式 JAR 檔案。
Dataproc 映像檔 2.2 版預設會包含這個檔案。如果 Dataproc 叢集無法直接存取網際網路,您必須下載外掛程式,並上傳至 Dataproc 叢集可存取的 Cloud Storage 值區。
下載 BigLake metastore Iceberg 目錄外掛程式。
設定 Dataproc 叢集
如要連線至 BigLake Metastore,必須先設定 Dataproc 叢集。
如要這麼做,您可以建立新叢集或使用現有叢集。接著,您可以使用這個叢集執行互動式 Spark SQL,以及管理 BigLake Metastore 資源。
在建立叢集的區域中,子網路必須啟用私人 Google 存取權 (PGA)。 根據預設,使用 2.2 (預設) 或更新版本映像檔建立的 Dataproc 叢集 VM 只會使用內部 IP 位址。如要允許叢集 VM 與 Google API 通訊,請在建立叢集的區域中,對
default
(或使用者指定的網路名稱,如適用) 網路子網路啟用私人 Google 存取權。如要執行本指南中的 Zeppelin 網頁介面範例,請務必使用或建立已啟用 Zeppelin 選用元件的 Dataproc 叢集。
新叢集
如要建立新的 Dataproc 叢集,請執行下列 gcloud
dataproc clusters create
指令。這項設定包含使用 BigLake Metastore 時所需的設定。
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
更改下列內容:
CLUSTER_NAME
:Dataproc 叢集的名稱。PROJECT_ID
:您要建立叢集的 Google Cloud 專案 ID。LOCATION
:您要建立叢集的 Google Cloud 區域。
現有叢集
如要設定現有叢集,請將下列 Iceberg Spark 執行階段新增至叢集。
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
您可以透過下列任一方式新增執行階段:
初始化指令碼。將執行階段依附元件新增至自訂初始化指令碼,該指令碼會在建立時執行。
將執行階段依附元件新增至指令碼後,請按照操作說明重新建立及更新叢集。
手動安裝。手動新增 Iceberg 目錄外掛程式 JAR 檔案,並設定 Spark 屬性,將執行階段納入叢集。
提交 Spark 工作
如要提交 Spark 工作,請使用下列其中一種方法:
gcloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
更改下列內容:
PROJECT_ID
:包含 Dataproc 叢集的 Google Cloud 專案 ID。CLUSTER_NAME
:用於執行 Spark SQL 工作的 Dataproc 叢集名稱。REGION
:叢集所在的 Compute Engine 區域。LOCATION
:BigQuery 資源的位置。CATALOG_NAME
:您在 SQL 工作中使用的 Spark 目錄名稱。WAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為gs://
。SPARK_SQL_COMMAND
:要執行的 Spark SQL 查詢。這項查詢包含建立資源的指令。舉例來說,如要建立命名空間和資料表。
互動式 Spark
連線至 Spark 並安裝目錄外掛程式
如要安裝 BigLake 中繼存放區的目錄外掛程式,請使用 SSH 連線至 Dataproc 叢集。
- 前往 Google Cloud 控制台的「VM Instances」(VM 執行個體) 頁面。
如要連線至 Dataproc VM 執行個體,請在虛擬機器執行個體清單中按一下「SSH」SSH。輸出結果會與下列內容相似:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
在終端機中執行下列 BigLake 中繼存放區初始化指令:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
更改下列內容:
CATALOG_NAME
:您在 SQL 工作中使用的 Spark 目錄名稱。PROJECT_ID
:BigLake Metastore 目錄的專案 ID,Spark 目錄會連結至該目錄。 Google CloudLOCATION
:BigLake Metastore 的位置。 Google CloudWAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為gs://
。
成功連線至叢集後,Spark 終端機會顯示
spark-sql
提示。spark-sql (default)>
管理 BigLake Metastore 資源
您現在已連線至 BigLake Metastore。您可以查看現有資源,或根據 BigLake Metastore 中儲存的中繼資料建立新資源。
舉例來說,請嘗試在互動式 Spark SQL 工作階段中執行下列指令,建立 Iceberg 命名空間和資料表。
使用自訂 Iceberg 目錄:
USE `CATALOG_NAME`;
建立命名空間:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
使用建立的命名空間:
USE NAMESPACE_NAME;
建立 Iceberg 資料表:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
插入表格列:
INSERT INTO TABLE_NAME VALUES (1, "first row");
新增表格欄:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
查看資料表中繼資料:
DESCRIBE EXTENDED TABLE_NAME;
列出命名空間中的資料表:
SHOW TABLES;
Zeppelin 筆記本
前往 Google Cloud 控制台的「Dataproc Clusters」(Dataproc 叢集) 頁面。
按一下要使用的叢集名稱。
「叢集詳細資料」頁面隨即開啟。
在導覽選單中,按一下「Web interfaces」(網頁介面)。
在「元件閘道」下方,按一下「Zeppelin」。系統會開啟 Zeppelin 筆記本頁面。
在導覽選單中,依序點選「記事本」和「+ 建立新記事」。
在對話方塊中輸入記事本名稱。保留選取的「Spark」Spark做為預設解譯器。
按一下「建立」,系統會建立新的筆記本。
在筆記本中,依序點選設定選單和「解譯器」。
在「搜尋解譯器」欄位中,搜尋「Spark」。
按一下 [編輯]。
在「Spark.jars」Spark.jars欄位中,輸入 Spark jar 的 URI。
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
按一下 [儲存]。
按一下 [確定]。
將下列 PySpark 程式碼複製到 Zeppelin 筆記本。
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
更改下列內容:
CATALOG_NAME
:用於 SQL 作業的 Spark 目錄名稱。PROJECT_ID
:包含 Dataproc 叢集的 Google Cloud 專案 ID。WAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為gs://
。NAMESPACE_NAME
:參照 Spark 資料表的命名空間名稱。WAREHOUSE_DIRECTORY
:Cloud Storage 資料夾的 URI,用於儲存資料倉儲。TABLE_NAME
:Spark 資料表的資料表名稱。
按一下執行圖示或按下
Shift-Enter
即可執行程式碼。工作完成後,狀態訊息會顯示「Spark Job Finished」,輸出內容則會顯示表格內容:
將 BigLake Metastore 連線至 Flink
下列操作說明詳述如何使用 Flink SQL 用戶端,將 Dataproc 連線至 BigLake Metastore。
安裝目錄外掛程式並連線至 Flink 工作階段
如要將 BigLake Metastore 連線至 Flink,請按照下列步驟操作:
- 建立已啟用選用 Flink 元件的 Dataproc 叢集,並確認您使用的是 Dataproc 2.2 以上版本。
前往 Google Cloud 控制台的「VM instances」(VM 執行個體) 頁面。
在虛擬機器執行個體清單中,按一下「SSH」SSH,連線至 Dataproc VM 執行個體。
為 BigLake Metastore 設定 Iceberg 自訂目錄外掛程式:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
在 YARN 上啟動 Flink 工作階段:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
在 Flink 中建立目錄:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
更改下列內容:
CATALOG_NAME
:Flink 目錄 ID,連結至 BigLake Metastore 目錄。WAREHOUSE_DIRECTORY
:倉庫目錄的基本路徑 (Flink 建立檔案的 Cloud Storage 資料夾)。這個值開頭為gs://
。PROJECT_ID
:Flink 目錄連結的 BigLake Metastore 目錄專案 ID。LOCATION
:BigQuery 資源的位置。
Flink 工作階段現已連線至 BigLake 中繼資料存放區,您可以執行 Flink SQL 指令。
管理 BigLake Metastore 資源
連線至 BigLake Metastore 後,您就能根據 BigLake Metastore 中儲存的中繼資料,建立及查看資源。
舉例來說,請嘗試在互動式 Flink SQL 工作階段中執行下列指令,建立 Iceberg 資料庫和資料表。
使用自訂 Iceberg 目錄:
USE CATALOG CATALOG_NAME;
將
CATALOG_NAME
替換為 Flink 目錄 ID。建立資料庫,這會在 BigQuery 中建立資料集:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
將
DATABASE_NAME
換成新資料庫的名稱。使用您建立的資料庫:
USE DATABASE_NAME;
建立 Iceberg 資料表。以下範例會建立銷售資料表:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
將
ICEBERG_TABLE_NAME
替換為新資料表的名稱。查看資料表中繼資料:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
列出資料庫中的資料表:
SHOW TABLES;
將資料擷取至資料表
在上一節中建立 Iceberg 資料表後,您可以使用 Flink DataGen 做為資料來源,將即時資料擷取到資料表中。以下步驟是這個工作流程的範例:
使用 DataGen 建立暫時表格:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
更改下列內容:
DATABASE_NAME
:用於儲存臨時資料表的資料庫名稱。TEMP_TABLE_NAME
:臨時資料表的名稱。ICEBERG_TABLE_NAME
:您在上一節中建立的 Iceberg 資料表名稱。
將平行處理設為 1:
SET 'parallelism.default' = '1';
設定檢查點間隔:
SET 'execution.checkpointing.interval' = '10second';
設定查核點:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
啟動即時串流工作:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
輸出結果會與下列內容相似:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
如要查看串流工作狀態,請按照下列步驟操作:
前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。
選取您的叢集。
按一下「網頁介面」分頁標籤。
按一下「YARN ResourceManager」連結。
在 YARN ResourceManager 介面中,找到 Flink 工作階段,然後按一下「Tracking UI」下方的「ApplicationMaster」連結。
在「狀態」欄中,確認工作狀態為「執行中」。
在 Flink SQL 用戶端中查詢串流資料:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
在 BigQuery 中查詢串流資料:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
在 Flink SQL 用戶端終止串流工作:
STOP JOB 'JOB_ID';
將
JOB_ID
替換為您建立串流工作時,輸出內容中顯示的工作 ID。