Outros recursos do metastore do BigQuery
Para personalizar a configuração da metastore do BigQuery, use os seguintes recursos:
- Procedimentos do Apache Spark Iceberg
- A opção de filtro para tabelas sem suporte
- Substituições de conexão do BigQuery
- Políticas de controle de acesso para tabelas Iceberg do metastore do BigQuery
Usar procedimentos do Iceberg Spark
Para usar os procedimentos do Iceberg Spark, é necessário incluir as extensões SQL do Iceberg na configuração do Spark. Por exemplo, é possível criar um procedimento para reverter para um estado anterior.
Usar o Spark-SQL interativo para reverter para um estado anterior
É possível usar um procedimento do Iceberg Spark para criar, modificar e reverter uma tabela para o estado anterior. Exemplo:
Crie uma tabela do Spark:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Substitua:
CATALOG_NAME
: o nome do catálogo que faz referência à sua tabela do Spark.PROJECT_ID
: o ID do projeto Google Cloud .WAREHOUSE_DIRECTORY
: o URI da pasta do Cloud Storage em que o data warehouse é armazenado.
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
Substitua:
NAMESPACE_NAME
: o nome do namespace que faz referência à sua tabela do Spark.TABLE_NAME
: um nome de tabela que faz referência à sua tabela do Spark.
A saída contém detalhes sobre a configuração da tabela:
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
Altere a tabela novamente e volte para o snapshot
1659239298328512231
criado anteriormente:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
Substitua:
SNAPSHOT_ID
: o ID do snapshot para o qual você está retornando.
O resultado será assim:
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
Filtrar tabelas sem suporte das funções de listagem de tabelas
Quando você usa o Spark SQL com o catálogo do Metastore do BigQuery, o comando SHOW TABLES
mostra todas as tabelas no espaço de nomes especificado, mesmo aquelas que não são compatíveis com o Spark.
Para mostrar apenas as tabelas com suporte, ative a opção filter_unsupported_tables
:
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
Substitua:
CATALOG_NAME
: o nome do catálogo do Spark a ser usado.PROJECT_ID
: o ID do projeto Google Cloud a ser usado.LOCATION
: o local dos recursos do BigQuery.WAREHOUSE_DIRECTORY
: a pasta do Cloud Storage a ser usada como data warehouse.
Definir uma substituição de conexão do BigQuery
É possível usar as conexões do BigQuery para acessar dados armazenados fora do BigQuery, como no Cloud Storage.
Para definir uma modificação de conexão do BigQuery que ofereça acesso a um bucket do Cloud Storage, siga estas etapas:
No seu projeto do BigQuery, crie uma nova conexão com o recurso do Cloud Storage. Essa conexão define como o BigQuery acessa seus dados.
Conceda à conta de usuário ou de serviço que acessa os dados o papel
roles/bigquery.connectionUser
na conexão.Verifique se o recurso de conexão compartilha o mesmo local que os recursos de destino no BigQuery. Para mais informações, consulte Gerenciar conexões.
Especifique a conexão na tabela Iceberg com a propriedade
bq_connection
:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
Substitua:
TABLE_NAME
: um nome de tabela para a tabela do Spark.WAREHOUSE_DIRECTORY
: o URI do bucket do Cloud Storage que armazena seus dados.PROJECT_ID
: o ID do projeto Google Cloud a ser usado.LOCATION
: o local da conexão.CONNECTION_ID
: o ID da conexão.
Definir políticas de controle de acesso
É possível ativar o controle de acesso refinado (FGAC, na sigla em inglês) nas tabelas do Iceberg da metastore do BigQuery configurando políticas de controle de acesso. Só é possível definir políticas de controle de acesso em tabelas que usam uma substituição de conexão do BigQuery. É possível definir essas políticas das seguintes maneiras:
Depois de configurar as políticas de FGAC, você pode consultar a tabela do Spark usando este exemplo:
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Substitua:
CATALOG_NAME
: o nome do seu catálogo.PROJECT_ID
: o ID do projeto que contém seus recursos do BigQuery.LOCATION
: o local dos recursos do BigQuery.WAREHOUSE_DIRECTORY
: o URI da pasta do Cloud Storage que contém seu data warehouse.MATERIALIZATION_NAMESPACE
: o namespace em que você quer armazenar resultados temporários.DATASET_NAME
: o nome do conjunto de dados que contém a tabela que você está consultando.ICEBERG_TABLE_NAME
: o nome da tabela que você está consultando.
A seguir
- Migrar dados do metastore do Dataproc para o metastore do BigQuery
- Usar a metastore do BigQuery com o Dataproc
- Usar o metastore do BigQuery com o Dataproc Serverless