Componente opcional de Iceberg de Dataproc

Puedes instalar componentes adicionales, como Iceberg, cuando creas un clúster de Dataproc con la función de componentes opcionales. En esta página, se describe cómo instalar opcionalmente el componente Iceberg en un clúster de Dataproc.

Descripción general

Apache Iceberg es un formato de tabla abierta para grandes conjuntos de datos analíticos. Ofrece la confiabilidad y la simplicidad de las tablas SQL a Big Data, a la vez que permite que motores como Spark, Trino, PrestoDB, Flink y Hive trabajen de forma segura con las mismas tablas al mismo tiempo.

Cuando se instala en un clúster de Dataproc, el componente Apache Iceberg instala bibliotecas de Iceberg y configura Spark y Hive para que funcionen con Iceberg en el clúster.

Funciones clave del iceberg

Entre las funciones del iceberg, se incluyen las siguientes:

  • Evolución del esquema: Agrega, quita o cambia el nombre de las columnas sin volver a escribir toda la tabla.
  • Viaje en el tiempo: Consulta instantáneas de tablas históricas con fines de auditoría o reversión.
  • Partición oculta: Optimiza el diseño de los datos para realizar consultas más rápidas sin exponer los detalles de la partición a los usuarios.
  • Transacciones ACID: Garantizan la coherencia de los datos y evitan conflictos.

Versiones con imágenes de Dataproc compatibles

Puedes instalar el componente Iceberg en clústeres de Dataproc creados con la versión 2.2.47 y versiones posteriores de imagen. La versión de Iceberg instalada en el clúster aparece en la página Versiones de actualización 2.2.

Cuando creas un clúster de Dataproc con Iceberg, las siguientes propiedades de Spark y Hive se configuran para funcionar con Iceberg.

Archivo de configuración Propiedad Valor predeterminado
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

Instala el componente opcional Iceberg

Instala el componente Iceberg cuando crees un clúster de Dataproc. En las páginas de la lista de versiones de imágenes de clúster de Dataproc, se muestra la versión del componente Iceberg incluida en las versiones más recientes de las imágenes de clúster de Dataproc.

Consola de Google Cloud

Para crear un clúster de Dataproc que instale el componente Iceberg, completa los siguientes pasos en la consola de Google Cloud:

  1. Abre la página Create a cluster de Dataproc. Se selecciona el panel Configura el clúster.
  2. En la sección Componentes, en Componentes opcionales, selecciona el componente Iceberg.
  3. Confirma o especifica otros parámetros de configuración del clúster y, luego, haz clic en Crear.

Google Cloud CLI

Para crear un clúster de Dataproc que instale el componente Iceberg, usa el comando gcloud dataproc clusters create con la marca --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

Reemplaza lo siguiente:

API de REST

Para crear un clúster de Dataproc que instale el componente opcional Iceberg, especifica el SoftwareConfig.Component de Iceberg como parte de una solicitud clusters.create.

Usa tablas Iceberg con Spark y Hive

Después de crear un clúster de Dataproc que tenga el componente opcional de Iceberg instalado en el clúster, puedes usar Spark y Hive para leer y escribir datos de tablas de Iceberg.

Spark

Configura una sesión de Spark para Iceberg

Puedes usar el comando gcloud CLI de forma local o las REPL (bucle de lectura, evaluación e impresión) spark-shell o pyspark que se ejecutan en el nodo principal del clúster de dataproc para habilitar las extensiones de Spark de Iceberg y configurar el catálogo de Spark para usar tablas de Iceberg.

gcloud

Ejecuta el siguiente ejemplo de la CLI de gcloud en una ventana de terminal local o en Cloud Shell para enviar un trabajo de Spark y establecer propiedades de Spark para configurar la sesión de Spark para Iceberg.

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

Reemplaza lo siguiente:

  • CLUSTER_NAME: Es el nombre del clúster.
  • REGION: La región de Compute Engine
  • CATALOG_NAME: Es el nombre del catálogo de Iceberg.
  • BUCKET y FOLDER: La ubicación del catálogo de Iceberg en Cloud Storage.

spark-shell

Para configurar una sesión de Spark para Iceberg con la REPL de spark-shell en el clúster de Dataproc, completa los siguientes pasos:

  1. Usa SSH para conectarte al nodo principal del clúster de Dataproc.

  2. Ejecuta el siguiente comando en la terminal de la sesión de SSH para configurar la sesión de Spark para Iceberg.

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

Reemplaza lo siguiente:

  • CLUSTER_NAME: Es el nombre del clúster.
  • REGION: La región de Compute Engine
  • CATALOG_NAME: Es el nombre del catálogo de Iceberg.
  • BUCKET y FOLDER: La ubicación del catálogo de Iceberg en Cloud Storage.

Shell de pyspark

Para configurar una sesión de Spark para Iceberg con la REPL de pyspark en el clúster de Dataproc, completa los siguientes pasos:

  1. Usa SSH para conectarte al nodo principal del clúster de Dataproc.

  2. Ejecuta el siguiente comando en la terminal de la sesión de SSH para configurar la sesión de Spark para Iceberg:

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

Reemplaza lo siguiente:

  • CLUSTER_NAME: Es el nombre del clúster.
  • REGION: La región de Compute Engine
  • CATALOG_NAME: Es el nombre del catálogo de Iceberg.
  • BUCKET y FOLDER: La ubicación del catálogo de Iceberg en Cloud Storage.

Cómo escribir datos en una tabla de Iceberg

Puedes escribir datos en una tabla de Iceberg con Spark. Los siguientes fragmentos de código crean un DataFrame con datos de muestra, crean una tabla de Iceberg en Cloud Storage y, luego, escriben los datos en la tabla de Iceberg.

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Cómo leer datos de una tabla de Iceberg

Puedes leer datos de una tabla de Iceberg con Spark. En los siguientes fragmentos de código, se lee la tabla y, luego, se muestra su contenido.

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

Crea una tabla de Iceberg en Hive

Los clústeres de Dataproc preconfiguran Hive para que funcione con Iceberg.

Para ejecutar los fragmentos de código de esta sección, completa los siguientes pasos:

  1. Usa SSH para conectarte al nodo principal de tu clúster de Dataproc.

  2. Muestra beeline en la ventana de la terminal SSH.

    beeline -u jdbc:hive2://
    

Puedes crear una tabla de Iceberg particionada o no particionada en Hive.

Tabla no particionada

Crea una tabla Iceberg sin particiones en Hive.

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Tabla particionada

Para crear una tabla Iceberg particionada en Hive, especifica las columnas de partición en la cláusula PARTITIONED BY.

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Cómo insertar datos en una tabla Iceberg en Hive

Puedes insertar datos en una tabla Iceberg con las declaraciones INSERT estándar de Hive.

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

Limitaciones

  • El motor de ejecución de MR (MapReduce) solo es compatible con las operaciones de DML (lenguaje de manipulación de datos).
  • La ejecución de MR dejó de estar disponible en Hive 3.1.3.

Lee datos de una tabla Iceberg en Hive

Para leer datos de una tabla de Iceberg, usa una sentencia SELECT.

SELECT * FROM my_table;

Coloca una tabla Iceberg en Hive.

Para descartar una tabla Iceberg en Hive, usa la sentencia DROP TABLE.

DROP TABLE my_table;

Más información