存取 Apache Spark 中的中繼資料

本頁說明如何建立執行 Spark 的 Dataproc 叢集。您可以使用這個叢集,搭配湖泊、可用區和資產的 Dataplex 通用目錄中繼資料。

總覽

您必須先將 Dataproc Metastore 服務例項與 Dataplex Universal Catalog 湖泊建立關聯,再建立叢集,才能確保叢集可依靠 Hive Metastore 端點存取 Dataplex Universal Catalog 中繼資料。

您可以使用標準介面 (例如 Hive Metastore) 存取 Dataplex 通用目錄中管理的中繼資料,以便執行 Spark 查詢。查詢會在 Dataproc 叢集中執行。

針對 Parquet 資料,請將 Spark 屬性 spark.sql.hive.convertMetastoreParquet 設為 false,以免發生執行錯誤。詳情

建立 Dataproc 叢集

請執行下列指令建立 Dataproc 叢集,並指定與 Dataplex 通用目錄資料湖相關聯的 Dataproc Metastore 服務:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

探索中繼資料

執行 DQL 查詢來探索中繼資料,並執行 Spark 查詢來查詢資料。

事前準備

  1. 在 Dataproc 叢集的主要節點上開啟 SSH 工作階段。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. 在主要節點命令提示字元中,開啟新的 Python REPL。

    python3
    

可列出資料庫

湖泊中的每個 Dataplex 通用目錄區域都會對應至中繼資料庫。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

列出表格

列出其中一個區域的資料表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

查詢資料

查詢其中一個資料表中的資料。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

在中繼資料中建立資料表和分區

執行 DDL 查詢,使用 Apache Spark 在 Dataplex 通用目錄中建立資料表和分區的結構描述。

如要進一步瞭解支援的資料類型、檔案格式和資料列格式,請參閱「支援的值」。

事前準備

建立資料表之前,請先建立 Dataplex 通用目錄資產,並將其對應至含有基礎資料的 Cloud Storage 值區。詳情請參閱「新增資產」。

建立表格

支援 Parquet、ORC、AVRO、CSV 和 JSON 資料表。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

變更資料表

Dataplex Universal Catalog 不允許您變更資料表的位置,或編輯資料表的分區欄位。變更資料表不會自動將 userManaged 設為 true

在 Spark SQL 中,您可以重新命名資料表新增資料欄,以及設定資料表的檔案格式

重新命名資料表

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

新增欄

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

設定檔案格式

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

捨棄資料表

從 Dataplex Universal Catalog 中繼資料 API 中刪除表格,不會刪除 Cloud Storage 中的基礎資料。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

新增分區

Dataplex 通用目錄不允許在建立後變更分割區。不過,您可以刪除分區。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

您可以新增多個具有相同分割鍵和不同分割值的分割區,如上例所示。

放置分區

如要捨棄分割區,請執行下列指令:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

查詢 Iceberg 資料表

您可以使用 Apache Spark 查詢 Iceberg 資料表。

事前準備

使用 Iceberg 設定 Spark SQL 工作階段。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

建立 Iceberg 資料表

如要建立 Iceberg 資料表,請執行下列指令:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

探索 Iceberg 快照和歷程

您可以使用 Apache Spark 取得 Iceberg 資料表的快照和記錄。

事前準備

設定支援 Iceberg 的 PySpark 工作階段:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

取得 Iceberg 資料表的歷程記錄

如要取得 Iceberg 資料表的記錄,請執行下列指令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

取得 Iceberg 資料表的快照

如要取得 Iceberg 資料表的快照,請執行下列指令:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

支援的資料類型和檔案格式

支援的資料類型定義如下:

資料類型
原始
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
陣列 ARRAY < DATA_TYPE >
結構 STRUCT < COLUMN : DATA_TYPE >

系統支援的檔案格式如下:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

如要進一步瞭解檔案格式,請參閱「儲存格式」。

系統支援的資料列格式如下:

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

後續步驟