Menggunakan metastore BigLake dengan Dataproc

Dokumen ini menjelaskan cara menggunakan metastore BigLake dengan Dataproc di Compute Engine. Koneksi ini memberi Anda metastore tunggal dan bersama yang berfungsi di seluruh mesin software open source, seperti Apache Spark atau Apache Flink.

Sebelum memulai

  1. Aktifkan penagihan untuk Google Cloud project Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
  2. Aktifkan BigQuery dan Dataproc API.

    Aktifkan API

  3. Opsional: Pahami cara kerja BigLake metastore dan alasan Anda harus menggunakannya.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk menggunakan Spark atau Flink dan Dataproc dengan metastore BigLake sebagai penyimpanan metadata, minta administrator Anda untuk memberi Anda peran IAM berikut:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Alur kerja umum

Untuk menggunakan Dataproc di Compute Engine dengan metastore BigLake, ikuti langkah-langkah umum berikut:

  1. Buat cluster Dataproc atau konfigurasi cluster yang ada.
  2. Hubungkan ke mesin software open source pilihan Anda, seperti Spark atau Flink.
  3. Gunakan file JAR untuk menginstal plugin katalog Apache Iceberg di cluster.
  4. Buat dan kelola resource metastore BigLake sesuai kebutuhan, bergantung pada mesin software open source yang Anda gunakan.
  5. Di BigQuery, akses dan gunakan resource metastore BigLake Anda.

Menghubungkan metastore BigLake ke Spark

Petunjuk berikut menunjukkan cara menghubungkan Dataproc ke metastore BigLake menggunakan Spark SQL interaktif.

Mendownload plugin katalog Iceberg

Untuk menghubungkan metastore BigLake dengan Dataproc dan Spark, Anda harus menggunakan file jar plugin katalog Iceberg metastore BigLake.

File ini disertakan secara default dalam versi image Dataproc 2.2. Jika cluster Dataproc Anda tidak memiliki akses langsung ke internet, Anda harus mendownload plugin dan menguploadnya ke bucket Cloud Storage yang dapat diakses oleh cluster Dataproc Anda.

Download plugin katalog Iceberg BigLake Metastore.

Mengonfigurasi cluster Dataproc

Sebelum terhubung ke metastore BigLake, Anda harus menyiapkan cluster Dataproc.

Untuk melakukannya, Anda dapat membuat cluster baru atau menggunakan cluster yang ada. Setelah itu, Anda menggunakan cluster ini untuk menjalankan Spark SQL interaktif dan mengelola resource metastore BigLake.

  • Subnet di region tempat cluster dibuat harus mengaktifkan Akses Google Pribadi (PGA). Secara default, VM cluster Dataproc, yang dibuat dengan versi image 2.2 (default) atau yang lebih baru, hanya memiliki alamat IP internal. Agar VM cluster dapat berkomunikasi dengan Google API, aktifkan Akses Google Pribadi di subnet jaringan default (atau nama jaringan yang ditentukan pengguna, jika berlaku) di region tempat cluster dibuat.

  • Jika ingin menjalankan contoh antarmuka web Zeppelin dalam panduan ini, Anda harus menggunakan atau membuat cluster Dataproc dengan komponen opsional Zeppelin diaktifkan.

Cluster baru

Untuk membuat cluster Dataproc baru, jalankan perintah gcloud dataproc clusters create berikut. Konfigurasi ini berisi setelan yang perlu Anda gunakan untuk metastore BigLake.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Ganti kode berikut:

  • CLUSTER_NAME: nama untuk cluster Dataproc Anda.
  • PROJECT_ID: ID Google Cloud project tempat Anda membuat cluster.
  • LOCATION: region Google Cloud tempat Anda membuat cluster.

Cluster yang ada

Untuk mengonfigurasi cluster yang ada, tambahkan runtime Iceberg Spark berikut ke cluster Anda.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Anda dapat menambahkan runtime menggunakan salah satu opsi berikut:

Mengirimkan tugas Spark

