Dataproc 可选 Iceberg 组件

使用可选组件功能创建 Dataproc 集群时,您可以安装 Iceberg 等其他组件。本页面介绍了如何选择性地在 Dataproc 集群上安装 Iceberg 组件。

概览

Apache Iceberg 是一种用于大型分析数据集的开放表格式。该格式为大数据提供了 SQL 表的可靠性和简单性,并且让 Spark、Trino、PrestoDB、Flink 和 Hive 等引擎能够同时安全地处理相同的表。

在 Dataproc 集群上安装 Apache Iceberg 组件后,它会安装 Iceberg 库,并在集群中配置 Spark 和 Hive,以与 Iceberg 搭配使用。

主要 Iceberg 功能

Iceberg 功能包括:

  • 架构演变:添加、移除或重命名列,而无需重写整个表。
  • 时间旅行:查询历史表快照,以用于审核或回滚。
  • 隐藏分区:优化数据布局,可更快地进行查询,而无需向用户公开分区详细信息。
  • ACID 事务:确保数据一致性并防止冲突。

兼容的 Dataproc 映像版本

您可以在使用 2.2.47 及更高映像版本创建的Dataproc 集群上安装 Iceberg 组件。该集群上安装的 Iceberg 版本列在 2.2 发布版本页面中。

创建包含 Iceberg 集群的 Dataproc 时,系统会将以下 Spark 和 Hive 属性配置为与 Iceberg 搭配使用。

配置文件 属性 默认值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

安装 Iceberg 可选组件

创建 Dataproc 集群时安装 Iceberg 组件。Dataproc 集群映像版本列表页面会显示最新 Dataproc 集群映像版本中包含的 Iceberg 组件版本。

Google Cloud 控制台

如需创建用于安装 Iceberg 组件的 Dataproc 集群,请在 Google Cloud 控制台中完成以下步骤:

  1. 打开 Dataproc 的创建集群页面。设置集群面板已处于选中状态。
  2. 组件部分的可选组件下,选择 Iceberg 组件。
  3. 确认或指定其他集群设置,然后点击创建

Google Cloud CLI

如需创建用于安装 Iceberg 组件的 Dataproc 集群,请将 gcloud dataproc clusters create 命令与 --optional-components 标志结合使用。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

替换以下内容:

  • CLUSTER_NAME:新集群的名称。
  • REGION集群区域

REST API

如需创建用于安装 Iceberg 可选组件的 Dataproc 集群,请在 clusters.create 请求中指定 Iceberg SoftwareConfig.Component

将 Iceberg 表与 Spark 和 Hive 搭配使用

创建在集群上安装了 Iceberg 可选组件的 Dataproc 集群后,您可以使用 Spark 和 Hive 读取和写入 Iceberg 表数据。

Spark

为 Iceberg 配置 Spark 会话

您可以在本地使用 gcloud CLI 命令,也可以使用在 dataproc 集群主节点上运行的 spark-shellpyspark REPL(读取-评估-输出循环),以启用 Iceberg 的 Spark 扩展程序,并设置 Spark 目录以使用 Iceberg 表。

gcloud

在本地终端窗口或 Cloud Shell 中运行以下 gcloud CLI 示例,以提交 Spark 作业并设置 Spark 属性,从而为 Iceberg 配置 Spark 会话。

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Iceberg 目录在 Cloud Storage 中的位置。

spark-shell

如需使用 Dataproc 集群上的 spark-shell REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:

  1. 使用 SSH 连接到 Dataproc 集群的主节点。

  2. 在 SSH 会话终端中运行以下命令,为 Iceberg 配置 Spark 会话。

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Iceberg 目录在 Cloud Storage 中的位置。

pyspark shell

如需使用 Dataproc 集群上的 pyspark REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:

  1. 使用 SSH 连接到 Dataproc 集群的主节点。

  2. 在 SSH 会话终端中运行以下命令,为 Iceberg 配置 Spark 会话:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

替换以下内容:

  • CLUSTER_NAME:集群名称。
  • REGIONCompute Engine 区域。
  • CATALOG_NAME:Iceberg 目录名称。
  • BUCKETFOLDER:Iceberg 目录在 Cloud Storage 中的位置。

将数据写入 Iceberg 表

您可以使用 Spark 将数据写入 Iceberg 表。以下代码段会创建一个包含示例数据的 DataFrame,在 Cloud Storage 中创建一个 Iceberg 表,然后将数据写入 Iceberg 表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

从 Iceberg 表中读取数据

您可以使用 Spark 从 Iceberg 表中读取数据。以下代码段会读取该表,然后显示其内容。

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

在 Hive 中创建 Iceberg 表

Dataproc 集群会预先将 Hive 配置为与 Iceberg 搭配使用。

如需运行本部分中的代码段,请完成以下步骤:

  1. 使用 SSH 连接到 Dataproc 集群的主节点。

  2. 在 SSH 终端窗口中启动 beeline

    beeline -u jdbc:hive2://
    

您可以在 Hive 中创建未分区或分区 Iceberg 表。

未分区表

在 Hive 中创建未分区 Iceberg 表。

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

分区表

通过在 PARTITIONED BY 子句中指定分区列,可在 Hive 中创建分区 Iceberg 表。

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

将数据插入 Hive 中的 Iceberg 表

您可以使用标准 Hive INSERT 语句将数据插入 Iceberg 表。

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

限制

  • DML(数据操纵语言)操作仅支持 MR (MapReduce) 执行引擎。
  • Hive 3.1.3 中已弃用 MR 执行。

从 Hive 的 Iceberg 表中读取数据

如需从 Iceberg 表中读取数据,请使用 SELECT 语句。

SELECT * FROM my_table;

在 Hive 中删除 Iceberg 表。

如需在 Hive 中删除 Iceberg 表,请使用 DROP TABLE 语句。

DROP TABLE my_table;

了解详情