Usa tablas de Apache Iceberg con Dataproc Metastore

En esta página, se explica cómo usar tablas de Apache Iceberg con un servicio de Dataproc Metastore adjunto a un clúster de Dataproc. Apache Iceberg es un formato de tabla abierta para grandes conjuntos de datos analíticos.

Compatibilidades

Las tablas de Iceberg admiten las siguientes funciones.

Controladores Selecciona Insertar Crear tabla
Spark
Hive
Presto

Antes de comenzar

Usar la tabla Iceberg con Spark

En el siguiente ejemplo, se muestra cómo usar tablas de Iceberg con Spark.

Las tablas de Iceberg admiten operaciones de lectura y escritura. Para obtener más información, consulta Apache Iceberg: Spark.

Configuraciones de Spark

Primero, inicia el shell de Spark y usa un bucket de Cloud Storage para almacenar datos. Para incluir Iceberg en la instalación de Spark, agrega el archivo JAR de Iceberg Spark Runtime a la carpeta de archivos JAR de Spark. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg. El siguiente comando inicia el shell de Spark con compatibilidad para Apache Iceberg:

$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar

Usa Hive Catalog para crear tablas Iceberg

  1. Configura los parámetros de configuración del catálogo de Hive para crear tablas de Iceberg en Spark Scala:

    import org.apache.iceberg.hive.HiveCatalog
    import org.apache.iceberg.catalog._
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    import java.util.HashMap
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      val catalog = new HiveCatalog();
      catalog.setConf(spark.sparkContext.hadoopConfiguration);
      catalog.initialize("hive", new HashMap[String,String]());
      
      val name = TableIdentifier.of("default","example");
      
    2. Inserta datos de muestra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifica la estrategia de partición basada en la columna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabla:

      val table=catalog.createTable(name,df1_schema,partition_spec);
      
    5. Agrega el controlador de almacenamiento y el SerDe de Iceberg como la propiedad de la tabla:

      table.updateProperties().set("engine.hive.enabled", "true").commit();
      
    6. Escribe los datos en la tabla:

      df1.write.format("iceberg").mode("overwrite").save("default.example");
      
    7. Lee los datos:

      val read_df1=spark.read.format("iceberg").load("default.example");
      read_df1.show;
      
  3. Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.

    1. Obtén la tabla y agrega una columna nueva grade:

      val table = catalog.loadTable(TableIdentifier.of("default", "example"));
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifica el esquema de la tabla nueva:

      table.schema.toString;
      
  4. Inserta más datos y consulta la evolución del esquema. A continuación, se muestra un ejemplo.

    1. Agrega datos nuevos a la tabla:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save("default.example");
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save("default.example");
      
    2. Verifica los datos nuevos insertados:

      val read_df2=spark.read.format("iceberg").load("default.example");
      read_df2.show;
      
    3. Sigue estos pasos para ver el historial de la tabla:

      spark.read.format("iceberg").load("default.example.history").show(truncate = false);
      
    4. Sigue estos pasos para ver las instantáneas:

      spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
      
    5. Consulta los archivos de manifiesto:

      spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
      
    6. Visualiza los archivos de datos:

      spark.read.format("iceberg").load("default.example.files").show(truncate = false);
      
    7. Supongamos que cometiste un error al agregar la fila con el valor de id=6 y quieres volver a ver una versión correcta de la tabla:

      spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
      

      Reemplaza snapshot-id por la versión a la que deseas volver.

Usa tablas de Hadoop para crear tablas de Iceberg

  1. Configura las tablas de Hadoop para crear tablas de Iceberg en Spark Scala:

    import org.apache.hadoop.conf.Configuration
    import org.apache.iceberg.hadoop.HadoopTables
    import org.apache.iceberg.Table
    import org.apache.iceberg.Schema
    import org.apache.iceberg.types.Types._
    import org.apache.iceberg.PartitionSpec
    import org.apache.iceberg.spark.SparkSchemaUtil
    import org.apache.spark.sql._
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      val conf = new Configuration();
      val tables = new HadoopTables(conf);
      
    2. Inserta datos de muestra:

      val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major");
      val df1_schema = SparkSchemaUtil.convert(df1.schema);
      
    3. Especifica la estrategia de partición basada en la columna id:

      val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
      
    4. Crea la tabla:

      val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>";
      val table = tables.create(df1_schema, partition_spec, table_location);
      
    5. Escribe los datos en la tabla:

      df1.write.format("iceberg").mode("overwrite").save(table_location);
      
    6. Lee los datos:

      val read_df1=spark.read.format("iceberg").load(table_location);
      read_df1.show;
      
  3. Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.

    1. Obtén la tabla y agrega una columna nueva grade:

      val table = tables.load(table_location);
      table.updateSchema.addColumn("grade", StringType.get()).commit();
      
    2. Verifica el esquema de la tabla nueva:

      table.schema.toString;
      
  4. Inserta más datos y consulta la evolución del esquema. A continuación, se muestra un ejemplo.

    1. Agrega datos nuevos a la tabla:

      val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade");
      df2.write.format("iceberg").mode("append").save(table_location);
      
      val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade");
      df3.write.format("iceberg").mode("append").save(table_location);
      
    2. Verifica los datos nuevos insertados:

      val read_df2=spark.read.format("iceberg").load(table_location);
      read_df2.show;
      
    3. Sigue estos pasos para ver el historial de la tabla:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
      
    4. Sigue estos pasos para ver las instantáneas:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
      
    5. Consulta los archivos de manifiesto:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
      
    6. Visualiza los archivos de datos:

      spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
      
    7. Para volver a ver una versión específica de la tabla, haz lo siguiente:

      spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
      

      Reemplaza snapshot-id por la versión a la que deseas volver y agrega "L" al final. Por ejemplo, "3943776515926014142L".

