Acessar metadados no Apache Spark

Esta página descreve como criar um cluster do Dataproc que executa o Spark.

Visão geral

Você cria um cluster após o serviço Metastore do Dataproc está associada ao lake do Dataplex para garantir que a cluster pode confiar no endpoint Hive Metastore para ter acesso a Metadados do Dataplex.

Os metadados gerenciados no Dataplex podem ser acessados usando interfaces de rede, como Hive Metastore, para realizar consultas Spark. As consultas são executadas no cluster do Dataproc.

Para dados Parquet, defina a propriedade spark.sql.hive.convertMetastoreParquet do Spark como false para evitar erros de execução. Mais detalhes.

Criar um cluster do Dataproc

Execute os comandos abaixo para criar um cluster do Dataproc, especificando o serviço do Dataproc Metastore associado ao lago do Dataplex:

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

Analisar metadados

Executar consultas DQL para explorar os metadados e executar consultas Spark para consultar dados.

Antes de começar

  1. Abra uma sessão SSH no nó principal do cluster do Dataproc.

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. No prompt de comando do nó principal, abra um novo REPL do Python.

    python3
    

Listar bancos de dados

Cada zona do Dataplex no lake é mapeada para um banco de dados de metastore.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

Listar tabelas

Listar tabelas em uma das zonas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

Consultar dados

Consulte os dados em uma das tabelas.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

Criar tabelas e partições em metadados

Executar consultas DDL para criar tabelas e partições nos metadados do Dataplex usando o Apache Spark.

Para mais informações sobre os tipos de dados, formatos de arquivo e formatos de linha aceitos, consulte Valores aceitos.

Antes de começar

Antes de criar uma tabela, crie um recurso do Dataplex que seja mapeado para o bucket do Cloud Storage que contém os dados. Para mais informações, consulte Adicionar um recurso.

Criar uma tabela

Tabelas Parquet, ORC, AVRO, CSV e JSON são compatíveis.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

Alterar uma tabela

O Dataplex não permite alterar o local de uma tabela ou editar a de partições de uma tabela. Alterar uma tabela não define automaticamente userManaged para true.

No Spark SQL, é possível renomear uma tabela, adicionar colunas e definir o formato de arquivo de uma tabela.

Renomear uma tabela

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

Adicionar colunas

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

Definir o formato do arquivo

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

Excluir uma tabela

A exclusão de uma tabela da API de metadados do Dataplex não exclui os dados no Cloud Storage.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

Adicionar uma partição

O Dataplex não permite alterar uma partição depois que ela é criada. No entanto, a partição pode ser descartada.

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

É possível adicionar várias partições da mesma chave de partição e valores de partição diferentes, como mostrado no exemplo anterior.

Excluir uma partição

Para excluir uma partição, execute o seguinte comando:

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Consultar tabelas do Iceberg

É possível consultar tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do Spark SQL com o Iceberg.

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Crie uma tabela de Iceberg

Para criar uma tabela do Iceberg, execute o seguinte comando:

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Explore a história e o instantâneo do Iceberg

É possível acessar snapshots e o histórico das tabelas do Iceberg usando o Apache Spark.

Antes de começar

Configurar uma sessão do PySpark com suporte do Iceberg.

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Conferir o histórico das tabelas Iceberg

Para acessar o histórico de uma tabela Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Gerar snapshots de tabelas do Iceberg

Para conseguir um snapshot de uma tabela do Iceberg, execute o seguinte comando:

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

Tipos de dados e formatos de arquivo compatíveis

Os tipos de dados compatíveis são definidos da seguinte maneira:

Tipo de dado Valores
Primário
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
Matriz ARRAY < DATA_TYPE >
Estrutura STRUCT < COLUMN : DATA_TYPE >

Os formatos de arquivo compatíveis são definidos da seguinte maneira:

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

Para mais informações sobre os formatos de arquivo, consulte Formatos de armazenamento.

Os formatos de linha compatíveis são definidos da seguinte maneira:

  • [CAMPOS ENCERRADOS POR CHAR]
  • SERDE SERDE_NAME [COM SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

A seguir