Accedere ai metadati in Apache Spark

Questa pagina descrive come creare un cluster Dataproc su cui è in esecuzione Spark. Puoi utilizzare questo cluster per lavorare con i metadati del Catalogo universale Dataplex per lake, zone e asset.

Panoramica

Crea un cluster dopo che l'istanza del servizio Dataproc Metastore è associata al lake Dataplex Universal Catalog per assicurarti che il cluster possa fare affidamento sull'endpoint Hive Metastore per accedere ai metadati di Dataplex Universal Catalog.

È possibile accedere ai metadati gestiti in Dataplex Universal Catalog utilizzando interfacce standard, come Hive Metastore, per eseguire query su Spark. Le query vengono eseguite sul cluster Dataproc.

Per i dati Parquet, imposta la proprietà Spark spark.sql.hive.convertMetastoreParquet su false per evitare errori di esecuzione. Ulteriori dettagli.

Crea un cluster Dataproc

Esegui i seguenti comandi per creare un cluster Dataproc, specificando il servizio Dataproc Metastore associato al lake Dataplex Universal Catalog:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Esplorare i metadati

Esegui query DQL per esplorare i metadati ed eseguire query Spark per eseguire query sui dati.

Prima di iniziare

  1. Apri una sessione SSH sul nodo principale del cluster Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. Al prompt dei comandi del nodo principale, apri una nuova REPL Python.

    python3
    

Elenco database

Ogni zona di Dataplex Universal Catalog all'interno del lake viene mappata a un database del metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Elenca tabelle

Elenca le tabelle in una delle zone.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Query sui dati

Esegui una query sui dati di una delle tabelle.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Crea tabelle e partizioni nei metadati

Esegui query DDL per creare tabelle e partizioni nei metadati del Catalogo universale Dataplex utilizzando Apache Spark.

Per ulteriori informazioni sui tipi di dati, sui formati file e sui formati di riga supportati, consulta Valori supportati.

Prima di iniziare

Prima di creare una tabella, crea una risorsa del Catalogo universale Dataplex che si mappa al bucket Cloud Storage contenente i dati sottostanti. Per ulteriori informazioni, consulta Aggiungere un asset.

Creare una tabella

Sono supportate tabelle Parquet, ORC, AVRO, CSV e JSON.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Modificare una tabella

Dataplex Universal Catalog non ti consente di modificare la posizione di una tabella o di modificare le colonne di partizione di una tabella. La modifica di una tabella non imposta automaticamente userManaged su true.

In Spark SQL, puoi rinominare una tabella, aggiungere colonne e impostare il formato del file di una tabella.

Rinominare una tabella

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Aggiungi colonne

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Impostare il formato file

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Eliminare una tabella

L'eliminazione di una tabella dall'API di metadati di Dataplex Universal Catalog non comporta l'eliminazione dei dati di base in Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Aggiungere una partizione

Dataplex Universal Catalog non consente di modificare una partizione una volta creata. Tuttavia, la partizione può essere eliminata.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

Puoi aggiungere più partizioni della stessa chiave di partizione e valori di partizione diversi, come mostrato nell'esempio precedente.

Inserire una partizione

Per eliminare una partizione, esegui il seguente comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Esegui query sulle tabelle Iceberg

Puoi eseguire query sulle tabelle Iceberg utilizzando Apache Spark.

Prima di iniziare

Configura una sessione Spark SQL con Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Creare una tabella Iceberg

Per creare una tabella Iceberg, esegui il seguente comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Esplorare lo snapshot e la cronologia di Iceberg

Puoi ottenere istantanee e cronologia delle tabelle Iceberg utilizzando Apache Spark.

Prima di iniziare

Configura una sessione PySpark con il supporto di Iceberg:

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Visualizza la cronologia delle tabelle Iceberg

Per ottenere la cronologia di una tabella Iceberg, esegui il seguente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Ottenere snapshot delle tabelle Iceberg

Per ottenere uno snapshot di una tabella Iceberg, esegui il seguente comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipi di dati e formati di file supportati

I tipi di dati supportati sono definiti come segue:

Tipo di dati Valori
originario
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Array ARRAY < DATA_TYPE >
Strutturazione STRUCT < COLUMN : DATA_TYPE >

Di seguito sono riportati i formati file supportati:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Per saperne di più sui formati file, consulta Formati di archiviazione.

Di seguito sono riportati i formati di riga supportati:

  • DELIMITATI [CAMPI TERMINATI DA CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

Passaggi successivi