Funciones adicionales del metastore de BigQuery

Para personalizar la configuración de tu metastore de BigQuery, puedes usar las siguientes funciones adicionales:

  • Procedimientos de Apache Spark Iceberg
  • La opción de filtro para tablas no compatibles
  • Anulaciones de conexión de BigQuery
  • Políticas de control de acceso para las tablas Iceberg del metastore de BigQuery

Usa procedimientos de Spark de Iceberg

Para usar los procedimientos de Spark de Iceberg, debes incluir las extensiones de SQL de Iceberg en tu configuración de Spark. Por ejemplo, puedes crear un procedimiento para revertir a un estado anterior.

Usa Spark-SQL interactivo para revertir a un estado anterior

Puedes usar un procedimiento de Spark de Iceberg para crear, modificar y revertir una tabla a su estado anterior. Por ejemplo:

  1. Crea una tabla de Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo que hace referencia a tu tabla de Spark.
    • PROJECT_ID: El ID del proyecto de Google Cloud.

    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Reemplaza lo siguiente:

    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • TABLE_NAME: Es un nombre de tabla que hace referencia a tu tabla de Spark.

    El resultado contiene detalles sobre la configuración de la tabla:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Vuelve a modificar la tabla y, luego, revierte a la instantánea 1659239298328512231 creada anteriormente:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Reemplaza lo siguiente:

    • SNAPSHOT_ID: Es el ID de la instantánea a la que te vas a revertir.

    El resultado es similar a este:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtra las tablas no compatibles de las funciones de lista de tablas

Cuando usas Spark SQL con el catálogo de metastore de BigQuery, el comando SHOW TABLES muestra todas las tablas del espacio de nombres especificado, incluso aquellas que no son compatibles con Spark.

Para mostrar solo las tablas compatibles, activa la opción filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Reemplaza lo siguiente:

  • CATALOG_NAME: Es el nombre del catálogo de Spark que se usará.
  • PROJECT_ID: El ID del proyecto de Google Cloud que se usará.
  • LOCATION: La ubicación de los recursos de BigQuery.
  • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que se usará como almacén de datos.

Establece una anulación de conexión de BigQuery

Puedes usar las conexiones de BigQuery para acceder a datos almacenados fuera de BigQuery, como en Cloud Storage.

Para establecer una anulación de conexión de BigQuery que proporcione acceso a un bucket de Cloud Storage, completa los siguientes pasos:

  1. En tu proyecto de BigQuery, crea una conexión nueva a tu recurso de Cloud Storage. Esta conexión define cómo accede BigQuery a tus datos.

  2. Otorga al usuario o a la cuenta de servicio que accede a los datos el rol roles/bigquery.connectionUser en la conexión.

    Asegúrate de que el recurso de conexión comparta la misma ubicación que los recursos de destino en BigQuery. Para obtener más información, consulta Administra conexiones.

  3. Especifica la conexión en tu tabla de Iceberg con la propiedad bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Reemplaza lo siguiente:

    • TABLE_NAME: Es un nombre de tabla para tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI del bucket de Cloud Storage que almacena tus datos.
    • PROJECT_ID: El ID del proyecto de Google Cloud que se usará.
    • LOCATION: Es la ubicación de la conexión.
    • CONNECTION_ID: el ID de la conexión.

Configura políticas de control de acceso

Puedes habilitar el control de acceso detallado (FGAC) en las tablas de Iceberg del metastore de BigQuery si configuras las políticas de control de acceso. Solo puedes configurar políticas de control de acceso en tablas que usen una anulación de conexión de BigQuery. Puedes establecer estas políticas de las siguientes maneras:

Después de configurar tus políticas de FGAC, puedes consultar la tabla desde Spark con el siguiente ejemplo:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigQuery Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Reemplaza lo siguiente:

  • CATALOG_NAME: Es el nombre de tu catálogo.
  • PROJECT_ID: El ID del proyecto que contiene tus recursos de BigQuery.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage que contiene tu almacén de datos.
  • MATERIALIZATION_NAMESPACE: Es el espacio de nombres en el que deseas almacenar los resultados temporales.
  • DATASET_NAME: Es el nombre del conjunto de datos que contiene la tabla que consultas.
  • ICEBERG_TABLE_NAME: Es el nombre de la tabla a la que le estás realizando la consulta.

¿Qué sigue?