Dataproc 可选 Delta Lake 组件

使用可选组件功能创建 Dataproc 集群时,酌情安装 Delta Lake 等其他组件。本页面介绍了如何在 Dataproc 集群上选择安装 Delta Lake 组件。

在 Dataproc 集群上安装 Delta Lake 组件后,它将安装 Delta Lake 库,并在集群中配置 Spark 和 Hive,以与 Delta Lake 一起使用。

兼容的 Dataproc 映像版本

您可以使用 Dataproc 映像版本 2.2.46 及更高版本创建的 Dataproc 集群上安装 Delta Lake 组件。

如需查看 Dataproc 映像版本中包含的 Delta Lake 组件版本,请参阅支持的 Dataproc 版本

创建启用了 Delta Lake 组件的 Dataproc 集群时,以下 Spark 属性将被配置为与 Delta Lake 一起使用。

配置文件 属性 默认值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
/etc/spark/conf/spark-defaults.conf spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

安装组件

在使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 创建 Dataproc 集群时安装组件。

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 创建集群页面。

    前往“创建集群”

    设置集群面板已处于选中状态。

  2. 组件部分的可选组件下,选择 Delta Lake 以及其他一些要在集群上安装的可选组件。

gcloud CLI

如需创建包含 Delta Lake 组件的 Dataproc 集群,请使用带有 --optional-components 标志的 gcloud dataproc clusters create 命令。

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DELTA \
    --region=REGION \
    ... other flags

注意:

REST API

可以通过 Dataproc API 使用 SoftwareConfig.Component 将 Delta Lake 组件指定为 clusters.create 请求的一部分。

用法示例

本部分提供了使用 Delta Lake 表的数据读写示例。

Delta Lake 表

写入 Delta Lake 表

您可以使用 Spark DataFrame 将数据写入 Delta Lake 表。以下示例会创建一个包含示例数据的 DataFrame,在 Cloud Storage 中创建一个 my_delta_table Delta Lake 表,并将数据写入该 Delta Lake 表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

# Write the DataFrame to the Delta Lake table in Cloud Storage.
data.writeTo("my_delta_table").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

// Write the DataFrame to the Delta Lake table in Cloud Storage.
data.write.format("delta").mode("append").saveAsTable("my_delta_table")

Spark SQL

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

从 Delta Lake 表读取

以下示例读取 my_delta_table 并显示其内容。

PySpark

# Read the Delta Lake table into a DataFrame.
df = spark.table("my_delta_table")

# Display the data.
df.show()

Scala

// Read the Delta Lake table into a DataFrame.
val df = spark.table("my_delta_table")

// Display the data.
df.show()

Spark SQL

SELECT * FROM my_delta_table;

Hive 与 Delta Lake

将数据写入 Hive 中的 Delta 表。

Dataproc Delta Lake 可选组件已预先配置为与 Hive 外部表配合使用。

如需了解详情,请参阅 Hive 连接器

在 beeline 客户端中运行示例。

beeline -u jdbc:hive2://

创建 Spark Delta Lake 表。

必须先使用 Spark 创建 Delta Lake 表,然后 Hive 外部表才能引用该表。

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

创建 Hive 外部表。

SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;

CREATE EXTERNAL TABLE deltaTable(id INT, name STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

注意:

  • io.delta.hive.DeltaStorageHandler 类实现了 Hive 数据源 API。它可以加载 Delta 表并提取其元数据。如果 CREATE TABLE 语句中的表架构与底层 Delta Lake 元数据不一致,系统会抛出错误。

从 Hive 中的 Delta Lake 表读取数据。

如需从 Delta 表中读取数据,请使用 SELECT 语句:

SELECT * FROM deltaTable;

删除 Delta Lake 表。

如需删除 Delta 表,请使用 DROP TABLE 语句:

DROP TABLE deltaTable;