BigLake Metastore のその他の機能

BigLake メタストアの構成をカスタマイズするには、次の追加機能を使用します。

  • Apache Spark Iceberg プロシージャ
  • サポートされていないテーブルのフィルタ オプション
  • BigQuery 接続のオーバーライド
  • BigLake Metastore Iceberg テーブルのアクセス制御ポリシー

Iceberg Spark プロシージャを使用する

Iceberg Spark プロシージャを使用するには、Spark 構成に Iceberg SQL 拡張機能を含める必要があります。たとえば、以前の状態にロールバックするプロシージャを作成できます。

インタラクティブな Spark-SQL を使用して以前の状態にロールバックする

Iceberg Spark プロシージャを使用して、テーブルの作成、変更、以前の状態へのロールバックを行うことができます。次に例を示します。

  1. Spark テーブルを作成します。

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    次のように置き換えます。

    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • PROJECT_ID: Google Cloud プロジェクトの ID。

    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    次のように置き換えます。

    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
    • TABLE_NAME: Spark テーブルを参照するテーブル名。

    出力には、テーブル構成の詳細が含まれます。

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. テーブルを再度変更し、前に作成したスナップショット 1659239298328512231 にロールバックします。

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    次のように置き換えます。

    • SNAPSHOT_ID: ロールバックするスナップショットの ID。

    出力は次のようになります。

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

テーブル一覧表示関数からサポートされていないテーブルを除外する

BigLake メタストア カタログで Spark SQL を使用する場合、SHOW TABLES コマンドは、指定された名前空間内のすべてのテーブル(Spark と互換性のないテーブルも含む)を表示します。

サポートされているテーブルのみを表示するには、filter_unsupported_tables オプションをオンにします。

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

次のように置き換えます。

  • CATALOG_NAME: 使用する Spark カタログの名前。
  • PROJECT_ID: 使用する Google Cloud プロジェクトの ID。
  • LOCATION: BigQuery リソースのロケーション。
  • WAREHOUSE_DIRECTORY: データ ウェアハウスとして使用する Cloud Storage フォルダ。

BigQuery 接続のオーバーライドを設定する

BigQuery 接続を使用して、BigQuery の外部(Cloud Storage など)に保存されているデータにアクセスできます。

Cloud Storage バケットへのアクセス権を付与する BigQuery 接続のオーバーライドを設定するには、次の操作を行います。

  1. BigQuery プロジェクトで、Cloud Storage リソースへの新しい接続を作成します。この接続により、BigQuery がデータにアクセスする方法が定義されます。

  2. データにアクセスするユーザーまたはサービス アカウントに、接続に対する roles/bigquery.connectionUser ロールを付与します。

    接続リソースが BigQuery のターゲット リソースと同じロケーションを共有していることを確認します。詳細については、接続を管理するをご覧ください。

  3. bq_connection プロパティを使用して、Iceberg テーブルの接続を指定します。

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    次のように置き換えます。

    • TABLE_NAME: Spark テーブルのテーブル名。
    • WAREHOUSE_DIRECTORY: データを保存する Cloud Storage バケットの URI。
    • PROJECT_ID: 使用する Google Cloud プロジェクトの ID。
    • LOCATION: 接続のロケーション
    • CONNECTION_ID: 接続の ID。

アクセス制御ポリシーを設定する

アクセス制御ポリシーを構成することで、BigLake メタストア Iceberg テーブルできめ細かいアクセス制御(FGAC)を有効にできます。アクセス制御ポリシーは、BigQuery 接続のオーバーライドを使用するテーブルにのみ設定できます。これらのポリシーは、次の方法で設定できます。

FGAC ポリシーを構成したら、次の例を使用して Spark からテーブルをクエリできます。

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

次のように置き換えます。

  • CATALOG_NAME: カタログの名前。
  • PROJECT_ID: BigQuery リソースを含むプロジェクトの ID。
  • LOCATION: BigQuery リソースのロケーション
  • WAREHOUSE_DIRECTORY: データ ウェアハウスが格納されている Cloud Storage フォルダの URI。
  • MATERIALIZATION_NAMESPACE: 一時的な結果を保存する名前空間。
  • DATASET_NAME: クエリ対象のテーブルを含むデータセットの名前。
  • ICEBERG_TABLE_NAME: クエリ対象のテーブルの名前。

次のステップ