Menggunakan metastore BigQuery dengan tabel di BigQuery

Dokumen ini menjelaskan cara menggunakan metastore BigQuery dengan tabel BigQuery dan Spark.

Dengan metastore BigQuery, Anda dapat membuat dan menggunakan tabel standar (bawaan), tabel BigQuery untuk Apache Iceberg, dan tabel eksternal BigLake untuk Apache Iceberg dari BigQuery.

Sebelum memulai

  1. Aktifkan penagihan untuk project Google Cloud Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
  2. Aktifkan BigQuery dan Dataproc API.

    Aktifkan API

  3. Opsional: Pahami cara kerja metastore BigQuery dan alasan Anda harus menggunakannya.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk menggunakan Spark dan Dataproc dengan metastore BigQuery sebagai penyimpanan metadata, minta administrator untuk memberi Anda peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Menghubungkan ke tabel

  1. Buat set data di Google Cloud konsol.

    CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project untuk membuat set data.
    • DATASET_NAME: nama untuk set data Anda.
  2. Buat Koneksi Resource Cloud.

  3. Buat tabel BigQuery standar.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);

    Ganti kode berikut:

    • TABLE_NAME: nama untuk tabel Anda.
  4. Menyisipkan data ke dalam tabel BigQuery standar.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  5. Buat tabel BigQuery untuk Apache Iceberg.

    Misalnya, untuk membuat tabel, jalankan pernyataan CREATE berikut.

    CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
    name STRING,id INT64
    )
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
    file_format = 'PARQUET',
    table_format = 'ICEBERG',
    storage_uri = 'STORAGE_URI');

    Ganti kode berikut:

    • ICEBERG_TABLE_NAME: nama untuk tabel BigQuery Anda untuk Apache Iceberg. Contoh, iceberg_managed_table.
    • CONNECTION_NAME: nama koneksi Anda. Anda membuatnya di langkah sebelumnya. Contoh, myproject.us.myconnection.
    • STORAGE_URI: Cloud Storage URI yang sepenuhnya memenuhi syarat. Contoh, gs://mybucket/table.
  6. Menyisipkan data ke tabel BigQuery untuk Apache Iceberg.

    INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
  7. Buat tabel eksternal BigLake untuk Apache Iceberg.

    Misalnya, untuk membuat tabel BigLake Iceberg, jalankan pernyataan CREATE berikut.

    CREATE OR REPLACE EXTERNAL TABLE  `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME
    WITH CONNECTION `CONNECTION_NAME`
    OPTIONS (
      format = 'ICEBERG',
      uris =
        ['BUCKET_PATH'],
      require_partition_filter = FALSE);

    Ganti kode berikut:

    • READONLY_ICEBERG_TABLE_NAME: nama untuk tabel hanya baca.
    • BUCKET_PATH: jalur ke bucket Cloud Storage yang berisi data untuk tabel eksternal, dalam format ['gs://bucket_name/[folder_name/]file_name'].
  8. Dari PySpark, buat kueri tabel standar, tabel BigQuery untuk Apache Iceberg, dan tabel eksternal BigLake untuk Apache Iceberg.

    from pyspark.sql import SparkSession
    
    # Create a spark session
    spark = SparkSession.builder \
    .appName("Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.conf.set("viewsEnabled","true")
    
    # Use the bqms_catalog
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("USE NAMESPACE DATASET_NAME;")
    
    # Configure spark for temp results
    spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
    spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")
    
    # List the tables in the dataset
    df = spark.sql("SHOW TABLES;")
    df.show();
    
    # Query the tables
    sql = """SELECT * FROM DATASET_NAME.TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()
    
    sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME"""
    df = spark.read.format("bigquery").load(sql)
    df.show()

    Ganti kode berikut:

    • WAREHOUSE_DIRECTORY: URI folder Cloud Storage yang terhubung ke tabel Iceberg dan tabel BigLake Iceberg Anda.
    • CATALOG_NAME: nama katalog yang Anda gunakan.
    • MATERIALIZATION_NAMESPACE: namespace untuk menyimpan hasil sementara.
  9. Jalankan skrip PySpark menggunakan Serverless Spark.

    gcloud dataproc batches submit pyspark SCRIPT_PATH \
      --version=2.2 \
      --project=PROJECT_ID \
      --region=REGION \
      --deps-bucket=YOUR_BUCKET \

    Ganti kode berikut:

    • SCRIPT_PATH: jalur ke skrip yang digunakan tugas batch.
    • PROJECT_ID: ID Google Cloud project tempat menjalankan tugas batch.
    • REGION: region tempat workload Anda berjalan.
    • YOUR_BUCKET: lokasi bucket Cloud Storage untuk mengupload dependensi beban kerja. Awalan URI gs:// bucket tidak diperlukan. Anda dapat menentukan jalur bucket atau nama bucket, misalnya, mybucketname1.

Langkah berikutnya