Usar la tabla Iceberg en Hive

Iceberg admite tablas leídas con Hive a través de un StorageHandler. Ten en cuenta que solo se admiten las versiones 2.x y 3.1.2 de Hive. Para obtener más información, consulta Apache Iceberg: Hive. Además, agrega el archivo JAR de Iceberg Hive Runtime a la ruta de clase de Hive. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg.

Para superponer una tabla de Hive sobre una tabla de Iceberg, debes crear la tabla de Iceberg con un catálogo de Hive o una tabla de Hadoop. Además, debes configurar Hive de manera adecuada para leer datos de la tabla de Iceberg.

Leer la tabla de Iceberg (catálogo de Hive) en Hive

  1. Abre el cliente de Hive y establece las configuraciones para leer las tablas de Iceberg en la sesión de cliente de Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set iceberg.engine.hive.enabled=true;
    set engine.hive.enabled=true;
    set iceberg.mr.catalog=hive;
    set hive.vectorized.execution.enabled=false;
    
  2. Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.

    1. Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:

      describe formatted example;
      
    2. Lee los datos de la tabla:

      select * from example;
      

Cómo leer una tabla de Iceberg (tabla de Hadoop) en Hive

  1. Abre el cliente de Hive y establece las configuraciones para leer las tablas de Iceberg en la sesión de cliente de Hive:

    add jar /path/to/iceberg-hive-runtime.jar;
    set engine.hive.enabled=true;
    set hive.vectorized.execution.enabled=false;
    
  2. Leer el esquema y los datos de la tabla A continuación, se muestra un ejemplo.

    1. Crea una tabla externa (superpón una tabla de Hive a la tabla de Iceberg):

      CREATE EXTERNAL TABLE hadoop_table
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>'
      TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
      
    2. Verifica el esquema de la tabla y si el formato de la tabla es Iceberg:

      describe formatted hadoop_table;
      
    3. Lee los datos de la tabla:

      select * from hadoop_table;
      

Usar la tabla Iceberg en Presto

Las consultas de Presto usan el conector de Hive para obtener las ubicaciones de las particiones, por lo que debes configurar Presto de manera adecuada para leer y escribir datos en la tabla de Iceberg. Para obtener más información, consulta Presto/Trino: conector de Hive y Presto/Trino: conector de Iceberg.

Configuraciones de Presto

  1. En cada nodo del clúster de Dataproc, crea un archivo llamado iceberg.properties /etc/presto/conf/catalog/iceberg.properties y configura hive.metastore.uri de la siguiente manera:

    connector.name=iceberg
    hive.metastore.uri=thrift://<example.net:9083>
    

    Reemplaza example.net:9083 por el host y el puerto correctos para tu servicio Thrift de metastore de Hive.

  2. Reinicia el servicio de Presto para enviar las configuraciones:

    sudo systemctl restart presto.service
    

Crea una tabla de Iceberg en Presto

  1. Abre el cliente de Presto y usa el conector "Iceberg" para obtener el almacén de metadatos:

    --catalog iceberg --schema default
    
  2. Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.

    1. Crea una tabla llamada example en la base de datos default:

      CREATE TABLE iceberg.default.example (
        id integer,
        name VARCHAR,
        major VARCHAR,
        grade VARCHAR)
      WITH (partitioning = ARRAY['major', 'grade']);
      
    2. Inserta datos de muestra:

      INSERT INTO iceberg.default.example
        VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
      
    3. Lee datos de la tabla:

      SELECT * FROM iceberg.default.example;
      
    4. Inserta más datos nuevos para verificar las instantáneas:

      INSERT INTO example
        VALUES (4, 'Cindy', 'UX Design', 'Junior');
      
      INSERT INTO example
        VALUES (5, 'Amy', 'UX Design', 'Sophomore');
      
    5. Sigue estos pasos para ver las instantáneas:

      SELECT snapshot_id FROM iceberg.default."example$snapshots";
      

      Si agregas el comando ORDER BY committed_at DESC LIMIT 1;, puedes encontrar el ID de instantánea más reciente.

    6. Para revertir a una versión específica de la tabla, haz lo siguiente:

      CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
      

      Reemplaza snapshot-id por la versión a la que deseas volver.

¿Qué sigue?