Dataproc 選用 Delta Lake 元件

當您使用選用元件功能建立 Dataproc 叢集時,可以安裝 Delta Lake 等其他元件。本頁面說明如何選擇在 Dataproc 叢集中安裝 Delta Lake 元件。

在 Dataproc 叢集中安裝 Delta Lake 元件時,會安裝 Delta Lake 程式庫,並設定叢集中的 Spark 和 Hive,以便與 Delta Lake 搭配使用。

相容的 Dataproc 映像檔版本

您可以在使用 Dataproc 映像檔 2.2.46 以上版本建立的 Dataproc 叢集中安裝 Delta Lake 元件。

請參閱支援的 Dataproc 版本,瞭解 Dataproc 映像檔版本中包含的 Delta Lake 元件版本。

建立 Dataproc 叢集時,如果啟用 Delta Lake 元件,系統會將下列 Spark 屬性設為與 Delta Lake 搭配運作。

設定檔 屬性 預設值
/etc/spark/conf/spark-defaults.conf spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
/etc/spark/conf/spark-defaults.conf spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

安裝元件

請在使用 Google Cloud 控制台、Google Cloud CLI 或 Dataproc API 建立 Dataproc 叢集時安裝元件。

控制台

  1. 在 Google Cloud 控制台中,前往 Dataproc 的「Create a cluster」頁面。

    前往「建立叢集」

    選取「設定叢集」面板。

  2. 在「元件」部分的「選用元件」下方,選取要安裝在叢集中的「Delta Lake」和其他選用元件。

gcloud CLI

如要建立包含 Delta Lake 元件的 Dataproc 叢集,請使用 gcloud dataproc clusters create 指令搭配 --optional-components 標記。

gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DELTA \
    --region=REGION \
    ... other flags

注意:

REST API

您可以透過 Dataproc API,使用 SoftwareConfig.Component 做為 clusters.create 要求的一部分,指定 Delta Lake 元件。

使用範例

本節提供使用 Delta Lake 資料表的資料讀取和寫入範例。

Delta Lake 資料表

寫入 Delta Lake 資料表

您可以使用 Spark DataFrame 將資料寫入 Delta Lake 資料表。以下範例會建立含有範例資料的 DataFrame、在 Cloud Storage 中建立 my_delta_table Delta Lake 資料表,然後將資料寫入 Delta Lake 資料表。

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

# Write the DataFrame to the Delta Lake table in Cloud Storage.
data.writeTo("my_delta_table").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")

// Write the DataFrame to the Delta Lake table in Cloud Storage.
data.write.format("delta").mode("append").saveAsTable("my_delta_table")

Spark SQL

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

讀取 Delta Lake 資料表

以下範例會讀取 my_delta_table 並顯示其內容。

PySpark

# Read the Delta Lake table into a DataFrame.
df = spark.table("my_delta_table")

# Display the data.
df.show()

Scala

// Read the Delta Lake table into a DataFrame.
val df = spark.table("my_delta_table")

// Display the data.
df.show()

Spark SQL

SELECT * FROM my_delta_table;

搭配 Delta Lake 的 Hive

寫入 Hive 中的 Delta 資料表。

Dataproc Delta Lake 選用元件已預先設定為與 Hive 外部資料表搭配運作。

詳情請參閱「Hive 連接器」。

在 beeline 用戶端中執行範例。

beeline -u jdbc:hive2://

建立 Spark Delta Lake 資料表。

必須使用 Spark 建立 Delta Lake 資料表,才能讓 Hive 外部資料表參照該資料表。

CREATE TABLE IF NOT EXISTS my_delta_table (
    id integer,
    name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");

建立 Hive 外部資料表。

SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;

CREATE EXTERNAL TABLE deltaTable(id INT, name STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';

注意:

  • io.delta.hive.DeltaStorageHandler 類別會實作 Hive 資料來源 API。可載入 Delta 資料表並擷取中繼資料。如果 CREATE TABLE 陳述式中的資料表結構定義與底層 Delta Lake 中繼資料不一致,系統會擲回錯誤。

從 Hive 中的 Delta Lake 資料表讀取資料。

如要讀取 Delta 資料表中的資料,請使用 SELECT 陳述式:

SELECT * FROM deltaTable;

刪除 Delta Lake 資料表。

如要捨棄 Delta 資料表,請使用 DROP TABLE 陳述式:

DROP TABLE deltaTable;