Outros recursos do metastore do BigQuery

Para personalizar a configuração da metastore do BigQuery, use os seguintes recursos:

  • Procedimentos do Apache Spark Iceberg
  • A opção de filtro para tabelas sem suporte
  • Substituições de conexão do BigQuery
  • Políticas de controle de acesso para tabelas Iceberg do metastore do BigQuery

Usar procedimentos do Iceberg Spark

Para usar os procedimentos do Iceberg Spark, é necessário incluir as extensões SQL do Iceberg na configuração do Spark. Por exemplo, é possível criar um procedimento para reverter para um estado anterior.

Usar o Spark-SQL interativo para reverter para um estado anterior

É possível usar um procedimento do Iceberg Spark para criar, modificar e reverter uma tabela para o estado anterior. Exemplo:

  1. Crie uma tabela do Spark:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Substitua:

    • CATALOG_NAME: o nome do catálogo que faz referência à sua tabela do Spark.
    • PROJECT_ID: o ID do projeto Google Cloud .

    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse é armazenado.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Substitua:

    • NAMESPACE_NAME: o nome do namespace que faz referência à sua tabela do Spark.
    • TABLE_NAME: um nome de tabela que faz referência à sua tabela do Spark.

    A saída contém detalhes sobre a configuração da tabela:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Altere a tabela novamente e volte para o snapshot 1659239298328512231 criado anteriormente:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Substitua:

    • SNAPSHOT_ID: o ID do snapshot para o qual você está retornando.

    O resultado será assim:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrar tabelas sem suporte das funções de listagem de tabelas

Quando você usa o Spark SQL com o catálogo do Metastore do BigQuery, o comando SHOW TABLES mostra todas as tabelas no espaço de nomes especificado, mesmo aquelas que não são compatíveis com o Spark.

Para mostrar apenas as tabelas com suporte, ative a opção filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Substitua:

  • CATALOG_NAME: o nome do catálogo do Spark a ser usado.
  • PROJECT_ID: o ID do projeto Google Cloud a ser usado.
  • LOCATION: o local dos recursos do BigQuery.
  • WAREHOUSE_DIRECTORY: a pasta do Cloud Storage a ser usada como data warehouse.

Definir uma substituição de conexão do BigQuery

É possível usar as conexões do BigQuery para acessar dados armazenados fora do BigQuery, como no Cloud Storage.

Para definir uma modificação de conexão do BigQuery que ofereça acesso a um bucket do Cloud Storage, siga estas etapas:

  1. No seu projeto do BigQuery, crie uma nova conexão com o recurso do Cloud Storage. Essa conexão define como o BigQuery acessa seus dados.

  2. Conceda à conta de usuário ou de serviço que acessa os dados o papel roles/bigquery.connectionUser na conexão.

    Verifique se o recurso de conexão compartilha o mesmo local que os recursos de destino no BigQuery. Para mais informações, consulte Gerenciar conexões.

  3. Especifique a conexão na tabela Iceberg com a propriedade bq_connection:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Substitua:

    • TABLE_NAME: um nome de tabela para a tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI do bucket do Cloud Storage que armazena seus dados.
    • PROJECT_ID: o ID do projeto Google Cloud a ser usado.
    • LOCATION: o local da conexão.
    • CONNECTION_ID: o ID da conexão.

Definir políticas de controle de acesso

É possível ativar o controle de acesso refinado (FGAC, na sigla em inglês) nas tabelas do Iceberg da metastore do BigQuery configurando políticas de controle de acesso. Só é possível definir políticas de controle de acesso em tabelas que usam uma substituição de conexão do BigQuery. É possível definir essas políticas das seguintes maneiras:

Depois de configurar as políticas de FGAC, você pode consultar a tabela do Spark usando este exemplo:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Substitua:

  • CATALOG_NAME: o nome do seu catálogo.
  • PROJECT_ID: o ID do projeto que contém seus recursos do BigQuery.
  • LOCATION: o local dos recursos do BigQuery.
  • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage que contém seu data warehouse.
  • MATERIALIZATION_NAMESPACE: o namespace em que você quer armazenar resultados temporários.
  • DATASET_NAME: o nome do conjunto de dados que contém a tabela que você está consultando.
  • ICEBERG_TABLE_NAME: o nome da tabela que você está consultando.

A seguir