Composant Iceberg facultatif de Dataproc

Vous pouvez installer des composants supplémentaires tels qu'Iceberg lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page explique comment installer le composant Iceberg sur un cluster Dataproc.

Présentation

Apache Iceberg est un format de table ouvert pour les grands ensembles de données analytiques. Il apporte la fiabilité et la simplicité des tables SQL au Big Data, tout en permettant aux moteurs tels que Spark, Trino, PrestoDB, Flink et Hive de fonctionner en toute sécurité avec les mêmes tables en même temps.

Lorsqu'il est installé sur un cluster Dataproc, le composant Apache Iceberg installe les bibliothèques Iceberg et configure Spark et Hive pour qu'ils fonctionnent avec Iceberg sur le cluster.

Principales fonctionnalités d'Iceberg

Voici quelques fonctionnalités d'Iceberg :

  • Évolution du schéma : ajoutez, supprimez ou renommez des colonnes sans réécrire l'intégralité de la table.
  • Déplacements : interrogez les instantanés historiques des tables à des fins d'audit ou de restauration.
  • Partitionnement masqué : optimisez la mise en page des données pour accélérer les requêtes sans exposer les détails des partitions aux utilisateurs.
  • Transactions ACID : elles garantissent la cohérence des données et évitent les conflits.

Versions d'image Dataproc compatibles

Vous pouvez installer le composant Iceberg sur les clusters Dataproc créés avec la version d'image 2.2.47 ou ultérieure. La version d'Iceberg installée sur le cluster est indiquée sur la page Versions 2.2.

Lorsque vous créez un cluster Dataproc avec Iceberg, les propriétés Spark et Hive suivantes sont configurées pour fonctionner avec Iceberg.

Fichier de configuration Propriété Valeur par défaut
/etc/spark/conf/spark-defaults.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.driver.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
spark.executor.extraClassPath /usr/lib/iceberg/lib/iceberg-spark-runtime-spark-version_scala-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/iceberg/lib/iceberg-hive-runtime.jar
iceberg.engine.hive.enabled true

Installer le composant facultatif Iceberg

Installez le composant Iceberg lorsque vous créez un cluster Dataproc. Les pages de la liste des versions d'image de cluster Dataproc indiquent la version du composant Iceberg incluse dans les dernières versions d'image de cluster Dataproc.

ConsoleGoogle Cloud

Pour créer un cluster Dataproc qui installe le composant Iceberg, procédez comme suit dans la console Google Cloud  :

  1. Ouvrez la page Dataproc Créer un cluster. Le panneau Configurer un cluster est sélectionné.
  2. Dans la section Composants, sous Composants facultatifs, sélectionnez le composant Iceberg.
  3. Confirmez ou spécifiez d'autres paramètres de cluster, puis cliquez sur Créer.

Google Cloud CLI

Pour créer un cluster Dataproc qui installe le composant Iceberg, utilisez la commande gcloud dataproc clusters create avec l'option --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=ICEBERG \
     other flags ...

Remplacez les éléments suivants :

API REST

Pour créer un cluster Dataproc qui installe le composant facultatif Iceberg, spécifiez Iceberg SoftwareConfig.Component dans une requête clusters.create.

Utiliser des tables Iceberg avec Spark et Hive

Après avoir créé un cluster Dataproc sur lequel le composant facultatif Iceberg est installé, vous pouvez utiliser Spark et Hive pour lire et écrire des données de table Iceberg.

Spark

Configurer une session Spark pour Iceberg

Vous pouvez utiliser la commande gcloud CLI en local, ou les REPL (boucles Read-Eval-Print) spark-shell ou pyspark exécutés sur le nœud maître du cluster Dataproc pour activer les extensions Spark d'Iceberg et configurer le catalogue Spark afin d'utiliser les tables Iceberg.

gcloud

Exécutez l'exemple de commande gcloud CLI suivant dans une fenêtre de terminal local ou dans Cloud Shell pour envoyer une tâche Spark et définir les propriétés Spark afin de configurer la session Spark pour Iceberg.

gcloud dataproc jobs submit spark  \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties="spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --properties="spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --properties="spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER" \
     other flags ...

Remplacez les éléments suivants :

  • CLUSTER_NAME : nom du cluster.
  • REGION : région Compute Engine.
  • CATALOG_NAME : nom du catalogue Iceberg.
  • BUCKET et FOLDER : emplacement du catalogue Iceberg dans Cloud Storage.

spark-shell

