Altre funzionalità di BigLake Metastore

Per personalizzare la configurazione del metastore BigLake, puoi utilizzare le seguenti funzionalità aggiuntive:

  • Procedure Apache Spark Iceberg
  • L'opzione di filtro per le tabelle non supportate
  • Sostituzioni della connessione BigQuery
  • Criteri di controllo dell'accesso per le tabelle Iceberg del metastore BigLake

Utilizzare le procedure Spark di Iceberg

Per utilizzare le procedure Spark di Iceberg, devi includere le estensioni SQL di Iceberg nella configurazione di Spark. Ad esempio, puoi creare una procedura per eseguire il rollback a uno stato precedente.

Utilizza Spark-SQL interattivo per eseguire il rollback a uno stato precedente

Puoi utilizzare una procedura Iceberg Spark per creare, modificare e rollback una tabella allo stato precedente. Ad esempio:

  1. Crea una tabella Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Sostituisci quanto segue:

    • CATALOG_NAME: il nome del catalogo che fa riferimento alla tabella Spark.
    • PROJECT_ID: l'ID del Google Cloud progetto.

    • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage in cui è archiviato il data warehouse.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Sostituisci quanto segue:

    • NAMESPACE_NAME: il nome dello spazio dei nomi che fa riferimento alla tabella Spark.
    • TABLE_NAME: un nome di tabella che fa riferimento alla tabella Spark.

    L'output contiene i dettagli sulla configurazione della tabella:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Modifica di nuovo la tabella, quindi esegui il rollback allo snapshot 1659239298328512231 creato in precedenza:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Sostituisci quanto segue:

    • SNAPSHOT_ID: l'ID dello snapshot a cui eseguire il rollback.

    L'output è simile al seguente:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrare le tabelle non supportate dalle funzioni di elenco delle tabelle

Quando utilizzi Spark SQL con il catalogo del metastore BigLake, il comando SHOW TABLES mostra tutte le tabelle nello spazio dei nomi specificato, anche quelle non compatibili con Spark.

Per visualizzare solo le tabelle supportate, attiva l'opzione filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Sostituisci quanto segue:

  • CATALOG_NAME: il nome del catalogo Spark da utilizzare.
  • PROJECT_ID: l'ID del Google Cloud progetto da utilizzare.
  • LOCATION: la posizione delle risorse BigQuery.
  • WAREHOUSE_DIRECTORY: la cartella Cloud Storage da utilizzare come data warehouse.

Impostare un'override della connessione BigQuery

Puoi utilizzare le connessioni BigQuery per accedere ai dati archiviati al di fuori di BigQuery, ad esempio in Cloud Storage.

Per impostare un'override della connessione BigQuery che fornisce l'accesso a un bucket Cloud Storage, completa i seguenti passaggi:

  1. Nel progetto BigQuery, crea una nuova connessione alla risorsa Cloud Storage. Questa connessione definisce in che modo BigQuery accede ai tuoi dati.

  2. Concedi all'account utente o di servizio che accede ai dati il ruolo roles/bigquery.connectionUser per la connessione.

    Assicurati che la risorsa di connessione condivida la stessa posizione delle risorse di destinazione in BigQuery. Per saperne di più, consulta Gestire le connessioni.

  3. Specifica la connessione nella tabella Iceberg con la proprietà bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Sostituisci quanto segue:

    • TABLE_NAME: un nome per la tabella Spark.
    • WAREHOUSE_DIRECTORY: l'URI del bucket Cloud Storage in cui sono archiviati i dati.
    • PROJECT_ID: l'ID del Google Cloud progetto da utilizzare.
    • LOCATION: la posizione della connessione.
    • CONNECTION_ID: l'ID della connessione.

Imposta i criteri di controllo dell'accesso

Puoi attivare il controllo dell'accesso granulare (FGAC) sulle tabelle Iceberg del metastore BigLake configurando i criteri di controllo dell'accesso'accesso. Puoi impostare i criteri di controllo dell'accesso solo sulle tabelle che utilizzano un'sostituzione della connessione BigQuery. Puoi impostare questi criteri nei seguenti modi:

Dopo aver configurato i criteri FGAC, puoi eseguire query sulla tabella da Spark utilizzando il seguente esempio:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Sostituisci quanto segue:

  • CATALOG_NAME: il nome del tuo catalogo.
  • PROJECT_ID: l'ID del progetto che contiene le risorse BigQuery.
  • LOCATION: la posizione delle risorse BigQuery.
  • WAREHOUSE_DIRECTORY: l'URI della cartella Cloud Storage contenente il data warehouse.
  • MATERIALIZATION_NAMESPACE: lo spazio dei nomi in cui vuoi memorizzare i risultati temporanei.
  • DATASET_NAME: il nome del set di dati che contiene la tabella su cui stai eseguendo una query.
  • ICEBERG_TABLE_NAME: il nome della tabella su cui stai eseguendo una query.

Passaggi successivi