Dataproc 選用 Iceberg 元件

當您使用選用元件功能建立 Dataproc 叢集時,可以安裝 Iceberg 等其他元件。本頁說明如何選擇在 Dataproc 叢集中安裝 Iceberg 元件。

總覽

Apache Iceberg 是開放式資料表格式,適用於大型分析資料集。這項工具可為大數據帶來 SQL 資料表的可靠性和簡易性,同時讓 Spark、Trino、PrestoDB、Flink 和 Hive 等引擎能夠安全地同時使用相同的資料表。

在 Dataproc 叢集中安裝 Apache Iceberg 元件時,該元件會安裝 Iceberg 程式庫,並設定 Spark 和 Hive,以便在叢集中與 Iceberg 搭配運作。

Iceberg 的主要功能

Iceberg 的功能包括:

  • 結構定義演進:新增、移除或重新命名資料欄,而無須重寫整個資料表。
  • 時間旅行:查詢歷來資料表快照,以便稽核或回溯。
  • 隱藏式分區:最佳化資料版面配置,加快查詢速度,但不向使用者公開分區詳細資料。
  • ACID 交易:確保資料一致性並避免衝突。

相容的 Dataproc 映像檔版本

您可以在使用 2.2.47 以上映像檔版本建立的 Dataproc 叢集中安裝 Iceberg 元件。叢集中安裝的 Iceberg 版本會列於「2.2 發布版本」頁面。

建立含有 Iceberg 叢集的 Dataproc 時,系統會將下列 Spark 和 Hive 屬性設定為與 Iceberg 搭配運作。

設定檔 屬性 預設值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

安裝 Iceberg 選用元件

建立 Dataproc 叢集時,請安裝 Iceberg 元件。「Dataproc 叢集映像檔版本清單」頁面會顯示最新 Dataproc 叢集映像檔版本中包含的 Iceberg 元件版本。

Google Cloud 控制台

如要建立安裝 Iceberg 元件的 Dataproc 叢集,請在 Google Cloud 控制台中完成下列步驟:

  1. 開啟 Dataproc「Create a cluster」(建立叢集) 頁面。選取「設定叢集」面板。
  2. 在「Components」部分的「Optional components」下方,選取「Iceberg」元件。
  3. 確認或指定其他叢集設定,然後按一下「建立」

Google Cloud CLI

如要建立安裝 Iceberg 元件的 Dataproc 叢集,請使用 gcloud dataproc clusters create 指令搭配 --optional-components 標記。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

更改下列內容:

  • CLUSTER_NAME:新的叢集名稱。
  • REGION叢集區域

REST API

如要建立會安裝 Iceberg 選用元件的 Dataproc 叢集,請在 clusters.create 要求中指定 Iceberg SoftwareConfig.Component

搭配使用 Spark 和 Hive 使用 Iceberg 資料表

建立 Dataproc 叢集後,如果叢集中已安裝 Iceberg 選用元件,您就可以使用 Spark 和 Hive 讀取及寫入 Iceberg 資料表資料。

Spark

為 Iceberg 設定 Spark 工作階段

您可以在本機上使用 gcloud CLI 指令,或在 Dataproc 叢集主要節點上執行 spark-shellpyspark REPL (讀取-評估-列印迴圈),以啟用 Iceberg 的 Spark 擴充功能,並設定 Spark 目錄以使用 Iceberg 資料表。

gcloud

在本機終端機視窗或 Cloud Shell 中執行下列 gcloud CLI 範例,提交 Spark 工作並設定 Spark 屬性,以便為 Iceberg 設定 Spark 工作階段。

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

更改下列內容:

  • CLUSTER_NAME:叢集名稱。
  • REGIONCompute Engine 區域。
  • CATALOG_NAME:Iceberg 目錄名稱。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目錄位置。

spark-shell

如要在 Dataproc 叢集中使用 spark-shell REPL 設定 Iceberg 的 Spark 工作階段,請完成下列步驟:

  1. 使用 SSH 連線至 Dataproc 叢集的主要節點。

  2. 在 SSH 工作階段終端機中執行下列指令,為 Iceberg 設定 Spark 工作階段。

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

更改下列內容:

  • CLUSTER_NAME:叢集名稱。
  • REGIONCompute Engine 區域。
  • CATALOG_NAME:Iceberg 目錄名稱。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目錄位置。

pyspark shell

如要在 Dataproc 叢集中使用 pyspark REPL 設定 Iceberg 的 Spark 工作階段,請完成下列步驟:

  1. 使用 SSH 連線至 Dataproc 叢集的主要節點。

  2. 在 SSH 工作階段終端機中執行下列指令,為 Iceberg 設定 Spark 工作階段:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

更改下列內容:

  • CLUSTER_NAME:叢集名稱。
  • REGIONCompute Engine 區域。
  • CATALOG_NAME:Iceberg 目錄名稱。
  • BUCKETFOLDER:Cloud Storage 中的 Iceberg 目錄位置。

將資料寫入 Iceberg 資料表

您可以使用 Spark 將資料寫入 Iceberg 資料表。下列程式碼片段會建立含有範例資料的 DataFrame,在 Cloud Storage 中建立 Iceberg 資料表,然後將資料寫入 Iceberg 資料表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

讀取 Iceberg 資料表中的資料

您可以使用 Spark 讀取 Iceberg 資料表中的資料。以下程式碼片段會讀取資料表,然後顯示其內容。

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

在 Hive 中建立 Iceberg 資料表

Dataproc 叢集會預先設定 Hive 以搭配 Iceberg 運作。

如要執行本節中的程式碼片段,請完成下列步驟:

  1. 使用 SSH 連線至 Dataproc 叢集的主要節點。

  2. 在 SSH 終端機視窗中開啟 beeline

    beeline -u jdbc:hive2://
    

您可以在 Hive 中建立未分區或已分區的 Iceberg 資料表。

未分區的資料表

在 Hive 中建立未分區的 Iceberg 資料表。

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

分區資料表

PARTITIONED BY 子句中指定分區欄,即可在 Hive 中建立分區 Iceberg 資料表。

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

將資料插入 Hive 中的 Iceberg 資料表

您可以使用標準 Hive INSERT 陳述式,將資料插入 Iceberg 資料表。

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

限制

  • 只有 DML (資料操縱語言) 作業支援 MR (MapReduce) 執行引擎。
  • Hive 3.1.3 已淘汰 MapReduce 執行作業。

讀取 Hive 中的 Iceberg 資料表資料

如要讀取 Iceberg 資料表中的資料,請使用 SELECT 陳述式。

SELECT * FROM my_table;

在 Hive 中刪除 Iceberg 資料表。

如要在 Hive 中刪除 Iceberg 資料表,請使用 DROP TABLE 陳述式。

DROP TABLE my_table;

後續步驟