Untuk mengirimkan tugas Spark, gunakan salah satu metode berikut:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project yang berisi cluster Dataproc.
  • CLUSTER_NAME: nama cluster Dataproc yang Anda gunakan untuk menjalankan tugas Spark SQL.
  • REGION: region Compute Engine tempat cluster Anda berada.
  • LOCATION: lokasi resource BigQuery.
  • CATALOG_NAME: nama katalog Spark yang Anda gunakan dengan tugas SQL.
  • WAREHOUSE_DIRECTORY: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengan gs://.
  • SPARK_SQL_COMMAND: kueri Spark SQL yang ingin Anda jalankan. Kueri ini mencakup perintah untuk membuat resource Anda. Misalnya, untuk membuat namespace dan tabel.

Spark Interaktif

Hubungkan ke Spark dan instal plugin katalog

Untuk menginstal plugin katalog untuk metastore BigLake, hubungkan ke cluster Dataproc menggunakan SSH.

  1. Di Google Cloud konsol, buka halaman VM Instances.
  2. Untuk terhubung ke instance VM Dataproc, klik SSH di daftar instance virtual machine. Outputnya mirip dengan hal berikut ini:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. Di terminal, jalankan perintah inisialisasi metastore BigLake berikut:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Ganti kode berikut:

    • CATALOG_NAME: nama katalog Spark yang Anda gunakan dengan tugas SQL.
    • PROJECT_ID: ID Google Cloud project dari katalog metastore BigLake yang ditautkan dengan katalog Spark Anda.
    • LOCATION: Google Cloud lokasi BigLake Metastore.
    • WAREHOUSE_DIRECTORY: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengan gs://.

    Setelah Anda berhasil terhubung ke cluster, terminal Spark akan menampilkan perintah spark-sql.

    spark-sql (default)>
    

Mengelola resource metastore BigLake

Anda kini terhubung ke metastore BigLake. Anda dapat melihat resource yang ada atau membuat resource baru berdasarkan metadata yang disimpan di metastore BigLake.

Misalnya, coba jalankan perintah berikut di sesi Spark SQL interaktif untuk membuat namespace dan tabel Iceberg.

  • Menggunakan katalog Iceberg kustom:

    USE `CATALOG_NAME`;
  • Buat namespace

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Gunakan namespace yang dibuat:

    USE NAMESPACE_NAME;
  • Buat tabel Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Menyisipkan baris tabel:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Menambahkan kolom tabel:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Melihat metadata tabel:

    DESCRIBE EXTENDED TABLE_NAME;
  • Mencantumkan tabel dalam namespace:

    SHOW TABLES;

Notebook Zeppelin

  1. Di konsol Google Cloud , buka halaman Cluster Dataproc.

    Buka cluster Dataproc

  2. Klik nama cluster yang ingin Anda gunakan.

    Halaman Cluster Details akan terbuka.

  3. Di menu navigasi, klik Web interfaces.

  4. Di bagian Component gateway, klik Zeppelin. Halaman notebook Zeppelin akan terbuka.

  5. Di menu navigasi, klik Notebook, lalu klik +Create new note.

  6. Dalam dialog, masukkan nama notebook. Biarkan Spark dipilih sebagai interpreter default.

  7. Klik Buat. Notebook baru dibuat.

  8. Di notebook, klik menu setelan, lalu klik Interpreter.

  9. Di kolom Cari juru bahasa, telusuri Spark.

  10. Klik Edit.

  11. Di kolom Spark.jars, masukkan URI jar Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Klik Simpan.

  13. Klik Oke.

  14. Salin kode PySpark berikut ke notebook Zeppelin Anda.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Ganti kode berikut:

    • CATALOG_NAME: nama katalog Spark yang akan digunakan untuk tugas SQL.
    • PROJECT_ID: ID Google Cloud project yang berisi cluster Dataproc.
    • WAREHOUSE_DIRECTORY: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengan gs://.
    • NAMESPACE_NAME: nama namespace yang mereferensikan tabel Spark Anda.
    • WAREHOUSE_DIRECTORY: URI folder Cloud Storage tempat gudang data Anda disimpan.
    • TABLE_NAME: nama tabel untuk tabel Spark Anda.
  15. Klik ikon jalankan atau tekan Shift-Enter untuk menjalankan kode. Setelah tugas selesai, pesan status akan menampilkan "Spark Job Finished", dan output akan menampilkan isi tabel:

Petunjuk berikut menunjukkan cara menghubungkan Dataproc ke metastore BigLake menggunakan klien Flink SQL.

Untuk menghubungkan metastore BigLake ke Flink, lakukan hal berikut:

  1. Buat cluster Dataproc dengan mengaktifkan komponen Flink opsional, dan pastikan Anda menggunakan Dataproc 2.2 atau yang lebih baru.
  2. Di konsol Google Cloud , buka halaman VM instances.

    Buka instance VM

  3. Dalam daftar instance virtual machine, klik SSH untuk terhubung ke instance VM Dataproc.

  4. Konfigurasi plugin katalog kustom Iceberg untuk BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Mulai sesi Flink di YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Buat katalog di Flink:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Ganti kode berikut:

    • CATALOG_NAME: ID katalog Flink, yang ditautkan ke katalog metastore BigLake.
    • WAREHOUSE_DIRECTORY: jalur dasar untuk direktori gudang (folder Cloud Storage tempat Flink membuat file). Nilai ini diawali dengan gs://.
    • PROJECT_ID: project ID katalog metastore BigLake yang ditautkan dengan katalog Flink.
    • LOCATION: lokasi resource BigQuery.

Sesi Flink Anda kini terhubung ke metastore BigLake, dan Anda dapat menjalankan perintah Flink SQL.

Setelah terhubung ke metastore BigLake, Anda dapat membuat dan melihat resource berdasarkan metadata yang disimpan di metastore BigLake.

Misalnya, coba jalankan perintah berikut di sesi Flink SQL interaktif Anda untuk membuat database dan tabel Iceberg.

  1. Menggunakan katalog Iceberg kustom:

    USE CATALOG CATALOG_NAME;

    Ganti CATALOG_NAME dengan ID katalog Flink Anda.

  2. Buat database, yang akan membuat set data di BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Ganti DATABASE_NAME dengan nama database baru Anda.

  3. Gunakan database yang Anda buat:

    USE DATABASE_NAME;
  4. Buat tabel Iceberg. Contoh berikut membuat tabel penjualan contoh:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Ganti ICEBERG_TABLE_NAME dengan nama untuk tabel baru Anda.

  5. Melihat metadata tabel:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Mencantumkan tabel dalam database:

    SHOW TABLES;

Menyerap data ke dalam tabel

Setelah membuat tabel Iceberg di bagian sebelumnya, Anda dapat menggunakan Flink DataGen sebagai sumber data untuk menyerap data real-time ke dalam tabel Anda. Langkah-langkah berikut adalah contoh alur kerja ini:

  1. Buat tabel sementara menggunakan DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Ganti kode berikut:

    • DATABASE_NAME: nama database untuk menyimpan tabel sementara Anda.
    • TEMP_TABLE_NAME: nama untuk tabel sementara Anda.
    • ICEBERG_TABLE_NAME: nama tabel Iceberg yang Anda buat di bagian sebelumnya.
  2. Tetapkan paralelisme ke 1:

    SET 'parallelism.default' = '1';
  3. Menetapkan interval titik pemeriksaan:

    SET 'execution.checkpointing.interval' = '10second';
  4. Tetapkan titik pemeriksaan:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Mulai tugas streaming real-time:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Outputnya mirip dengan hal berikut ini:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Untuk memeriksa status tugas streaming, lakukan hal berikut:

    1. Di konsol Google Cloud , buka halaman Clusters.

      Buka Cluster

    2. Pilih cluster Anda.

    3. Klik tab Antarmuka web.

    4. Klik link YARN ResourceManager.

    5. Di antarmuka YARN ResourceManager, temukan sesi Flink Anda, lalu klik link ApplicationMaster di bagian Tracking UI.

    6. Di kolom Status, konfirmasi bahwa status tugas Anda adalah Berjalan.

  7. Buat kueri data streaming di klien Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Mengkueri data streaming di BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Hentikan tugas streaming di klien Flink SQL:

    STOP JOB 'JOB_ID';

    Ganti JOB_ID dengan ID tugas yang ditampilkan di output saat Anda membuat tugas streaming.

Langkah berikutnya