搭配 Dataproc Metastore 使用 Apache Iceberg 資料表

本頁說明如何搭配附加至 Dataproc 叢集的 Dataproc Metastore 服務,使用 Apache Iceberg 資料表。Apache Iceberg 是開放式資料表格式,適用於大型數據分析資料集。

相容性

Iceberg 資料表支援下列功能。

驅動因素 選取 插入 建立資料表
Spark
Hive
Presto

事前準備

搭配 Spark 使用 Iceberg 資料表

以下範例說明您應如何搭配使用 Spark 和 Iceberg 資料表。

Iceberg 資料表支援讀取和寫入作業。詳情請參閱 Apache Iceberg - Spark

Spark 設定

首先,請啟動 Spark 殼層,並使用 Cloud Storage 值區儲存資料。如要在 Spark 安裝中加入 Iceberg,請將 Iceberg Spark 執行階段 JAR 檔案新增至 Spark 的 JAR 檔案資料夾。如要下載 JAR 檔案,請參閱 Apache Iceberg 下載項目。下列指令會啟動支援 Apache Iceberg 的 Spark 殼層:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

使用 Hive Catalog 建立 Iceberg 資料表

  1. 設定 Hive 目錄設定,在 Spark Scala 中建立 Iceberg 資料表:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. 建立資料表,用於插入及更新資料。以下為範例。

    1. default 資料庫下方建立名為 example 的資料表:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. 插入範例資料:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根據資料欄 id 指定區隔策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 建立資料表:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. 將 Iceberg Storage Handler 和 SerDe 新增為資料表屬性:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. 將資料寫入資料表:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. 讀取資料:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. 變更資料表結構定義。以下為範例。

    1. 取得表格並新增資料欄 grade

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 檢查新的資料表結構定義:

      table.schema.toString;
      
  4. 插入更多資料,並查看結構定義的演變。以下為範例。

    1. 在表格中新增資料:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. 檢查插入的新資料:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. 查看資料表記錄:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. 查看資訊清單檔案:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. 查看資料檔案:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. 假設您不小心新增了值為 id=6 的資料列,並想返回查看正確的表格版本:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      snapshot-id 替換為要回復的版本。

使用 Hadoop 資料表建立 Iceberg 資料表

  1. 設定 Hadoop 資料表設定,在 Spark Scala 中建立 Iceberg 資料表:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. 建立資料表,用於插入及更新資料。以下為範例。

    1. default 資料庫下方建立名為 example 的資料表:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. 插入範例資料:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. 根據資料欄 id 指定區隔策略:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. 建立資料表:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. 將資料寫入資料表:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. 讀取資料:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. 變更資料表結構定義。以下為範例。

    1. 取得表格並新增資料欄 grade

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. 檢查新的資料表結構定義:

      table.schema.toString;
      
  4. 插入更多資料,並查看結構定義的演變。以下為範例。

    1. 在表格中新增資料:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. 檢查插入的新資料:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. 查看資料表記錄:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. 查看快照:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. 查看資訊清單檔案:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. 查看資料檔案:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. 返回查看表格的特定版本:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      snapshot-id 替換為要復原的版本,並在結尾加上 "L"。例如:"3943776515926014142L"

在 Hive 上使用 Iceberg 資料表

Iceberg 可透過 StorageHandler 支援使用 Hive 讀取的資料表。請注意,系統僅支援 Hive 2.x 和 3.1.2 版本。詳情請參閱 Apache Iceberg - Hive。此外,請將 Iceberg Hive Runtime JAR 檔案新增至 Hive 類別路徑。如要下載 JAR 檔案,請參閱 Apache Iceberg 下載項目

如要將 Hive 資料表疊加在 Iceberg 資料表上,您必須使用 Hive 目錄或 Hadoop 資料表建立 Iceberg 資料表。此外,您必須據此設定 Hive,才能讀取 Iceberg 資料表中的資料。

讀取 Hive 上的 Iceberg 資料表 (Hive 目錄)

  1. 開啟 Hive 用戶端,並設定讀取 Hive 用戶端工作階段中的 Iceberg 資料表的設定:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. 讀取資料表結構定義和資料。以下為範例。

    1. 檢查資料表結構定義,以及資料表格式是否為 Iceberg:

      describe formatted example;
      
    2. 讀取表格中的資料:

      select * from example;
      

在 Hive 上讀取 Iceberg 資料表 (Hadoop 資料表)

  1. 開啟 Hive 用戶端,並設定讀取 Hive 用戶端工作階段中的 Iceberg 資料表的設定:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. 讀取資料表結構定義和資料。以下為範例。

    1. 建立外部資料表 (在 Iceberg 資料表上疊加 Hive 資料表):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. 檢查資料表結構定義,以及資料表格式是否為 Iceberg:

      describe formatted hadoop_table;
      
    3. 讀取表格中的資料:

      select * from hadoop_table;
      

在 Presto 上使用 Iceberg 資料表

Presto 查詢會使用 Hive 連接器取得分區位置,因此您必須相應設定 Presto,才能讀取及寫入 Iceberg 資料表的資料。詳情請參閱 Presto/Trino - Hive ConnectorPresto/Trino - Iceberg Connector

Presto 設定

  1. 在每個 Dataproc 叢集節點下方,建立名為 iceberg.properties /etc/presto/conf/catalog/iceberg.properties 的檔案,並按照下列方式設定 hive.metastore.uri

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    example.net:9083 替換為 Hive 元儲存庫 Thrift 服務的正確主機和通訊埠。

  2. 重新啟動 Presto 服務,以推送設定:

    sudo systemctl restart presto.service
    

在 Presto 上建立 Iceberg 資料表

  1. 開啟 Presto 用戶端,並使用「Iceberg」連接器取得 Metastore:

    --catalog iceberg --schema default
    
  2. 建立資料表,用於插入及更新資料。以下為範例。

    1. default 資料庫下方建立名為 example 的資料表:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. 插入範例資料:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. 讀取表格中的資料:

      SELECT * FROM iceberg.default.example;
      
    4. 插入更多新資料來查看快照:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. 查看快照:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      只要新增 ORDER BY committed_at DESC LIMIT 1; 指令,即可找出最新的快照 ID。

    6. 復原為表格的特定版本:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      snapshot-id 替換為要回復的版本。

後續步驟