使用可选组件功能创建 Dataproc 集群时,您可以安装 Iceberg 等其他组件。本页面介绍了如何选择性地在 Dataproc 集群上安装 Iceberg 组件。
概览
Apache Iceberg 是一种用于大型分析数据集的开放表格式。该格式为大数据提供了 SQL 表的可靠性和简单性,并且让 Spark、Trino、PrestoDB、Flink 和 Hive 等引擎能够同时安全地处理相同的表。
在 Dataproc 集群上安装 Apache Iceberg 组件后,它会安装 Iceberg 库,并在集群中配置 Spark 和 Hive,以与 Iceberg 搭配使用。
主要 Iceberg 功能
Iceberg 功能包括:
- 架构演变:添加、移除或重命名列,而无需重写整个表。
- 时间旅行:查询历史表快照,以用于审核或回滚。
- 隐藏分区:优化数据布局,可更快地进行查询,而无需向用户公开分区详细信息。
- ACID 事务:确保数据一致性并防止冲突。
兼容的 Dataproc 映像版本
您可以在使用 2.2.47 及更高映像版本创建的Dataproc 集群上安装 Iceberg 组件。该集群上安装的 Iceberg 版本列在 2.2 发布版本页面中。
与 Iceberg 相关的属性
创建包含 Iceberg 集群的 Dataproc 时,系统会将以下 Spark 和 Hive 属性配置为与 Iceberg 搭配使用。
配置文件 | 属性 | 默认值 |
---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions |
spark.driver.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
spark.executor.extraClassPath |
/usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar |
|
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar |
iceberg.engine.hive.enabled |
true |
安装 Iceberg 可选组件
在创建 Dataproc 集群时安装 Iceberg 组件。Dataproc 集群映像版本列表页面会显示最新 Dataproc 集群映像版本中包含的 Iceberg 组件版本。
Google Cloud 控制台
如需创建用于安装 Iceberg 组件的 Dataproc 集群,请在 Google Cloud 控制台中完成以下步骤:
- 打开 Dataproc 的创建集群页面。设置集群面板已处于选中状态。
- 在组件部分的可选组件下,选择 Iceberg 组件。
- 确认或指定其他集群设置,然后点击创建。
Google Cloud CLI
如需创建用于安装 Iceberg 组件的 Dataproc 集群,请将 gcloud dataproc clusters create
命令与 --optional-components
标志结合使用。
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --optional-components=ICEBERG \ other flags ...
替换以下内容:
- CLUSTER_NAME:新集群的名称。
- REGION:集群区域。
REST API
如需创建用于安装 Iceberg 可选组件的 Dataproc 集群,请在 clusters.create
请求中指定 Iceberg SoftwareConfig.Component
。
将 Iceberg 表与 Spark 和 Hive 搭配使用
创建在集群上安装了 Iceberg 可选组件的 Dataproc 集群后,您可以使用 Spark 和 Hive 读取和写入 Iceberg 表数据。
Spark
为 Iceberg 配置 Spark 会话
您可以在本地使用 gcloud CLI 命令,也可以使用在 dataproc 集群主节点上运行的 spark-shell
或 pyspark
REPL(读取-评估-输出循环),以启用 Iceberg 的 Spark 扩展程序,并设置 Spark 目录以使用 Iceberg 表。
gcloud
在本地终端窗口或 Cloud Shell 中运行以下 gcloud CLI 示例,以提交 Spark 作业并设置 Spark 属性,从而为 Iceberg 配置 Spark 会话。
gcloud dataproc jobs submit spark \ --cluster=CLUSTER_NAME \ --region=REGION \ --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \ other flags ...
替换以下内容:
- CLUSTER_NAME:集群名称。
- REGION:Compute Engine 区域。
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 FOLDER:Iceberg 目录在 Cloud Storage 中的位置。
spark-shell
如需使用 Dataproc 集群上的 spark-shell
REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:
使用 SSH 连接到 Dataproc 集群的主节点。
在 SSH 会话终端中运行以下命令,为 Iceberg 配置 Spark 会话。
spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
替换以下内容:
- CLUSTER_NAME:集群名称。
- REGION:Compute Engine 区域。
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 FOLDER:Iceberg 目录在 Cloud Storage 中的位置。
pyspark shell
如需使用 Dataproc 集群上的 pyspark
REPL 为 Iceberg 配置 Spark 会话,请完成以下步骤:
使用 SSH 连接到 Dataproc 集群的主节点。
在 SSH 会话终端中运行以下命令,为 Iceberg 配置 Spark 会话:
pyspark \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \ --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"
替换以下内容:
- CLUSTER_NAME:集群名称。
- REGION:Compute Engine 区域。
- CATALOG_NAME:Iceberg 目录名称。
- BUCKET 和 FOLDER:Iceberg 目录在 Cloud Storage 中的位置。
将数据写入 Iceberg 表
您可以使用 Spark 将数据写入 Iceberg 表。以下代码段会创建一个包含示例数据的 DataFrame
,在 Cloud Storage 中创建一个 Iceberg 表,然后将数据写入 Iceberg 表。
PySpark
# Create a DataFrame with sample data. data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"]) # Create an Iceberg table in Cloud Storage. spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME ( id integer, name string) USING iceberg LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""") # Write the DataFrame to the Iceberg table in Cloud Storage. data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
Scala
// Create a DataFrame with sample data. val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name") // Create an Iceberg table in Cloud Storage. spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME ( id integer, name string) USING iceberg LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""") // Write the DataFrame to the Iceberg table in Cloud Storage. data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()
从 Iceberg 表中读取数据
您可以使用 Spark 从 Iceberg 表中读取数据。以下代码段会读取该表,然后显示其内容。
PySpark
# Read Iceberg table data into a DataFrame. df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME") # Display the data. df.show()
Scala
// Read Iceberg table data into a DataFrame. val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME") // Display the data. df.show()
Spark SQL
SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME
Hive
在 Hive 中创建 Iceberg 表
Dataproc 集群会预先将 Hive 配置为与 Iceberg 搭配使用。
如需运行本部分中的代码段,请完成以下步骤:
使用 SSH 连接到 Dataproc 集群的主节点。
在 SSH 终端窗口中启动
beeline
。beeline -u jdbc:hive2://
您可以在 Hive 中创建未分区或分区 Iceberg 表。
未分区表
在 Hive 中创建未分区 Iceberg 表。
CREATE TABLE my_table ( id INT, name STRING, created_at TIMESTAMP ) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
分区表
通过在 PARTITIONED BY
子句中指定分区列,可在 Hive 中创建分区 Iceberg 表。
CREATE TABLE my_partitioned_table ( id INT, name STRING ) PARTITIONED BY (date_sk INT) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
将数据插入 Hive 中的 Iceberg 表
您可以使用标准 Hive INSERT
语句将数据插入 Iceberg 表。
SET hive.execution.engine=mr; INSERT INTO my_table SELECT 1, 'Alice', current_timestamp();
限制
- DML(数据操纵语言)操作仅支持 MR (MapReduce) 执行引擎。
- Hive
3.1.3
中已弃用 MR 执行。
从 Hive 的 Iceberg 表中读取数据
如需从 Iceberg 表中读取数据,请使用 SELECT
语句。
SELECT * FROM my_table;
在 Hive 中删除 Iceberg 表。
如需在 Hive 中删除 Iceberg 表,请使用 DROP TABLE
语句。
DROP TABLE my_table;