Utilizza BigLake Metastore con Dataproc
Questo documento spiega come utilizzare BigLake Metastore con Dataproc su Compute Engine. Questa connessione fornisce un metastore singolo e condiviso che funziona con motori software open source, come Apache Spark o Apache Flink.
Prima di iniziare
- Abilita la fatturazione per il tuo progetto Google Cloud . Scopri come verificare se la fatturazione è abilitata per un progetto.
Abilita le API BigQuery e Dataproc.
(Facoltativo) Scopri come funziona BigLake Metastore e perché dovresti utilizzarlo.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per utilizzare Spark o Flink e Dataproc con BigLake Metastore come archivio metadati, chiedi all'amministratore di concederti i seguenti ruoli IAM:
-
Crea un cluster Dataproc:
Dataproc Worker (
roles/dataproc.worker
) sul account di servizio Compute Engine predefinito nel progetto -
Crea tabelle BigLake Metastore in Spark o Flink:
-
Dataproc Worker (
roles/dataproc.worker
) sull'account di servizio VM Dataproc nel progetto -
Editor dati BigQuery (
roles/bigquery.dataEditor
) nell'account di servizio VM Dataproc nel progetto -
Storage Object Admin (
roles/storage.objectAdmin
) sul account di servizio VM Dataproc nel progetto
-
Dataproc Worker (
-
Esegui query sulle tabelle del metastore BigLake in BigQuery:
-
Visualizzatore dati BigQuery (
roles/bigquery.dataViewer
) sul progetto -
Utente BigQuery (
roles/bigquery.user
) sul progetto -
Visualizzatore oggetti Storage (
roles/storage.objectViewer
) del progetto
-
Visualizzatore dati BigQuery (
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Flusso di lavoro generale
Per utilizzare Dataproc su Compute Engine con BigLake Metastore, segui questi passaggi generali:
- Crea un cluster Dataproc o configura un cluster esistente.
- Connettiti al motore software open source che preferisci, ad esempio Spark o Flink.
- Utilizza un file JAR per installare il plug-in del catalogo Apache Iceberg sul cluster.
- Crea e gestisci le risorse del metastore BigLake in base alle esigenze, a seconda del motore software open source che utilizzi.
- In BigQuery, accedi e utilizza le risorse del metastore BigLake.
Connetti BigLake Metastore a Spark
Le seguenti istruzioni mostrano come connettere Dataproc a BigLake Metastore utilizzando Spark SQL interattivo.
Scaricare il plug-in del catalogo Iceberg
Per connettere BigLake Metastore a Dataproc e Spark, devi utilizzare il file JAR del plug-in del catalogo Iceberg di BigLake Metastore.
Questo file è incluso per impostazione predefinita nella versione 2.2 dell'immagine Dataproc. Se i cluster Dataproc non hanno accesso diretto a internet, devi scaricare il plug-in e caricarlo in un bucket Cloud Storage a cui il cluster Dataproc può accedere.
Scarica il plug-in del catalogo Iceberg di BigLake Metastore.
Configura un cluster Dataproc
Prima di connetterti a BigLake Metastore, devi configurare un cluster Dataproc.
Per farlo, puoi creare un nuovo cluster o utilizzarne uno esistente. Successivamente, utilizzi questo cluster per eseguire Spark SQL interattivo e gestire le risorse del metastore BigLake.
La subnet nella regione in cui viene creato il cluster deve avere l'accesso privato Google (PGA) abilitato. Per impostazione predefinita, le VM del cluster Dataproc, create con una versione dell'immagine 2.2 (predefinita) o successiva, hanno solo indirizzi IP interni. Per consentire alle VM del cluster di comunicare con le API di Google, attiva l'accesso privato Google nella subnet di rete
default
(o nel nome della rete specificato dall'utente, se applicabile) nella regione in cui viene creato il cluster.Se vuoi eseguire l'esempio di interfaccia web Zeppelin in questa guida, devi utilizzare o creare un cluster Dataproc con il componente facoltativo Zeppelin abilitato.
Nuovo cluster
Per creare un nuovo cluster Dataproc, esegui il seguente comando gcloud
dataproc clusters create
. Questa configurazione contiene le
impostazioni necessarie per utilizzare BigLake Metastore.
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
Sostituisci quanto segue:
CLUSTER_NAME
: un nome per il tuo cluster Dataproc.PROJECT_ID
: l'ID del Google Cloud progetto in cui stai creando il cluster.LOCATION
: la Google Cloud regione in cui stai creando il cluster.
Cluster esistente
Per configurare un cluster esistente, aggiungi il runtime Iceberg Spark al cluster.
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Puoi aggiungere la durata utilizzando una delle seguenti opzioni:
Initialization Script. Aggiungi la dipendenza dal runtime a uno script di inizializzazione personalizzato che viene eseguito quando viene creato.
Dopo aver aggiunto la dipendenza di runtime allo script, segui le istruzioni per creare, ricreare e aggiornare un cluster.
Installazione manuale. Aggiungi manualmente il file JAR del plug-in del catalogo Iceberg e configura le proprietà Spark in modo da includere il runtime nel cluster.
Invia un job Spark
Per inviare un job Spark, utilizza uno dei seguenti metodi:
Interfaccia a riga di comando gcloud
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Sostituisci quanto segue:
PROJECT_ID
: l'ID del Google Cloud progetto che contiene il cluster Dataproc.CLUSTER_NAME
: il nome del cluster Dataproc che utilizzi per eseguire il job Spark SQL.REGION
: la regione di Compute Engine in cui si trova il cluster.LOCATION
: la posizione delle risorse BigQuery.CATALOG_NAME
: il nome del catalogo Spark che utilizzi con il tuo job SQL.WAREHOUSE_DIRECTORY
: la cartella Cloud Storage che contiene il data warehouse. Questo valore inizia congs://
.SPARK_SQL_COMMAND
: la query Spark SQL che vuoi eseguire. Questa query include i comandi per creare le risorse. Ad esempio, per creare uno spazio dei nomi e una tabella.
Interactive Spark
Connettiti a Spark e installa il plug-in del catalogo
Per installare il plug-in del catalogo per BigLake Metastore, connettiti al tuo cluster Dataproc utilizzando SSH.
- Nella console Google Cloud , vai alla pagina Istanze VM.
Per connetterti a un'istanza VM Dataproc, fai clic su SSH nell'elenco delle istanze della macchina virtuale. L'output è simile al seguente:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Nel terminale, esegui il seguente comando di inizializzazione del metastore BigLake:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Sostituisci quanto segue:
CATALOG_NAME
: il nome del catalogo Spark che utilizzi con il tuo job SQL.PROJECT_ID
: l'ID progetto Google Cloud del catalogo BigLake Metastore a cui si collega il catalogo Spark.LOCATION
: la posizione Google Cloud del metastore BigLake.WAREHOUSE_DIRECTORY
: la cartella Cloud Storage che contiene il data warehouse. Questo valore inizia congs://
.
Dopo aver eseguito la connessione a un cluster, il terminale Spark visualizza il prompt
spark-sql
.spark-sql (default)>
Gestire le risorse BigLake Metastore
Ora sei connesso al metastore BigLake. Puoi visualizzare le risorse esistenti o crearne di nuove in base ai metadati memorizzati nel metastore BigLake.
Ad esempio, prova a eseguire i seguenti comandi nella sessione Spark SQL interattiva per creare uno spazio dei nomi e una tabella Iceberg.
Utilizza il catalogo Iceberg personalizzato:
USE `CATALOG_NAME`;
Creare uno spazio dei nomi:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
Utilizza lo spazio dei nomi creato:
USE NAMESPACE_NAME;
Crea una tabella Iceberg:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
Inserisci una riga della tabella:
INSERT INTO TABLE_NAME VALUES (1, "first row");
Aggiungi una colonna della tabella:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
Visualizza i metadati della tabella:
DESCRIBE EXTENDED TABLE_NAME;
Elenca tabelle nello spazio dei nomi:
SHOW TABLES;
Notebook Zeppelin
Nella console Google Cloud , vai alla pagina Cluster Dataproc.
Fai clic sul nome del cluster che vuoi utilizzare.
Viene visualizzata la pagina Dettagli cluster.
Nel menu di navigazione, fai clic su Interfacce web.
In Component gateway, fai clic su Zeppelin. Si apre la pagina del notebook Zeppelin.
Nel menu di navigazione, fai clic su Blocco note e poi su +Crea nuova nota.
Nella finestra di dialogo, inserisci un nome per il blocco note. Lascia selezionata l'opzione Spark come interprete predefinito.
Fai clic su Crea. Viene creato un nuovo notebook.
Nel notebook, fai clic sul menu delle impostazioni e poi su Interprete.
Nel campo Cerca interpreti, cerca Spark.
Fai clic su Modifica.
Nel campo Spark.jars, inserisci l'URI del file jar di Spark.
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
Fai clic su Salva.
Fai clic su OK.
Copia il seguente codice PySpark nel tuo notebook Zeppelin.
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
Sostituisci quanto segue:
CATALOG_NAME
: il nome del catalogo Spark da utilizzare per il job SQL.PROJECT_ID
: l'ID del Google Cloud progetto che contiene il cluster Dataproc.WAREHOUSE_DIRECTORY
: la cartella Cloud Storage che contiene il data warehouse. Questo valore inizia congs://
.NAMESPACE_NAME
: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.WAREHOUSE_DIRECTORY
: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.TABLE_NAME
: un nome di tabella per la tabella Spark.
Fai clic sull'icona di esecuzione o premi
Shift-Enter
per eseguire il codice. Al termine del job, il messaggio di stato indica "Spark Job Finished" e l'output mostra i contenuti della tabella:
Connetti BigLake Metastore a Flink
Le seguenti istruzioni mostrano come connettere Dataproc al metastore BigLake utilizzando il client Flink SQL.
Installa il plug-in del catalogo e connettiti a una sessione Flink
Per connettere BigLake Metastore a Flink:
- Crea un cluster Dataproc con il componente Flink facoltativo abilitato e assicurati di utilizzare Dataproc 2.2 o versioni successive.
Nella console Google Cloud , vai alla pagina Istanze VM.
Nell'elenco delle istanze di macchine virtuali, fai clic su SSH per connetterti a un'istanza VM Dataproc.
Configura il plug-in del catalogo personalizzato Iceberg per BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Avvia la sessione Flink su YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Crea un catalogo in Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Sostituisci quanto segue:
CATALOG_NAME
: l'identificatore del catalogo Flink, collegato a un catalogo BigLake Metastore.WAREHOUSE_DIRECTORY
: il percorso di base per la directory del warehouse (la cartella Cloud Storage in cui Flink crea i file). Questo valore inizia congs://
.PROJECT_ID
: l'ID progetto del catalogo BigLake Metastore a cui è collegato il catalogo Flink.LOCATION
: la posizione delle risorse BigQuery.
La sessione Flink è ora connessa a BigLake Metastore e puoi eseguire comandi Flink SQL.
Gestire le risorse BigLake Metastore
Ora che hai stabilito la connessione al metastore BigLake, puoi creare e visualizzare risorse in base ai metadati archiviati nel metastore BigLake.
Ad esempio, prova a eseguire i seguenti comandi nella sessione SQL di Flink interattiva per creare un database e una tabella Iceberg.
Utilizza il catalogo Iceberg personalizzato:
USE CATALOG CATALOG_NAME;
Sostituisci
CATALOG_NAME
con l'identificatore del catalogo Flink.Crea un database, che crea un set di dati in BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Sostituisci
DATABASE_NAME
con il nome del nuovo database.Utilizza il database che hai creato:
USE DATABASE_NAME;
Crea una tabella Iceberg. Il seguente codice crea una tabella delle vendite di esempio:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Sostituisci
ICEBERG_TABLE_NAME
con un nome per la nuova tabella.Visualizza i metadati della tabella:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Elenca le tabelle nel database:
SHOW TABLES;
Importa i dati nella tabella
Dopo aver creato una tabella Iceberg nella sezione precedente, puoi utilizzare Flink DataGen come origine dati per importare dati in tempo reale nella tabella. I seguenti passaggi sono un esempio di questo flusso di lavoro:
Crea una tabella temporanea utilizzando DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Sostituisci quanto segue:
DATABASE_NAME
: il nome del database in cui archiviare la tabella temporanea.TEMP_TABLE_NAME
: un nome per la tabella temporanea.ICEBERG_TABLE_NAME
: il nome della tabella Iceberg che hai creato nella sezione precedente.
Imposta il parallelismo su 1:
SET 'parallelism.default' = '1';
Imposta l'intervallo del checkpoint:
SET 'execution.checkpointing.interval' = '10second';
Imposta il checkpoint:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Avvia il job di streaming in tempo reale:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
L'output è simile al seguente:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Per controllare lo stato del job di streaming:
Nella console Google Cloud , vai alla pagina Cluster.
Seleziona il cluster.
Fai clic sulla scheda Interfacce web.
Fai clic sul link YARN ResourceManager.
Nell'interfaccia YARN ResourceManager, trova la sessione Flink e fai clic sul link ApplicationMaster in UI di monitoraggio.
Nella colonna Stato, verifica che lo stato del job sia In esecuzione.
Esegui query sui dati di streaming nel client Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Esegui query sui dati di streaming in BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Termina il job di streaming nel client Flink SQL:
STOP JOB 'JOB_ID';
Sostituisci
JOB_ID
con l'ID job visualizzato nell'output quando hai creato il job di streaming.
Passaggi successivi
- Configura le funzionalità facoltative di BigLake Metastore.
- Visualizza ed esegui query sulle tabelle da Spark in BigQuery.