Utiliser BigLake Metastore avec des procédures stockées Spark

Ce document explique comment utiliser les procédures stockées Apache Spark avec le métastore BigLake.

Avant de commencer

  1. Activez la facturation pour votre projet Google Cloud . Découvrez comment vérifier si la facturation est activée sur un projet.
  2. Activez les API BigQuery et Dataflow.

    Activer les API

  3. Facultatif : En savoir plus sur les points suivants :

Rôles requis

Pour utiliser les procédures stockées Spark, consultez les rôles requis pour les procédures stockées et accordez les rôles nécessaires.

Pour obtenir les autorisations nécessaires pour utiliser Spark et les procédures stockées avec le metastore BigLake comme metastore, demandez à votre administrateur de vous accorder les rôles IAM suivants :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Créer et exécuter une procédure stockée

L'exemple suivant montre comment créer et exécuter une procédure stockée avec le metastore BigLake.

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, ajoutez l'exemple de code suivant pour l'instruction CREATE PROCEDURE.

    CREATE OR REPLACE PROCEDURE
    `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`()
    WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK',
    runtime_version='1.1',
    properties=[("spark.sql.catalog.CATALOG_NAME.warehouse",
    "WAREHOUSE_DIRECTORY"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_location",
    "LOCATION"),
    ("spark.sql.catalog.CATALOG_NAME.gcp_project",
    "PROJECT_ID"),
    ("spark.sql.catalog.CATALOG_NAME",
    "org.apache.iceberg.spark.SparkCatalog"),
    ("spark.sql.catalog.CATALOG_NAME.catalog-impl",
    "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"),
    ("spark.jars.packages",
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")],
    jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"])
    LANGUAGE python AS R"""
    from pyspark.sql import SparkSession
    spark = SparkSession \
    .builder \
    .appName("BigLake Metastore Iceberg") \
    .getOrCreate()
    spark.sql("USE CATALOG_NAME;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'")
    spark.sql("DESCRIBE TABLE_NAME;")
    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");")
    spark.sql("SELECT * from TABLE_NAME;")
    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);")
    spark.sql("DESCRIBE TABLE_NAME;")
    """;
    CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet Google Cloud .
    • BQ_DATASET_ID : ID de l'ensemble de données BigQuery contenant la procédure.
    • PROCEDURE_NAME : nom de la procédure que vous créez ou remplacez.
    • REGION : emplacement de votre connexion Spark.
    • LOCATION : emplacement de vos ressources BigQuery.
    • SPARK_CONNECTION_ID : ID de votre connexion Spark.
    • CATALOG_NAME : nom du catalogue que vous utilisez.
    • WAREHOUSE_DIRECTORY : URI du dossier Cloud Storage contenant votre entrepôt de données.
    • NAMESPACE_NAME : espace de noms que vous utilisez.

Étapes suivantes