Altre funzionalità di BigLake Metastore
Per personalizzare la configurazione del metastore BigLake, puoi utilizzare le seguenti funzionalità aggiuntive:
- Procedure Apache Spark Iceberg
- L'opzione di filtro per le tabelle non supportate
- Sostituzioni della connessione BigQuery
- Criteri di controllo dell'accesso per le tabelle Iceberg del metastore BigLake
Utilizzare le procedure Spark di Iceberg
Per utilizzare le procedure Spark di Iceberg, devi includere le estensioni SQL di Iceberg nella configurazione di Spark. Ad esempio, puoi creare una procedura per eseguire il rollback a uno stato precedente.
Utilizza Spark-SQL interattivo per eseguire il rollback a uno stato precedente
Puoi utilizzare una procedura Iceberg Spark per creare, modificare e rollback una tabella allo stato precedente. Ad esempio:
Crea una tabella Spark:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Sostituisci quanto segue:
CATALOG_NAME
: il nome del catalogo che fa riferimento alla tabella Spark.PROJECT_ID
: l'ID del Google Cloud progetto.WAREHOUSE_DIRECTORY
: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
Sostituisci quanto segue:
NAMESPACE_NAME
: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.TABLE_NAME
: un nome di tabella che fa riferimento alla tabella Spark.
L'output contiene i dettagli sulla configurazione della tabella:
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
Modifica di nuovo la tabella, quindi esegui il rollback allo snapshot
1659239298328512231
creato in precedenza:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
Sostituisci quanto segue:
SNAPSHOT_ID
: l'ID dello snapshot a cui eseguire il rollback.
L'output è simile al seguente:
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
Filtrare le tabelle non supportate dalle funzioni di elenco delle tabelle
Quando utilizzi Spark SQL con il catalogo del metastore BigLake, il comando SHOW TABLES
mostra tutte le tabelle nello spazio dei nomi specificato, anche quelle non compatibili con Spark.
Per visualizzare solo le tabelle supportate, attiva l'opzione filter_unsupported_tables
:
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
Sostituisci quanto segue:
CATALOG_NAME
: il nome del catalogo Spark da utilizzare.PROJECT_ID
: l'ID del Google Cloud progetto da utilizzare.LOCATION
: la posizione delle risorse BigQuery.WAREHOUSE_DIRECTORY
: la cartella Cloud Storage da utilizzare come data warehouse.
Impostare un'override della connessione BigQuery
Puoi utilizzare le connessioni BigQuery per accedere ai dati archiviati al di fuori di BigQuery, ad esempio in Cloud Storage.
Per impostare un'override della connessione BigQuery che fornisce l'accesso a un bucket Cloud Storage, completa i seguenti passaggi:
Nel progetto BigQuery, crea una nuova connessione alla risorsa Cloud Storage. Questa connessione definisce in che modo BigQuery accede ai tuoi dati.
Concedi all'account utente o di servizio che accede ai dati il ruolo
roles/bigquery.connectionUser
per la connessione.Assicurati che la risorsa di connessione condivida la stessa posizione delle risorse di destinazione in BigQuery. Per saperne di più, consulta Gestire le connessioni.
Specifica la connessione nella tabella Iceberg con la proprietà
bq_connection
:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
Sostituisci quanto segue:
TABLE_NAME
: un nome per la tabella Spark.WAREHOUSE_DIRECTORY
: l'URI del bucket Cloud Storage in cui sono archiviati i dati.PROJECT_ID
: l'ID del Google Cloud progetto da utilizzare.LOCATION
: la posizione della connessione.CONNECTION_ID
: l'ID della connessione.
Imposta i criteri di controllo dell'accesso
Puoi attivare il controllo dell'accesso granulare (FGAC) sulle tabelle Iceberg del metastore BigLake configurando i criteri di controllo dell'accesso'accesso. Puoi impostare i criteri di controllo dell'accesso solo sulle tabelle che utilizzano un'sostituzione della connessione BigQuery. Puoi impostare questi criteri nei seguenti modi:
Dopo aver configurato i criteri FGAC, puoi eseguire query sulla tabella da Spark utilizzando il seguente esempio:
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Sostituisci quanto segue:
CATALOG_NAME
: il nome del tuo catalogo.PROJECT_ID
: l'ID del progetto che contiene le risorse BigQuery.LOCATION
: la posizione delle risorse BigQuery.WAREHOUSE_DIRECTORY
: l'URI della cartella Cloud Storage contenente il data warehouse.MATERIALIZATION_NAMESPACE
: lo spazio dei nomi in cui vuoi memorizzare i risultati temporanei.DATASET_NAME
: il nome del set di dati che contiene la tabella su cui stai eseguendo una query.ICEBERG_TABLE_NAME
: il nome della tabella su cui stai eseguendo una query.
Passaggi successivi
- Eseguire la migrazione dei dati di Dataproc Metastore al metastore BigLake
- Utilizzare il metastore BigLake con Dataproc
- Utilizzare il metastore BigLake con Dataproc Serverless