Pour configurer une session Spark pour Iceberg à l'aide du REPL spark-shell sur le cluster Dataproc, procédez comme suit :

  1. Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc.

  2. Exécutez la commande suivante dans le terminal de la session SSH pour configurer la session Spark pour Iceberg.

spark-shell \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

Remplacez les éléments suivants :

  • CLUSTER_NAME : nom du cluster.
  • REGION : région Compute Engine.
  • CATALOG_NAME : nom du catalogue Iceberg.
  • BUCKET et FOLDER : emplacement du catalogue Iceberg dans Cloud Storage.

Interface système pyspark

Pour configurer une session Spark pour Iceberg à l'aide du REPL pyspark sur le cluster Dataproc, procédez comme suit :

  1. Utilisez SSH pour vous connecter au nœud maître du cluster Dataproc.

  2. Exécutez la commande suivante dans le terminal de la session SSH pour configurer la session Spark pour Iceberg :

pyspark \
    --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
    --conf "spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog" \
    --conf "spark.sql.catalog.CATALOG_NAME.type=hadoop" \
    --conf "spark.sql.catalog.CATALOG_NAME.warehouse=gs://BUCKET/FOLDER"

Remplacez les éléments suivants :

  • CLUSTER_NAME : nom du cluster.
  • REGION : région Compute Engine.
  • CATALOG_NAME : nom du catalogue Iceberg.
  • BUCKET et FOLDER : emplacement du catalogue Iceberg dans Cloud Storage.

Écrire des données dans une table Iceberg

Vous pouvez écrire des données dans une table Iceberg à l'aide de Spark. Les extraits de code suivants créent un DataFrame avec des exemples de données, créent une table Iceberg dans Cloud Storage, puis écrivent les données dans la table Iceberg.

PySpark

# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

# Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Scala

// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")

// Create an Iceberg table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS CATALOG_NAME.NAMESPACE.TABLE_NAME (
    id integer,
    name string)
USING iceberg
LOCATION 'gs://BUCKET/FOLDER/NAMESPACE/TABLE_NAME'""")

// Write the DataFrame to the Iceberg table in Cloud Storage.
data.writeTo("CATALOG_NAME.NAMESPACE.TABLE_NAME").append()

Lire des données à partir d'une table Iceberg

Vous pouvez lire les données d'une table Iceberg à l'aide de Spark. Les extraits de code suivants lisent le tableau, puis affichent son contenu.

PySpark

# Read Iceberg table data into a DataFrame.
df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")
# Display the data.
df.show()

Scala

// Read Iceberg table data into a DataFrame.
val df = spark.read.format("iceberg").load("CATALOG_NAME.NAMESPACE.TABLE_NAME")

// Display the data.
df.show()

Spark SQL

SELECT * FROM CATALOG_NAME.NAMESPACE.TABLE_NAME

Hive

Créer une table Iceberg dans Hive

Les clusters Dataproc préconfigurent Hive pour qu'il fonctionne avec Iceberg.

Pour exécuter les extraits de code de cette section, procédez comme suit :

  1. Utilisez SSH pour vous connecter au nœud maître de votre cluster Dataproc.

  2. Affichez beeline dans la fenêtre du terminal SSH.

    beeline -u jdbc:hive2://
    

Vous pouvez créer une table Iceberg partitionnée ou non partitionnée dans Hive.

Table non partitionnée

Créez une table Iceberg non partitionnée dans Hive.

CREATE TABLE my_table (
  id INT,
  name STRING,
  created_at TIMESTAMP
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Table partitionnée

Créez une table Iceberg partitionnée dans Hive en spécifiant les colonnes de partition dans la clause PARTITIONED BY.

CREATE TABLE my_partitioned_table (
  id INT,
  name STRING
) PARTITIONED BY (date_sk INT)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

Insérer des données dans une table Iceberg dans Hive

Vous pouvez insérer des données dans une table Iceberg à l'aide des instructions INSERT Hive standards.

SET hive.execution.engine=mr;

INSERT INTO my_table
SELECT 1, 'Alice', current_timestamp();

Limites

  • Seul le moteur d'exécution MR (MapReduce) est compatible avec les opérations LMD (langage de manipulation de données).
  • L'exécution MR est obsolète dans Hive 3.1.3.

Lire les données d'une table Iceberg dans Hive

Pour lire les données d'une table Iceberg, utilisez une instruction SELECT.

SELECT * FROM my_table;

Supprimez une table Iceberg dans Hive.

Pour supprimer une table Iceberg dans Hive, utilisez l'instruction DROP TABLE.

DROP TABLE my_table;

Étapes suivantes