其他 BigLake Metastore 功能

如需自定义 BigLake Metastore 配置,您可以使用以下其他功能:

  • Apache Spark Iceberg 过程
  • 不支持的表的过滤选项
  • BigQuery 连接替换
  • BigLake Metastore Iceberg 表的访问权限控制政策

使用 Iceberg Spark 过程

如需使用 Iceberg Spark 存储过程,您必须在 Spark 配置中添加 Iceberg SQL 扩展程序。例如,您可以创建一个过程来回滚到以前的状态。

使用交互式 Spark-SQL 回滚到先前的状态

您可以使用 Iceberg Spark 过程创建、修改表并将其回滚到之前的状态。例如:

  1. 创建 Spark 表:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    替换以下内容:

    • CATALOG_NAME:引用 Spark 表的目录名称。
    • PROJECT_ID: Google Cloud 项目的 ID。

    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    替换以下内容:

    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
    • TABLE_NAME:引用 Spark 表的表名。

    输出内容包含有关表配置的详细信息:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. 再次更改该表,然后将其回滚到之前创建的快照 1659239298328512231

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    替换以下内容:

    • SNAPSHOT_ID:要回滚到的快照的 ID。

    输出类似于以下内容:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

从表列出函数中过滤不受支持的表

将 Spark SQL 与 BigLake Metastore 目录搭配使用时,SHOW TABLES 命令会显示指定命名空间中的所有表,即使这些表与 Spark 不兼容也是如此。

如需仅显示受支持的表,请启用 filter_unsupported_tables 选项:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

替换以下内容:

  • CATALOG_NAME:要使用的 Spark 目录的名称。
  • PROJECT_ID:要使用的 Google Cloud 项目的 ID。
  • LOCATION:BigQuery 资源的位置。
  • WAREHOUSE_DIRECTORY:要用作数据仓库的 Cloud Storage 文件夹。

设置 BigQuery 连接替换

您可以使用 BigQuery 连接来访问存储在 BigQuery 之外的数据,例如存储在 Cloud Storage 中的数据。

如需设置可提供对 Cloud Storage 存储桶的访问权限的 BigQuery 连接替换,请完成以下步骤:

  1. 在 BigQuery 项目中,创建一个与 Cloud Storage 资源的新连接。此连接定义了 BigQuery 如何访问您的数据。

  2. 向访问数据的用户或服务账号授予连接的 roles/bigquery.connectionUser 角色。

    确保连接资源与 BigQuery 中的目标资源位于同一位置。如需了解详情,请参阅管理连接

  3. 使用 bq_connection 属性在 Iceberg 表中指定连接:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    替换以下内容:

    • TABLE_NAME:Spark 表的表名。
    • WAREHOUSE_DIRECTORY:存储数据的 Cloud Storage 存储桶的 URI。
    • PROJECT_ID:要使用的 Google Cloud 项目的 ID。
    • LOCATION:连接的位置
    • CONNECTION_ID:连接的 ID。

设置访问权限控制政策

您可以通过配置访问权限控制政策,在 BigLake 元数据库 Iceberg 表上启用精细访问权限控制 (FGAC)。您只能对使用 BigQuery 连接替换的表设置访问权限控制政策。您可以通过以下方式设置这些政策:

配置 FGAC 政策后,您可以使用以下示例从 Spark 查询该表:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

替换以下内容:

  • CATALOG_NAME:您的目录的名称。
  • PROJECT_ID:包含您的 BigQuery 资源的项目的 ID。
  • LOCATION:BigQuery 资源的位置
  • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹的 URI。
  • MATERIALIZATION_NAMESPACE:要在其中存储临时结果的命名空间。
  • DATASET_NAME:包含您要查询的表的数据集的名称。
  • ICEBERG_TABLE_NAME:您要查询的表的名称。

后续步骤