Menggunakan metastore BigLake dengan prosedur tersimpan Spark
Dokumen ini menjelaskan cara menggunakan prosedur tersimpan Apache Spark dengan metastore BigLake.
Sebelum memulai
- Aktifkan penagihan untuk Google Cloud project Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
Aktifkan BigQuery dan Dataflow API.
Opsional: Pelajari lebih lanjut hal-hal berikut:
- Pahami cara kerja BigLake metastore dan alasan Anda harus menggunakannya.
- Pelajari cara kerja prosedur tersimpan BigQuery Spark dan selesaikan tugas sebelum memulai.
Peran yang diperlukan
Untuk menggunakan prosedur tersimpan Spark, tinjau peran yang diperlukan untuk prosedur tersimpan dan berikan peran yang diperlukan.
Untuk mendapatkan izin yang Anda perlukan untuk menggunakan Spark dan prosedur tersimpan dengan metastore BigLake sebagai penyimpanan metadata, minta administrator Anda untuk memberi Anda peran IAM berikut:
-
Buat tabel metastore BigLake di Spark:
-
BigQuery Data Editor (
roles/bigquery.dataEditor
) di akun layanan Koneksi Spark dalam project -
Storage Object Admin (
roles/storage.objectAdmin
) di akun layanan Spark Connection dalam project
-
BigQuery Data Editor (
-
Menjalankan kueri pada tabel metastore BigLake di BigQuery:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) di project -
Pengguna BigQuery (
roles/bigquery.user
) di project -
Storage Object Viewer (
roles/storage.objectViewer
) di project
-
BigQuery Data Viewer (
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.
Membuat dan menjalankan prosedur tersimpan
Contoh berikut menunjukkan cara membuat dan menjalankan prosedur tersimpan dengan metastore BigLake.
Buka halaman BigQuery.
Di editor kueri, tambahkan kode contoh berikut untuk pernyataan
CREATE PROCEDURE
.CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigLake Metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
Ganti kode berikut:
PROJECT_ID
: ID Google Cloud project Anda.BQ_DATASET_ID
: ID set data di BigQuery yang berisi prosedur.PROCEDURE_NAME
: nama prosedur yang Anda buat atau ganti.REGION
: lokasi koneksi Spark Anda.LOCATION
: lokasi resource BigQuery Anda.SPARK_CONNECTION_ID
: ID koneksi Spark Anda.CATALOG_NAME
: nama katalog yang Anda gunakan.WAREHOUSE_DIRECTORY
: URI folder Cloud Storage yang berisi data warehouse Anda.NAMESPACE_NAME
: namespace yang Anda gunakan.