其他 BigLake Metastore 功能
如需自定义 BigLake Metastore 配置,您可以使用以下其他功能:
- Apache Spark Iceberg 过程
- 不支持的表的过滤选项
- BigQuery 连接替换
- BigLake Metastore Iceberg 表的访问权限控制政策
使用 Iceberg Spark 过程
如需使用 Iceberg Spark 存储过程,您必须在 Spark 配置中添加 Iceberg SQL 扩展程序。例如,您可以创建一个过程来回滚到以前的状态。
使用交互式 Spark-SQL 回滚到先前的状态
您可以使用 Iceberg Spark 过程创建、修改表并将其回滚到之前的状态。例如:
创建 Spark 表:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
替换以下内容:
CATALOG_NAME
:引用 Spark 表的目录名称。PROJECT_ID
: Google Cloud 项目的 ID。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
USE `CATALOG_NAME`; CREATE NAMESPACE NAMESPACE_NAME; USE NAMESPACE NAMESPACE_NAME; CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'; INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row"); DESCRIBE EXTENDED TABLE_NAME;
替换以下内容:
NAMESPACE_NAME
:引用 Spark 表的命名空间名称。TABLE_NAME
:引用 Spark 表的表名。
输出内容包含有关表配置的详细信息:
... Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd] ...
再次更改该表,然后将其回滚到之前创建的快照
1659239298328512231
:ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double); INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5); SELECT * FROM TABLE_NAME; CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID); SELECT * FROM TABLE_NAME;
替换以下内容:
SNAPSHOT_ID
:要回滚到的快照的 ID。
输出类似于以下内容:
1 first row Time taken: 0.997 seconds, Fetched 1 row(s)
从表列出函数中过滤不受支持的表
将 Spark SQL 与 BigLake Metastore 目录搭配使用时,SHOW TABLES
命令会显示指定命名空间中的所有表,即使这些表与 Spark 不兼容也是如此。
如需仅显示受支持的表,请启用 filter_unsupported_tables
选项:
spark-sql --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"
替换以下内容:
CATALOG_NAME
:要使用的 Spark 目录的名称。PROJECT_ID
:要使用的 Google Cloud 项目的 ID。LOCATION
:BigQuery 资源的位置。WAREHOUSE_DIRECTORY
:要用作数据仓库的 Cloud Storage 文件夹。
设置 BigQuery 连接替换
您可以使用 BigQuery 连接来访问存储在 BigQuery 之外的数据,例如存储在 Cloud Storage 中的数据。
如需设置可提供对 Cloud Storage 存储桶的访问权限的 BigQuery 连接替换,请完成以下步骤:
在 BigQuery 项目中,创建一个与 Cloud Storage 资源的新连接。此连接定义了 BigQuery 如何访问您的数据。
向访问数据的用户或服务账号授予连接的
roles/bigquery.connectionUser
角色。确保连接资源与 BigQuery 中的目标资源位于同一位置。如需了解详情,请参阅管理连接。
使用
bq_connection
属性在 Iceberg 表中指定连接:CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');
替换以下内容:
TABLE_NAME
:Spark 表的表名。WAREHOUSE_DIRECTORY
:存储数据的 Cloud Storage 存储桶的 URI。PROJECT_ID
:要使用的 Google Cloud 项目的 ID。LOCATION
:连接的位置。CONNECTION_ID
:连接的 ID。
设置访问权限控制政策
您可以通过配置访问权限控制政策,在 BigLake 元数据库 Iceberg 表上启用精细访问权限控制 (FGAC)。您只能对使用 BigQuery 连接替换的表设置访问权限控制政策。您可以通过以下方式设置这些政策:
配置 FGAC 政策后,您可以使用以下示例从 Spark 查询该表:
from pyspark.sql import SparkSession # Create a Spark session spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") # Configure spark for storing temp results spark.conf.set("viewsEnabled","true") spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") spark.sql("USE NAMESPACE DATASET_NAME;") sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
替换以下内容:
CATALOG_NAME
:您的目录的名称。PROJECT_ID
:包含您的 BigQuery 资源的项目的 ID。LOCATION
:BigQuery 资源的位置。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹的 URI。MATERIALIZATION_NAMESPACE
:要在其中存储临时结果的命名空间。DATASET_NAME
:包含您要查询的表的数据集的名称。ICEBERG_TABLE_NAME
:您要查询的表的名称。
后续步骤
- 将 Dataproc Metastore 数据迁移到 BigLake metastore
- 将 BigLake metastore 与 Dataproc 搭配使用
- 将 BigLake metastore 与 Dataproc Serverless 搭配使用