Utiliser BigLake Metastore avec Dataproc
Ce document explique comment utiliser le métastore BigLake avec Dataproc sur Compute Engine. Cette connexion vous fournit un métastore unique et partagé qui fonctionne avec les moteurs logiciels Open Source, tels qu'Apache Spark ou Apache Flink.
Avant de commencer
- Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
Activez les API BigQuery et Dataproc.
(Facultatif) Découvrez le fonctionnement de BigLake Metastore et pourquoi vous devriez l'utiliser.
Rôles requis
Pour obtenir les autorisations nécessaires à l'utilisation de Spark ou Flink et de Dataproc avec le metastore BigLake comme magasin de métadonnées, demandez à votre administrateur de vous accorder les rôles IAM suivants :
-
Créez un cluster Dataproc :
Nœud de calcul Dataproc (
roles/dataproc.worker
) sur le compte de service Compute Engine par défaut du projet -
Créez des tables BigLake Metastore dans Spark ou Flink :
-
Nœud de calcul Dataproc (
roles/dataproc.worker
) sur le compte de service de VM Dataproc dans le projet -
Éditeur de données BigQuery (
roles/bigquery.dataEditor
) sur le compte de service de VM Dataproc dans le projet -
Administrateur des objets de l'espace de stockage (
roles/storage.objectAdmin
) sur le compte de service de la VM Dataproc dans le projet
-
Nœud de calcul Dataproc (
-
Interrogez les tables du metastore BigLake dans BigQuery :
-
Lecteur de données BigQuery (
roles/bigquery.dataViewer
) sur le projet -
Utilisateur BigQuery (
roles/bigquery.user
) sur le projet -
Lecteur des objets Storage (
roles/storage.objectViewer
) sur le projet
-
Lecteur de données BigQuery (
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.
Workflow général
Pour utiliser Dataproc sur Compute Engine avec un metastore BigLake, procédez comme suit :
- Créez un cluster Dataproc ou configurez-en un existant.
- Connectez-vous à votre moteur logiciel Open Source préféré, tel que Spark ou Flink.
- Utilisez un fichier JAR pour installer le plug-in de catalogue Apache Iceberg sur le cluster.
- Créez et gérez vos ressources BigLake Metastore selon vos besoins, en fonction du moteur logiciel Open Source que vous utilisez.
- Dans BigQuery, accédez à vos ressources BigLake Metastore et utilisez-les.
Connecter BigLake Metastore à Spark
Les instructions suivantes vous expliquent comment connecter Dataproc au metastore BigLake à l'aide de SparkSQL interactif.
Télécharger le plug-in de catalogue Iceberg
Pour connecter BigLake Metastore à Dataproc et Spark, vous devez utiliser le fichier jar du plug-in de catalogue Iceberg BigLake Metastore.
Ce fichier est inclus par défaut dans la version 2.2 de l'image Dataproc. Si vos clusters Dataproc n'ont pas d'accès direct à Internet, vous devez télécharger le plug-in et l'importer dans un bucket Cloud Storage auquel votre cluster Dataproc peut accéder.
Téléchargez le plug-in de catalogue Iceberg BigLake Metastore.
Configurer un cluster Dataproc
Avant de vous connecter à BigLake Metastore, vous devez configurer un cluster Dataproc.
Pour ce faire, vous pouvez créer un cluster ou en utiliser un existant. Vous utiliserez ensuite ce cluster pour exécuter Spark SQL de manière interactive et gérer vos ressources BigLake Metastore.
L'accès privé à Google (APG) doit être activé sur le sous-réseau de la région dans laquelle le cluster est créé. Par défaut, les VM de cluster Dataproc créées avec une version d'image 2.2 (par défaut) ou ultérieure n'ont que des adresses IP internes. Pour permettre aux VM du cluster de communiquer avec les API Google, activez l'accès privé à Google sur le sous-réseau
default
(ou le nom de réseau spécifié par l'utilisateur, le cas échéant) dans la région où le cluster est créé.Si vous souhaitez exécuter l'exemple d'interface Web Zeppelin de ce guide, vous devez utiliser ou créer un cluster Dataproc avec le composant Zeppelin facultatif activé.
Nouveau cluster
Pour créer un cluster Dataproc, exécutez la commande gcloud
dataproc clusters create
suivante. Cette configuration contient les paramètres dont vous avez besoin pour utiliser BigLake Metastore.
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
Remplacez les éléments suivants :
CLUSTER_NAME
: nom de votre cluster Dataproc.PROJECT_ID
: ID du Google Cloud projet dans lequel vous créez le cluster.LOCATION
: région Google Cloud dans laquelle vous créez le cluster.
Cluster existant
Pour configurer un cluster existant, ajoutez le runtime Iceberg Spark suivant à votre cluster.
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Vous pouvez ajouter le runtime en utilisant l'une des options suivantes :
Script d'initialisation : Ajoutez la dépendance d'exécution à un script d'initialisation personnalisé qui s'exécute lorsque le est créé.
Après avoir ajouté la dépendance d'exécution au script, suivez les instructions pour créer, recréer et mettre à jour un cluster.
Installation manuelle. Ajoutez manuellement le fichier JAR du plug-in de catalogue Iceberg et configurez les propriétés Spark pour inclure le runtime sur votre cluster.
Envoyer une tâche Spark
Pour envoyer une tâche Spark, utilisez l'une des méthodes suivantes :
CLI gcloud
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Remplacez les éléments suivants :
PROJECT_ID
: ID du Google Cloud projet contenant le cluster Dataproc.CLUSTER_NAME
: nom du cluster Dataproc que vous utilisez pour exécuter le job Spark SQL.REGION
: région Compute Engine dans laquelle se trouve votre cluster.LOCATION
: emplacement des ressources BigQuery.CATALOG_NAME
: nom du catalogue Spark que vous utilisez avec votre job SQL.WAREHOUSE_DIRECTORY
: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://
.SPARK_SQL_COMMAND
: requête SparkSQL que vous souhaitez exécuter. Cette requête inclut les commandes permettant de créer vos ressources. Par exemple, pour créer un espace de noms et une table.
Étincelle interactive
Se connecter à Spark et installer le plug-in de catalogue
Pour installer le plug-in de catalogue pour BigLake Metastore, connectez-vous à votre cluster Dataproc à l'aide de SSH.
- Dans la console Google Cloud , accédez à la page Instances de VM.
Pour vous connecter à une instance de VM Dataproc, cliquez sur SSH dans la liste des instances de machines virtuelles. Le résultat ressemble à ce qui suit :
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Dans le terminal, exécutez la commande d'initialisation du metastore BigLake suivante :
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue Spark que vous utilisez avec votre job SQL.PROJECT_ID
: ID du projet Google Cloud du catalogue BigLake Metastore auquel votre catalogue Spark est associé.LOCATION
: Google Cloud emplacement du metastore BigLake.WAREHOUSE_DIRECTORY
: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://
.
Une fois que vous vous êtes connecté à un cluster, l'invite
spark-sql
s'affiche dans votre terminal Spark.spark-sql (default)>
Gérer les ressources BigLake Metastore
Vous êtes maintenant connecté à BigLake Metastore. Vous pouvez afficher vos ressources existantes ou en créer en fonction de vos métadonnées stockées dans BigLake Metastore.
Par exemple, essayez d'exécuter les commandes suivantes dans la session SparkSQL interactive pour créer un espace de noms et une table Iceberg.
Utilisez le catalogue Iceberg personnalisé :
USE `CATALOG_NAME`;
Créez un espace de noms :
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
Utilisez l'espace de noms créé :
USE NAMESPACE_NAME;
Créez une table Iceberg :
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
Insérer une ligne dans un tableau :
INSERT INTO TABLE_NAME VALUES (1, "first row");
Ajouter une colonne de tableau :
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
Afficher les métadonnées de table :
DESCRIBE EXTENDED TABLE_NAME;
Répertoriez les tables dans l'espace de noms :
SHOW TABLES;
Notebook Zeppelin
Dans la console Google Cloud , accédez à la page Clusters Dataproc.
Cliquez sur le nom du cluster que vous souhaitez utiliser.
La page Détails du cluster s'ouvre.
Dans le menu de navigation, cliquez sur Interfaces Web.
Sous Passerelle des composants, cliquez sur Zeppelin. La page du notebook Zeppelin s'ouvre.
Dans le menu de navigation, cliquez sur Notebook, puis sur + Créer une note.
Dans la boîte de dialogue, saisissez le nom du notebook. Laissez Spark sélectionné comme interpréteur par défaut.
Cliquez sur Créer. Un notebook est créé.
Dans le notebook, cliquez sur le menu des paramètres, puis sur Interprète.
Dans le champ Rechercher des interprètes, recherchez Spark.
Cliquez sur Modifier.
Dans le champ Spark.jars, saisissez l'URI du fichier jar Spark.
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
Cliquez sur Enregistrer.
Cliquez sur OK.
Copiez le code PySpark suivant dans votre notebook Zeppelin.
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
Remplacez les éléments suivants :
CATALOG_NAME
: nom du catalogue Spark à utiliser pour le job SQL.PROJECT_ID
: ID du Google Cloud projet contenant le cluster Dataproc.WAREHOUSE_DIRECTORY
: dossier Cloud Storage contenant votre entrepôt de données. Cette valeur commence pargs://
.NAMESPACE_NAME
: nom de l'espace de noms qui fait référence à votre table Spark.WAREHOUSE_DIRECTORY
: URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.TABLE_NAME
: nom de table pour votre table Spark.
Cliquez sur l'icône d'exécution ou appuyez sur
Shift-Enter
pour exécuter le code. Une fois le job terminé, le message d'état "Spark Job Finished" (Job Spark terminé) s'affiche, et le résultat indique le contenu de la table :
Connecter BigLake Metastore à Flink
Les instructions suivantes vous expliquent comment connecter Dataproc au metastore BigLake à l'aide du client Flink SQL.
Installer le plug-in de catalogue et se connecter à une session Flink
Pour connecter BigLake Metastore à Flink, procédez comme suit :
- Créez un cluster Dataproc avec le composant Flink facultatif activé et assurez-vous d'utiliser Dataproc 2.2 ou version ultérieure.
Dans la console Google Cloud , accédez à la page Instances de VM.
Dans la liste des instances de machine virtuelle, cliquez sur SSH pour vous connecter à une instance de VM Dataproc.
Configurez le plug-in de catalogue personnalisé Iceberg pour BigLake Metastore :
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Démarrez la session Flink sur YARN :
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Créez un catalogue dans Flink :
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Remplacez les éléments suivants :
CATALOG_NAME
: identifiant du catalogue Flink, associé à un catalogue BigLake Metastore.WAREHOUSE_DIRECTORY
: chemin de base du répertoire de l'entrepôt (dossier Cloud Storage dans lequel Flink crée des fichiers). Cette valeur commence pargs://
.PROJECT_ID
: ID du projet du catalogue BigLake Metastore auquel le catalogue Flink est associé.LOCATION
: emplacement des ressources BigQuery.
Votre session Flink est désormais connectée au metastore BigLake. Vous pouvez exécuter des commandes Flink SQL.
Gérer les ressources BigLake Metastore
Maintenant que vous êtes connecté à BigLake Metastore, vous pouvez créer et afficher des ressources en fonction des métadonnées stockées dans BigLake Metastore.
Par exemple, essayez d'exécuter les commandes suivantes dans votre session Flink SQL interactive pour créer une base de données et une table Iceberg.
Utilisez le catalogue Iceberg personnalisé :
USE CATALOG CATALOG_NAME;
Remplacez
CATALOG_NAME
par l'identifiant de votre catalogue Flink.Créez une base de données, ce qui crée un ensemble de données dans BigQuery :
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Remplacez
DATABASE_NAME
par le nom de votre nouvelle base de données.Utilisez la base de données que vous avez créée :
USE DATABASE_NAME;
Créez une table Iceberg. L'exemple suivant crée une table de ventes :
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Remplacez
ICEBERG_TABLE_NAME
par le nom de votre nouvelle table.Afficher les métadonnées de table :
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Répertoriez les tables de la base de données :
SHOW TABLES;
Ingérer des données dans votre table
Après avoir créé une table Iceberg dans la section précédente, vous pouvez utiliser Flink DataGen comme source de données pour ingérer des données en temps réel dans votre table. Voici un exemple de ce workflow :
Créez une table temporaire à l'aide de DataGen :
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Remplacez les éléments suivants :
DATABASE_NAME
: nom de la base de données dans laquelle stocker votre table temporaire.TEMP_TABLE_NAME
: nom de votre table temporaire.ICEBERG_TABLE_NAME
: nom de la table Iceberg que vous avez créée dans la section précédente.
Définissez le parallélisme sur 1 :
SET 'parallelism.default' = '1';
Définissez l'intervalle de point de contrôle :
SET 'execution.checkpointing.interval' = '10second';
Définissez le point de contrôle :
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Démarrez le job de streaming en temps réel :
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Le résultat ressemble à ce qui suit :
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Pour vérifier l'état du job de streaming, procédez comme suit :
Dans la console Google Cloud , accédez à la page Clusters.
Sélectionnez votre cluster.
Cliquez sur l'onglet Interfaces Web.
Cliquez sur le lien Gestionnaire de ressources YARN.
Dans l'interface YARN ResourceManager, recherchez votre session Flink et cliquez sur le lien ApplicationMaster sous Tracking UI.
Dans la colonne État, vérifiez que l'état de votre job est En cours d'exécution.
Interrogez les données de streaming dans le client Flink SQL :
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Interroger les flux de données dans BigQuery :
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Arrêtez le job de streaming dans le client Flink SQL :
STOP JOB 'JOB_ID';
Remplacez
JOB_ID
par l'ID de tâche qui s'est affiché dans le résultat lorsque vous avez créé la tâche de streaming.
Étapes suivantes
- Configurez les fonctionnalités BigLake Metastore facultatives.
- Affichez et interrogez des tables depuis Spark dans BigQuery.