Apache Spark에서 메타데이터에 액세스

이 페이지에서는 Spark를 실행하는 Dataproc 클러스터를 만드는 방법을 설명합니다.

개요

Dataproc Metastore 서비스 인스턴스가 Dataplex 레이크와 연결되면 클러스터가 Hive Metastore 엔드포인트를 사용하여 Dataplex 메타데이터에 액세스할 수 있도록 클러스터를 만듭니다.

Hive Metastore와 같은 표준 인터페이스를 사용하여 Dataplex 내에서 관리되는 메타데이터에 액세스하여 Spark 쿼리를 지원할 수 있습니다. 쿼리는 Dataproc 클러스터에서 실행됩니다.

Parquet 데이터의 경우 실행 오류를 방지하기 위해 Spark 속성 spark.sql.hive.convertMetastoreParquetfalse로 설정합니다. 자세히 알아보기

Dataproc 클러스터 만들기

다음 명령어를 실행하여 Dataproc 클러스터를 만들고 Dataplex 레이크와 연결된 Dataproc Metastore 서비스를 지정합니다.

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

메타데이터 살펴보기

DQL 쿼리를 실행하여 메타데이터를 탐색하고 Spark 쿼리를 실행하여 데이터를 쿼리합니다.

시작하기 전에

  1. Dataproc 클러스터의 기본 노드에서 SSH 세션을 엽니다.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. 기본 노드 명령 프롬프트에서 새 Python REPL을 엽니다.

    python3
    

데이터베이스 나열

레이크 내의 각 Dataplex 영역은 메타스토어 데이터베이스에 매핑됩니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

표 나열

영역 중 하나의 테이블을 나열합니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

데이터 쿼리

테이블 중 하나에서 데이터를 쿼리합니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

메타데이터에서 테이블 및 파티션 만들기

DDL 쿼리를 실행하여 Apache Spark를 사용하여 Dataplex 메타데이터에 테이블과 파티션을 만듭니다.

지원되는 데이터 유형, 파일 형식, 행 형식에 대한 자세한 내용은 지원되는 값을 참조하세요.

시작하기 전에

테이블을 만들기 전에 기반 데이터가 포함된 Cloud Storage 버킷에 매핑되는 Dataplex 애셋을 만듭니다. 자세한 내용은 애셋 추가를 참조하세요.

테이블 만들기

Parquet, ORC, AVRO, CSV, JSON 테이블이 지원됩니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

테이블 변경

Dataplex에서는 테이블의 위치를 변경하거나 테이블의 파티션 열을 수정할 수 없습니다. 테이블을 변경해도 userManagedtrue로 자동 설정되지는 않습니다.

Spark SQL에서는 테이블 이름을 바꾸고, 열을 추가하고, 테이블의 파일 형식을 설정할 수 있습니다.

테이블 이름 바꾸기

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

열 추가

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

파일 형식 설정

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

테이블 삭제

Dataplex의 메타데이터 API에서 테이블을 삭제해도 Cloud Storage의 기본 데이터는 삭제되지 않습니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

파티션 추가

Dataplex에서는 생성된 파티션을 변경할 수 없습니다. 하지만 파티션은 삭제할 수 있습니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

위 예시와 같이 동일한 파티션 키와 다른 파티션 값의 파티션을 여러 개 추가할 수 있습니다.

파티션 삭제

파티션을 삭제하려면 다음 명령어를 실행합니다.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Iceberg 테이블 쿼리

Apache Spark를 사용하여 Iceberg 테이블을 쿼리할 수 있습니다.

시작하기 전에

Iceberg가 포함된 Spark SQL 세션을 설정합니다.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg 테이블 만들기

Iceberg 테이블을 만들려면 다음 명령어를 실행합니다.

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Iceberg 스냅샷 및 기록 살펴보기

Apache Spark를 사용하여 Iceberg 테이블의 스냅샷과 기록을 가져올 수 있습니다.

시작하기 전에

Iceberg 지원이 포함된 PySpark 세션을 설정합니다.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg 테이블의 기록 가져오기

Iceberg 테이블의 기록을 가져오려면 다음 명령어를 실행합니다.

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Iceberg 테이블의 스냅샷 가져오기

Iceberg 테이블의 스냅샷을 가져오려면 다음 명령어를 실행합니다.

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

지원되는 데이터 유형 및 파일 형식

지원되는 데이터 유형은 다음과 같이 정의됩니다.

데이터 유형
기본 역할
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
배열 ARRAY < DATA_TYPE >
구조 STRUCT < COLUMN : DATA_TYPE >

지원되는 파일 형식은 다음과 같습니다.

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

파일 형식에 관한 자세한 내용은 스토리지 형식을 참조하세요.

지원되는 행 형식은 다음과 같습니다.

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES(PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

다음 단계