このページでは、Spark を実行する Dataproc クラスタを作成する方法について説明します。このクラスタを使用して、レイク、ゾーン、アセットの Dataplex Universal Catalog メタデータを操作できます。
概要
Dataproc Metastore サービス インスタンスが Dataplex Universal Catalog レイクに関連付けられた後にクラスタを作成します。これにより、クラスタが Hive Metastore エンドポイントを使用して Dataplex Universal Catalog メタデータにアクセスできるようになります。
Dataplex Universal Catalog 内で管理されているメタデータに Hive Metastore などの標準インターフェースを使用してアクセスし、Spark クエリを強化できます。クエリは Dataproc クラスタで実行されます。
Parquet データの場合は、Spark プロパティ spark.sql.hive.convertMetastoreParquet
を false
に設定して、実行エラーを回避します。詳細。
Dataproc クラスタを作成する
次のコマンドを実行して Dataproc クラスタを作成し、Dataplex Universal Catalog レイクに関連付けられた Dataproc Metastore サービスを指定します。
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
メタデータを探索する
DQL クエリを実行してメタデータを探索し、Spark クエリを実行してデータをクエリします。
始める前に
Dataproc クラスタのプライマリ ノードで SSH セッションを開きます。
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
プライマリ ノードのコマンド プロンプトで、新しい Python REPL を開きます。
python3
データベースの一覧を取得する
レイク内の各 Dataplex Universal Catalog ゾーンは、メタストア データベースにマッピングされます。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
テーブルの一覧を取得する
いずれかのゾーン内のテーブルをリスト表示します。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
データのクエリ
いずれかのテーブルのデータをクエリします。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
メタデータにテーブルとパーティションを作成する
DDL クエリを実行して、Apache Spark を使用して Dataplex Universal Catalog メタデータにテーブルとパーティションを作成します。
サポートされているデータ型、ファイル形式、行形式の詳細については、サポートされている値をご覧ください。
始める前に
テーブルを作成する前に、基になるデータを含む Cloud Storage バケットにマッピングする Dataplex Universal Catalog アセットを作成します。詳しくは、アセットを追加するをご覧ください。
テーブルを作成する
Parquet、ORC、AVRO、CSV、JSON テーブルがサポートされています。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
テーブルを変更する
Dataplex Universal Catalog では、テーブルのロケーションを変更する、またはテーブルのパーティション列を編集することはできません。テーブルを変更しても、userManaged が自動的に true
に設定されることはありません。
Spark SQL では、テーブルの名前の変更、列の追加、テーブルのファイル形式を設定することが可能です。
テーブルの名前を変更する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
列を追加する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
ファイル形式を設定する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
テーブルを削除する
Dataplex Universal Catalog Metadata API からテーブルを削除しても、Cloud Storage の基盤となるデータは削除されません。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
パーティションを追加する
Dataplex Universal Catalog では、作成後にパーティションを変更することはできません。ただし、パーティションは削除される可能性があります。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
上記の例に示すように、同じパーティション キーと異なるパーティション値の複数のパーティションを追加できます。
パーティションを削除する
パーティションを削除するには、次のコマンドを実行します。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Iceberg テーブルをクエリする
Apache Spark を使用して Iceberg テーブルにクエリを実行できます。
始める前に
Iceberg を使用して Spark SQL セッションを設定します。
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Iceberg テーブルを作成する
Iceberg テーブルを作成するには、次のコマンドを実行します。
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Iceberg のスナップショットと履歴を確認する
Apache Spark を使用して、Iceberg テーブルのスナップショットと履歴を取得できます。
始める前に
Iceberg をサポートする PySpark セッションを設定します。
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Iceberg テーブルの履歴を取得する
Iceberg テーブルの履歴を取得するには、次のコマンドを実行します。
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Iceberg テーブルのスナップショットを取得する
Iceberg テーブルのスナップショットを取得するには、次のコマンドを実行します。
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
サポートされているデータ型とファイル形式
サポートされているデータ型は次のように定義されます。
データ型 | 値 |
---|---|
プリミティブ |
|
配列 | ARRAY < DATA_TYPE > |
構造 | STRUCT < COLUMN : DATA_TYPE > |
サポートされているファイル形式は次のとおりです。
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
ファイル形式の詳細については、ストレージ形式をご覧ください。
サポートされている行形式は次のとおりです。
- DELIMITED [FIELDS TERMINATED BY CHAR]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
次のステップ
- レイク、ゾーン、アセットのメタデータの管理の詳細を確認する。