Halaman ini menjelaskan cara menggunakan tabel Apache Iceberg dengan layanan Dataproc Metastore yang terpasang ke cluster Dataproc. Apache Iceberg adalah format tabel terbuka untuk set data analisis besar.
Kompatibilitas
Tabel Iceberg mendukung fitur berikut.
Driver | Pilih | Sisipkan | Buat Tabel |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
Sebelum memulai
- Buat layanan Dataproc Metastore.
- Lampirkan Dataproc Metastore ke cluster Dataproc.
Menggunakan tabel Iceberg dengan Spark
Contoh berikut menunjukkan cara menggunakan tabel Iceberg dengan Spark.
Tabel Iceberg mendukung operasi baca dan tulis. Untuk mengetahui informasi selengkapnya, lihat Apache Iceberg - Spark.
Konfigurasi Spark
Pertama, mulai shell Spark dan gunakan bucket Cloud Storage untuk menyimpan data. Untuk menyertakan Iceberg dalam penginstalan Spark, tambahkan file JAR Runtime Iceberg Spark ke folder JAR Spark. Untuk mendownload file JAR, lihat Download Apache Iceberg. Perintah berikut memulai shell Spark dengan dukungan untuk Apache Iceberg:
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Menggunakan Hive Catalog untuk membuat tabel Iceberg
Siapkan konfigurasi Hive Catalog untuk membuat tabel Iceberg di spark scala:
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
Menyisipkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table=catalog.createTable(name,df1_schema,partition_spec);
Tambahkan Iceberg Storage Handler dan SerDe sebagai properti tabel:
table.updateProperties().set("engine.hive.enabled", "true").commit();
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save("default.example");
Membaca data:
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
Ubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.
Menambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
Periksa data baru yang dimasukkan:
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
Melihat histori tabel:
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
Melihat snapshot:
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
Lihat file manifes:
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
Melihat file data:
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
Misalkan Anda melakukan kesalahan dengan menambahkan baris dengan nilai
id=6
dan ingin kembali melihat versi tabel yang benar:spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan.
Menggunakan Tabel Hadoop untuk membuat tabel Iceberg
Siapkan konfigurasi Tabel Hadoop untuk membuat tabel Iceberg di spark scala:
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:val conf = new Configuration(); val tables = new HadoopTables(conf);
Menyisipkan data sampel:
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
Tentukan strategi partisi berdasarkan kolom
id
:val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
Buat tabel:
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
Tulis data ke tabel:
df1.write.format("iceberg").mode("overwrite").save(table_location);
Membaca data:
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
Ubah skema tabel. Berikut adalah contohnya.
Dapatkan tabel dan tambahkan kolom baru
grade
:val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
Periksa skema tabel baru:
table.schema.toString;
Sisipkan lebih banyak data dan lihat evolusi skema. Berikut adalah contohnya.
Menambahkan data baru ke tabel:
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
Periksa data baru yang dimasukkan:
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
Melihat histori tabel:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
Melihat snapshot:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
Lihat file manifes:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
Melihat file data:
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
Kembali untuk melihat versi tabel tertentu:
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan dan tambahkan"L"
di bagian akhir. Misalnya,"3943776515926014142L"
.
Menggunakan tabel Iceberg di Hive
Iceberg mendukung tabel yang dibaca menggunakan Hive dengan menggunakan StorageHandler
. Perhatikan
bahwa hanya versi Hive 2.x dan 3.1.2 yang didukung. Untuk mengetahui informasi selengkapnya, lihat
Apache Iceberg - Hive. Selain itu, tambahkan file JAR Iceberg Hive Runtime ke classpath Hive. Untuk mendownload file JAR, lihat Download Apache Iceberg.
Untuk menempatkan tabel Hive di atas tabel Iceberg, Anda harus membuat tabel Iceberg menggunakan Katalog Hive atau Tabel Hadoop. Selain itu, Anda harus mengonfigurasi Hive dengan tepat untuk membaca data dari tabel Iceberg.
Membaca tabel Iceberg (Hive Catalog) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
Membaca skema dan data tabel. Berikut adalah contohnya.
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted example;
Baca data dari tabel:
select * from example;
Membaca tabel Iceberg (Tabel Hadoop) di Hive
Buka klien Hive dan siapkan konfigurasi untuk membaca tabel Iceberg pada sesi klien Hive:
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
Membaca skema dan data tabel. Berikut adalah contohnya.
Buat tabel eksternal (tumpang-tindih tabel Hive di atas tabel Iceberg):
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
Periksa skema tabel dan apakah format tabelnya adalah Iceberg:
describe formatted hadoop_table;
Baca data dari tabel:
select * from hadoop_table;
Menggunakan tabel Iceberg di Presto
Kueri Presto menggunakan konektor Hive untuk mendapatkan lokasi partisi, jadi Anda harus mengonfigurasi Presto dengan tepat untuk membaca dan menulis data pada tabel Iceberg. Untuk mengetahui informasi selengkapnya, lihat Presto/Trino - Hive Connector dan Presto/Trino - Iceberg Connector.
Konfigurasi Presto
Di setiap node cluster Dataproc, buat file bernama
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
dan konfigurasihive.metastore.uri
sebagai berikut:connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
Ganti
example.net:9083
dengan host dan port yang benar untuk layanan Thrift metastore Hive Anda.Mulai ulang layanan Presto untuk mengirimkan konfigurasi:
sudo systemctl restart presto.service
Membuat tabel Iceberg di Presto
Buka klien Presto dan gunakan konektor "Iceberg" untuk mendapatkan metastore:
--catalog iceberg --schema default
Buat tabel untuk menyisipkan dan memperbarui data. Berikut adalah contohnya.
Buat tabel bernama
example
di databasedefault
:CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
Menyisipkan data sampel:
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
Membaca data dari tabel:
SELECT * FROM iceberg.default.example;
Masukkan lebih banyak data baru untuk memeriksa snapshot:
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
Melihat snapshot:
SELECT snapshot_id FROM iceberg.default."example$snapshots";
Dengan menambahkan perintah
ORDER BY committed_at DESC LIMIT 1;
, Anda dapat menemukan ID snapshot terbaru.Melakukan roll back ke versi tabel tertentu:
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
Ganti
snapshot-id
dengan versi yang ingin Anda kembalikan.