Zusätzliche BigQuery-Metastore-Funktionen

Sie können die BigQuery-Metastore-Konfiguration mit den folgenden zusätzlichen Funktionen anpassen:

  • Apache Spark Iceberg-Prozeduren
  • Filteroption für nicht unterstützte Tabellen
  • BigQuery-Verbindungsüberschreibungen
  • Zugriffssteuerungsrichtlinien für BigQuery-Metastore-Iceberg-Tabellen

Iceberg-Spark-Prozeduren verwenden

Wenn Sie Iceberg-Spark-Prozeduren verwenden möchten, müssen Sie die Iceberg-SQL-Erweiterungen in Ihre Spark-Konfiguration aufnehmen. Sie können beispielsweise ein Verfahren zum Zurücksetzen auf einen vorherigen Zustand erstellen.

Interaktives Spark-SQL verwenden, um zu einem vorherigen Zustand zurückzukehren

Mit einer Iceberg-Spark-Prozedur können Sie eine Tabelle erstellen, ändern und auf ihren vorherigen Zustand zurücksetzen. Beispiel:

  1. So erstellen Sie eine Spark-Tabelle:

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: Der Katalogname, der auf Ihre Spark-Tabelle verweist.
    • PROJECT_ID: die ID des Google Cloud Projekts.

    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Ersetzen Sie Folgendes:

    • NAMESPACE_NAME: Der Name des Namespace, der auf Ihre Spark-Tabelle verweist.
    • TABLE_NAME: ein Tabellenname, der auf Ihre Spark-Tabelle verweist.

    Die Ausgabe enthält Details zur Tabellenkonfiguration:

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Ändern Sie die Tabelle noch einmal und führen Sie dann ein Rollback auf den zuvor erstellten Snapshot 1659239298328512231 aus:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Ersetzen Sie Folgendes:

    • SNAPSHOT_ID: die ID des Snapshots, zu dem Sie zurückgehen möchten.

    Die Ausgabe sieht etwa so aus:

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Nicht unterstützte Tabellen aus Funktionen zum Auflisten von Tabellen herausfiltern

Wenn Sie Spark SQL mit dem BigQuery-Metastore-Katalog verwenden, werden mit dem Befehl SHOW TABLES alle Tabellen im angegebenen Namensbereich angezeigt, auch solche, die nicht mit Spark kompatibel sind.

Wenn Sie nur unterstützte Tabellen anzeigen lassen möchten, aktivieren Sie die Option filter_unsupported_tables:

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Ersetzen Sie Folgendes:

  • CATALOG_NAME: der Name des zu verwendenden Spark-Katalogs.
  • PROJECT_ID: die ID des zu verwendenden Google Cloud Projekts.
  • LOCATION: den Speicherort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: Der Cloud Storage-Ordner, der als Data Warehouse verwendet werden soll.

BigQuery-Verbindung überschreiben

Sie können BigQuery-Verbindungen verwenden, um auf Daten zuzugreifen, die außerhalb von BigQuery gespeichert sind, z. B. in Cloud Storage.

So legen Sie eine BigQuery-Verbindungsüberschreibung fest, die Zugriff auf einen Cloud Storage-Bucket gewährt:

  1. Erstellen Sie in Ihrem BigQuery-Projekt eine neue Verbindung zu Ihrer Cloud Storage-Ressource. Über diese Verbindung wird festgelegt, wie BigQuery auf Ihre Daten zugreift.

  2. Weisen Sie dem Nutzer oder Dienstkonto, das auf die Daten zugreift, die Rolle roles/bigquery.connectionUser für die Verbindung zu.

    Die Verbindungsressource muss sich am selben Speicherort wie die Zielressourcen in BigQuery befinden. Weitere Informationen finden Sie unter Verbindungen verwalten.

  3. Geben Sie die Verbindung in Ihrer Iceberg-Tabelle mit dem Attribut bq_connection an:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Ersetzen Sie Folgendes:

    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
    • WAREHOUSE_DIRECTORY: der URI des Cloud Storage-Buckets, in dem Ihre Daten gespeichert sind.
    • PROJECT_ID: die ID des zu verwendenden Google Cloud Projekts.
    • LOCATION: der Standort der Verbindung.
    • CONNECTION_ID: die ID der Verbindung.

Zugriffssteuerungsrichtlinien festlegen

Sie können die detaillierte Zugriffssteuerung (FGAC) für BigQuery-Metastore-Iceberg-Tabellen aktivieren, indem Sie Zugriffssteuerungsrichtlinien konfigurieren. Sie können Zugriffssteuerungsrichtlinien nur für Tabellen festlegen, für die eine BigQuery-Verbindungsüberschreibung verwendet wird. Sie haben folgende Möglichkeiten, diese Richtlinien festzulegen:

Nachdem Sie Ihre FGAC-Richtlinien konfiguriert haben, können Sie die Tabelle mit dem folgenden Beispiel aus Spark abfragen:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Ersetzen Sie Folgendes:

  • CATALOG_NAME: Der Name Ihres Katalogs.
  • PROJECT_ID: die ID des Projekts, das Ihre BigQuery-Ressourcen enthält.
  • LOCATION: den Speicherort der BigQuery-Ressourcen.
  • WAREHOUSE_DIRECTORY: der URI des Cloud Storage-Ordners, der Ihr Data Warehouse enthält.
  • MATERIALIZATION_NAMESPACE: Der Namespace, in dem Sie temporäre Ergebnisse speichern möchten.
  • DATASET_NAME: der Name des Datasets, das die abgefragte Tabelle enthält.
  • ICEBERG_TABLE_NAME: der Name der Tabelle, die Sie abfragen.

Nächste Schritte