En esta página se explica cómo usar tablas de Apache Iceberg con un servicio Dataproc Metastore asociado a un clúster de Dataproc. Apache Iceberg es un formato de tabla abierta para conjuntos de datos analíticos de gran tamaño.
Compatibilidades
Las tablas Iceberg admiten las siguientes funciones.
Controladores | Seleccionar | Insertar | Crear tabla |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Antes de empezar
Usar una tabla de Iceberg con Spark
En el siguiente ejemplo se muestra cómo usar tablas de Iceberg con Spark.
Las tablas Iceberg admiten operaciones de lectura y escritura. Para obtener más información, consulta Apache Iceberg - Spark.
Configuraciones de Spark
Primero, inicia el shell de Spark y usa un segmento de Cloud Storage para almacenar datos. Para incluir Iceberg en la instalación de Spark, añade el archivo JAR de Iceberg Spark Runtime a la carpeta JARs de Spark. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg. El siguiente comando inicia el shell de Spark con compatibilidad con Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Usar el catálogo de Hive para crear tablas de Iceberg
Configura Hive Catalog para crear tablas Iceberg en Spark Scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Insertar datos de muestra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifica la estrategia de partición basada en la columna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabla:
val table=catalog.createTable(name,df1_schema,partition_spec);
Añada el controlador de almacenamiento y SerDe de Iceberg como propiedad de la tabla:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Escribe los datos en la tabla:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Lee los datos:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.
Obtén la tabla y añade una nueva columna
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Comprueba el nuevo esquema de la tabla:
table.schema.toString;
Inserta más datos y consulta la evolución del esquema. A continuación, se muestra un ejemplo.
Añadir datos a la tabla:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Comprueba los nuevos datos insertados:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Para ver el historial de la tabla, sigue estos pasos:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Para ver las instantáneas, sigue estos pasos:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Ver los archivos de manifiesto:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Ver los archivos de datos:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Supongamos que has cometido un error al añadir la fila con el valor
id=6
y quieres volver a ver la versión correcta de la tabla:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Sustituye
snapshot-id
por la versión a la que quieras volver.
Usar tablas de Hadoop para crear tablas de Iceberg
Configura las tablas de Hadoop para crear tablas de Iceberg en Spark Scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Insertar datos de muestra:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Especifica la estrategia de partición basada en la columna
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Crea la tabla:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Escribe los datos en la tabla:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Lee los datos:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Cambia el esquema de la tabla. A continuación, se muestra un ejemplo.
Obtén la tabla y añade una nueva columna
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Comprueba el nuevo esquema de la tabla:
table.schema.toString;
Inserta más datos y consulta la evolución del esquema. A continuación, se muestra un ejemplo.
Añadir datos a la tabla:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Comprueba los nuevos datos insertados:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Para ver el historial de la tabla, sigue estos pasos:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Para ver las instantáneas, sigue estos pasos:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Ver los archivos de manifiesto:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Ver los archivos de datos:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Para volver a una versión específica de la tabla, sigue estos pasos:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Sustituye
snapshot-id
por la versión a la que quieras volver y añade"L"
al final. Por ejemplo,"3943776515926014142L"
.
Usar tablas de Iceberg en Hive
Iceberg admite la lectura de tablas mediante Hive con un StorageHandler
. Ten en cuenta que solo se admiten las versiones 2.x y 3.1.2 de Hive. Para obtener más información, consulta Apache Iceberg - Hive. Además, añade el archivo JAR de Iceberg Hive Runtime a la ruta de clases de Hive. Para descargar el archivo JAR, consulta Descargas de Apache Iceberg.
Para superponer una tabla de Hive sobre una tabla de Iceberg, debes crear la tabla de Iceberg con un catálogo de Hive o una tabla de Hadoop. Además, debes configurar Hive para leer los datos de la tabla de Iceberg.
Leer tabla de Iceberg (catálogo de Hive) en Hive
Abre el cliente de Hive y configura la lectura de tablas de Iceberg en la sesión del cliente de Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Leer el esquema y los datos de la tabla. A continuación, se muestra un ejemplo.
Comprueba el esquema de la tabla y si el formato de la tabla es Iceberg:
describe formatted example;
Lee los datos de la tabla:
select * from example;
Leer tabla de Iceberg (tabla de Hadoop) en Hive
Abre el cliente de Hive y configura la lectura de tablas de Iceberg en la sesión del cliente de Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Leer el esquema y los datos de la tabla. A continuación, se muestra un ejemplo.
Crea una tabla externa (superpone una tabla de Hive sobre la tabla de Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Comprueba el esquema de la tabla y si el formato de la tabla es Iceberg:
describe formatted hadoop_table;
Lee los datos de la tabla:
select * from hadoop_table;
Usar tablas Iceberg en Presto
Las consultas de Presto usan el conector de Hive para obtener las ubicaciones de las particiones, por lo que debes configurar Presto para leer y escribir datos en la tabla de Iceberg. Para obtener más información, consulta Presto/Trino - Hive Connector y Presto/Trino - Iceberg Connector.
Configuraciones de Presto
En cada nodo del clúster de Dataproc, crea un archivo llamado
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
y configurahive.metastore.uri
de la siguiente manera:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Sustituye
example.net:9083
por el host y el puerto correctos de tu servicio Thrift de metastore de Hive.Reinicia el servicio Presto para enviar las configuraciones:
sudo systemctl restart presto.service
Crear una tabla de Iceberg en Presto
Abre el cliente de Presto y usa el conector "Iceberg" para obtener el metastore:
--catalog iceberg --schema default
Crea una tabla para insertar y actualizar datos. A continuación, se muestra un ejemplo.
Crea una tabla llamada
example
en la base de datosdefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Insertar datos de muestra:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Leer datos de la tabla:
SELECT * FROM iceberg.default.example;
Inserta más datos nuevos para comprobar las capturas:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Para ver las instantáneas, sigue estos pasos:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Si añades el comando
ORDER BY committed_at DESC LIMIT 1;
, puedes encontrar el ID de la última instantánea.Para volver a una versión específica de la tabla, sigue estos pasos:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Sustituye
snapshot-id
por la versión a la que quieras volver.