이 페이지에서는 Dataproc 클러스터에 연결된 Dataproc Metastore 서비스에 Apache Iceberg 테이블을 사용하는 방법을 설명합니다. Apache Iceberg는 대규모 분석 데이터 세트를 위한 열린 테이블 형식입니다.
호환성
Iceberg 테이블은 다음 기능을 지원합니다.
요인 | 선택 | 삽입 | 테이블 만들기 |
---|---|---|---|
Spark | ✓ | ✓ | ✓ |
Hive | ✓ | ✓ | |
Presto | ✓ | ✓ | ✓ |
시작하기 전에
Spark에 Iceberg 테이블 사용
다음 예시는 Spark에 Iceberg 테이블을 사용하는 이유를 보여줍니다.
Iceberg 테이블은 읽기 및 쓰기 작업을 지원합니다. 자세한 내용은 Apache Iceberg - Spark를 참조하세요.
Spark 구성
먼저 Spark 셸을 시작하고 Cloud Storage 버킷을 사용하여 데이터를 저장합니다. Spark 설치에 Iceberg를 포함하려면 Iceberg Spark 런타임 JAR 파일을 Spark의 JAR 폴더에 추가합니다. JAR 파일을 다운로드하려면 Apache Iceberg 다운로드를 참조하세요. 다음 명령어는 Apache Iceberg를 지원하는 Spark 셸을 시작합니다.
$ spark-shell --conf spark.sql.warehouse.dir=gs://BUCKET_NAME/spark-warehouse --jars /path/to/iceberg-spark-runtime.jar
Hive 카탈로그를 사용하여 Iceberg 테이블 만들기
Hive 카탈로그 구성을 설정하여 Spark Scala에서 Iceberg 테이블을 만듭니다.
import org.apache.iceberg.hive.HiveCatalog import org.apache.iceberg.catalog._ import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._ import java.util.HashMap
데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.
default
데이터베이스 아래에example
이라는 테이블을 만듭니다.val catalog = new HiveCatalog(); catalog.setConf(spark.sparkContext.hadoopConfiguration); catalog.initialize("hive", new HashMap[String,String]()); val name = TableIdentifier.of("default","example");
샘플 데이터를 삽입합니다.
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
id
열을 기준으로 분할 전략을 지정합니다.val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
테이블을 만듭니다.
val table=catalog.createTable(name,df1_schema,partition_spec);
Iceberg 스토리지 핸들러와 SerDe를 테이블 속성으로 추가합니다.
table.updateProperties().set("engine.hive.enabled", "true").commit();
테이블에 데이터를 씁니다.
df1.write.format("iceberg").mode("overwrite").save("default.example");
데이터를 읽습니다.
val read_df1=spark.read.format("iceberg").load("default.example"); read_df1.show;
테이블 스키마를 변경합니다. 다음은 그 예시입니다.
테이블을 가져오고 새 열
grade
를 추가합니다.val table = catalog.loadTable(TableIdentifier.of("default", "example")); table.updateSchema.addColumn("grade", StringType.get()).commit();
새 테이블 스키마를 확인합니다.
table.schema.toString;
더 많은 데이터를 삽입하고 스키마 변경을 확인합니다. 다음은 그 예시입니다.
테이블에 새 데이터를 추가합니다.
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save("default.example"); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save("default.example");
삽입된 새 데이터를 확인합니다.
val read_df2=spark.read.format("iceberg").load("default.example"); read_df2.show;
테이블 기록을 봅니다.
spark.read.format("iceberg").load("default.example.history").show(truncate = false);
스냅샷을 확인합니다.
spark.read.format("iceberg").load("default.example.snapshots").show(truncate = false);
매니페스트 파일을 봅니다.
spark.read.format("iceberg").load("default.example.manifests").show(truncate = false);
데이터 파일을 확인합니다.
spark.read.format("iceberg").load("default.example.files").show(truncate = false);
실수로
id=6
값을 포함하는 행을 추가하여 뒤로 돌아가서 올바른 테이블 버전을 보려고 하는 경우를 가정해 보겠습니다.spark.read.format("iceberg").option("snapshot-id","2273922295095144317").load("default.example").show();
snapshot-id
를 되돌리려는 버전으로 바꿉니다.
Hadoop 테이블을 사용하여 Iceberg 테이블 만들기
Spark Scala에서 Iceberg 테이블을 만들도록 Hadoop 테이블 구성을 설정합니다.
import org.apache.hadoop.conf.Configuration import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.Table import org.apache.iceberg.Schema import org.apache.iceberg.types.Types._ import org.apache.iceberg.PartitionSpec import org.apache.iceberg.spark.SparkSchemaUtil import org.apache.spark.sql._
데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.
default
데이터베이스 아래에example
이라는 테이블을 만듭니다.val conf = new Configuration(); val tables = new HadoopTables(conf);
샘플 데이터를 삽입합니다.
val df1 = Seq((1,"Vincent","Computer Science"),(2,"Dan", "Economics"),(3,"Bob", "Politics"),(4,"Cindy", "UX Design")).toDF("id","name","major"); val df1_schema = SparkSchemaUtil.convert(df1.schema);
id
열을 기준으로 분할 전략을 지정합니다.val partition_spec=PartitionSpec.builderFor(df1_schema).identity("id").build;
테이블을 만듭니다.
val table_location = "gs://<gcs-bucket-name>/hive-warehouse/<database-name>"; val table = tables.create(df1_schema, partition_spec, table_location);
테이블에 데이터를 씁니다.
df1.write.format("iceberg").mode("overwrite").save(table_location);
데이터를 읽습니다.
val read_df1=spark.read.format("iceberg").load(table_location); read_df1.show;
테이블 스키마를 변경합니다. 다음은 그 예시입니다.
테이블을 가져오고 새 열
grade
를 추가합니다.val table = tables.load(table_location); table.updateSchema.addColumn("grade", StringType.get()).commit();
새 테이블 스키마를 확인합니다.
table.schema.toString;
더 많은 데이터를 삽입하고 스키마 변경을 확인합니다. 다음은 그 예시입니다.
테이블에 새 데이터를 추가합니다.
val df2=Seq((5,"Amy","UX Design","Sophomore")).toDF("id","name","major","grade"); df2.write.format("iceberg").mode("append").save(table_location); val df3=Seq((6,"Rachael","Economics","Freshman")).toDF("id","name","major","grade"); df3.write.format("iceberg").mode("append").save(table_location);
삽입된 새 데이터를 확인합니다.
val read_df2=spark.read.format("iceberg").load(table_location); read_df2.show;
테이블 기록을 봅니다.
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#history").show(truncate=false);
스냅샷을 확인합니다.
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#snapshots").show(truncate=false);
매니페스트 파일을 봅니다.
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#manifests").show(truncate=false);
데이터 파일을 확인합니다.
spark.read.format("iceberg").load("gs://<gcs-bucket-name>/hive-warehouse/<database-name>#files").show(truncate=false);
특정 버전의 테이블을 보기 위해 돌아가려면 다음 안내를 따르세요.
spark.read.format("iceberg").option("snapshot-id","3943776515926014142L").format("iceberg").load(table_location).show;
snapshot-id
를 되돌리려는 버전으로 바꾸고"L"
을 끝에 추가합니다. 예를 들면"3943776515926014142L"
입니다.
Hive에서 Iceberg 테이블 사용하기
Iceberg는 StorageHandler
를 사용하여 Hive를 사용하는 테이블을 지원합니다. Hive 2.x 및 3.1.2 버전만 지원됩니다. 자세한 내용은 Apache Iceberg - Hive를 참조하세요. 또한 Iceberg Hive 런타임 JAR 파일을 Hive 클래스 경로에 추가합니다. JAR 파일을 다운로드하려면 Apache Iceberg 다운로드를 참조하세요.
Iceberg 테이블 위에 Hive 테이블을 오버레이하려면 Hive 카탈로그 또는 Hadoop 테이블을 사용하여 Iceberg 테이블을 만들어야 합니다. 또한 Iceberg 테이블에서 데이터를 읽으려면 Hive를 구성해야 합니다.
Hive에서 Iceberg 테이블(Hive 카탈로그) 읽기
Hive 클라이언트를 열고 Hive 클라이언트 세션에서 Iceberg 테이블을 읽도록 구성을 설정합니다.
add jar /path/to/iceberg-hive-runtime.jar; set iceberg.engine.hive.enabled=true; set engine.hive.enabled=true; set iceberg.mr.catalog=hive; set hive.vectorized.execution.enabled=false;
테이블 스키마 및 데이터를 읽습니다. 다음은 그 예시입니다.
테이블 스키마를 확인하고 테이블 형식이 Iceberg인지 확인합니다.
describe formatted example;
테이블에서 데이터를 읽습니다.
select * from example;
Hive에서 Iceberg 테이블(Hadoop 테이블) 읽기
Hive 클라이언트를 열고 Hive 클라이언트 세션에서 Iceberg 테이블을 읽도록 구성을 설정합니다.
add jar /path/to/iceberg-hive-runtime.jar; set engine.hive.enabled=true; set hive.vectorized.execution.enabled=false;
테이블 스키마 및 데이터를 읽습니다. 다음은 그 예시입니다.
외부 테이블을 만듭니다(Iceberg 테이블 위에 Hive 테이블 오버레이).
CREATE EXTERNAL TABLE hadoop_table STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'gs://<gcs-bucket-name>/hive-warehouse/<database-name>' TBLPROPERTIES ('iceberg.catalog'='gs://<gcs-bucket-name>/hive-warehouse/<database-name>');
테이블 스키마를 확인하고 테이블 형식이 Iceberg인지 확인합니다.
describe formatted hadoop_table;
테이블에서 데이터를 읽습니다.
select * from hadoop_table;
Presto에서 Iceberg 테이블 사용하기
Presto 쿼리는 Hive 커넥터를 사용하여 파티션 위치를 가져오므로 Presto가 Iceberg 테이블에서 데이터를 읽고 쓰도록 구성해야 합니다. 자세한 내용은 Presto/Trino - Hive 커넥터 및 Presto/Trino - Iceberg 커넥터를 참조하세요.
Presto 구성
각 Dataproc 클러스터 노드에서
iceberg.properties
/etc/presto/conf/catalog/iceberg.properties
라는 파일을 만들고hive.metastore.uri
를 다음과 같이 구성합니다.connector.name=iceberg hive.metastore.uri=thrift://<example.net:9083>
example.net:9083
을 Hive Metastore Thrift 서비스의 올바른 호스트 및 포트로 바꿉니다.Presto 서비스를 다시 시작하여 구성을 푸시합니다.
sudo systemctl restart presto.service
Presto에서 Iceberg 테이블 만들기
Presto 클라이언트를 열고 'Iceberg' 커넥터를 사용하여 Metastore를 가져옵니다.
--catalog iceberg --schema default
데이터를 삽입하고 업데이트할 테이블을 만듭니다. 다음은 그 예시입니다.
default
데이터베이스 아래에example
이라는 테이블을 만듭니다.CREATE TABLE iceberg.default.example ( id integer, name VARCHAR, major VARCHAR, grade VARCHAR) WITH (partitioning = ARRAY['major', 'grade']);
샘플 데이터를 삽입합니다.
INSERT INTO iceberg.default.example VALUES (1, 'Vincent', 'Computer Science', 'Junior'), (2,'Dan', 'Economics', 'Senior'), (3,'Bob', 'Politics', 'Freshman');
테이블에서 데이터를 읽습니다.
SELECT * FROM iceberg.default.example;
새 데이터를 삽입하여 스냅샷을 확인합니다.
INSERT INTO example VALUES (4, 'Cindy', 'UX Design', 'Junior'); INSERT INTO example VALUES (5, 'Amy', 'UX Design', 'Sophomore');
스냅샷을 확인합니다.
SELECT snapshot_id FROM iceberg.default."example$snapshots";
ORDER BY committed_at DESC LIMIT 1;
명령어를 추가하면 최신 스냅샷 ID를 찾을 수 있습니다.테이블의 특정 버전으로 롤백합니다.
CALL iceberg.system.rollback_to_snapshot('default', 'example', 8424394414541782448);
snapshot-id
를 되돌리려는 버전으로 바꿉니다.