Usar o metastore do BigLake com o Dataproc

Neste documento, explicamos como usar o metastore do BigLake com o Dataproc no Compute Engine. Essa conexão oferece um metastore único e compartilhado que funciona em mecanismos de software de código aberto, como o Apache Spark ou o Apache Flink.

Antes de começar

  1. Ative o faturamento no projeto Google Cloud . Saiba como verificar se o faturamento está ativado em um projeto.
  2. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

  3. Opcional: entenda como o BigLake Metastore funciona e por que você deve usá-lo.

Funções exigidas

Para receber as permissões necessárias para usar o Spark ou o Flink e o Dataproc com o metastore do BigLake como um repositório de metadados, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Fluxo de trabalho geral

Para usar o Dataproc no Compute Engine com o metastore do BigLake, siga estas etapas gerais:

  1. Crie ou configure um cluster do Dataproc.
  2. Conecte-se ao seu mecanismo de software de código aberto preferido, como Spark ou Flink.
  3. Use um arquivo JAR para instalar o plug-in de catálogo do Apache Iceberg no cluster.
  4. Crie e gerencie os recursos do metastore do BigLake conforme necessário, dependendo do mecanismo de software de código aberto que você está usando.
  5. No BigQuery, acesse e use os recursos do metastore do BigLake.

Conectar o BigLake Metastore ao Spark

As instruções a seguir mostram como conectar o Dataproc ao metastore do BigLake usando o Spark SQL interativo.

Baixar o plug-in do catálogo do Iceberg

Para conectar o BigLake Metastore ao Dataproc e ao Spark, use o arquivo JAR do plug-in do catálogo do Iceberg do BigLake Metastore.

Esse arquivo está incluído por padrão na versão 2.2 da imagem do Dataproc. Se os clusters do Dataproc não tiverem acesso direto à Internet, baixe o plug-in e faça upload dele para um bucket do Cloud Storage que o cluster do Dataproc possa acessar.

Faça o download do plug-in do catálogo do Iceberg do metastore do BigLake.

Configurar um cluster do Dataproc

Antes de se conectar ao metastore do BigLake, configure um cluster do Dataproc.

Para isso, crie um cluster ou use um já existente. Depois, você usa esse cluster para executar o Spark SQL interativo e gerenciar os recursos do metastore do BigLake.

  • A sub-rede na região em que o cluster é criado precisa ter o Acesso privado do Google (PGA) ativado. Por padrão, as VMs de cluster do Dataproc, criadas com uma versão de imagem 2.2 (padrão) ou posterior, têm apenas endereços IP internos. Para permitir que as VMs do cluster se comuniquem com as APIs do Google, ative o Acesso privado do Google na sub-rede default (ou nome de rede especificado pelo usuário, se aplicável) na região em que o cluster foi criado.

  • Se você quiser executar o exemplo da interface da Web do Zeppelin neste guia, use ou crie um cluster do Dataproc com o componente opcional do Zeppelin ativado.

Novo cluster

Para criar um cluster do Dataproc, execute o seguinte comando gcloud dataproc clusters create. Essa configuração contém as configurações necessárias para usar o BigLake Metastore.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Substitua:

  • CLUSTER_NAME: um nome para o cluster do Dataproc.
  • PROJECT_ID: o ID do Google Cloud projeto em que você está criando o cluster.
  • LOCATION: a Google Cloud região em que você está criando o cluster.

Cluster existente

Para configurar um cluster atual, adicione o seguinte tempo de execução do Iceberg Spark ao seu cluster.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Você pode adicionar o tempo de execução usando uma das seguintes opções:

Enviar um job do Spark

Para enviar um job do Spark, use um dos seguintes métodos:

CLI da gcloud

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Substitua:

  • PROJECT_ID: o ID do Google Cloud projeto que contém o cluster do Dataproc.
  • CLUSTER_NAME: o nome do cluster do Dataproc que você está usando para executar o job do Spark SQL.
  • REGION: a região do Compute Engine em que o cluster está localizado.
  • LOCATION: o local dos recursos do BigQuery.
  • CATALOG_NAME: o nome do catálogo do Spark que você está usando com seu job do SQL.
  • WAREHOUSE_DIRECTORY: a pasta do Cloud Storage que contém seu data warehouse. Esse valor começa com gs://.
  • SPARK_SQL_COMMAND: a consulta do Spark SQL que você quer executar. Essa consulta inclui os comandos para criar seus recursos. Por exemplo, para criar um namespace e uma tabela.

Spark interativo

Conectar-se ao Spark e instalar o plug-in de catálogo

Para instalar o plug-in de catálogo do BigLake Metastore, conecte-se ao cluster do Dataproc usando SSH.

  1. No console Google Cloud , acesse a página Instâncias de VM.
  2. Para se conectar a uma instância de VM do Dataproc, clique em SSH na lista de instâncias de máquina virtual. O resultado será assim:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. No terminal, execute o seguinte comando de inicialização do metastore do BigLake:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Substitua:

    • CATALOG_NAME: o nome do catálogo do Spark que você está usando com seu job do SQL.
    • PROJECT_ID: o ID do Google Cloud projeto do catálogo do BigLake Metastore ao qual seu catálogo do Spark está vinculado.
    • LOCATION: o Google Cloud local do metastore do BigLake.
    • WAREHOUSE_DIRECTORY: a pasta do Cloud Storage que contém seu data warehouse. Esse valor começa com gs://.

    Depois de se conectar a um cluster, o terminal do Spark vai mostrar o prompt spark-sql.

    spark-sql (default)>
    

Gerenciar recursos do metastore do BigLake

Agora você está conectado ao BigLake Metastore. É possível conferir seus recursos atuais ou criar novos com base nos metadados armazenados no metastore do BigLake.

Por exemplo, execute os seguintes comandos na sessão interativa do Spark SQL para criar um namespace e uma tabela do Iceberg.

  • Use o catálogo personalizado do Iceberg:

    USE `CATALOG_NAME`;
  • Para criar um namespace:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Use o namespace criado:

    USE NAMESPACE_NAME;
  • Crie uma tabela do Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Insira uma linha da tabela:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Adicione uma coluna à tabela:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Ver metadados da tabela:

    DESCRIBE EXTENDED TABLE_NAME;
  • Liste as tabelas no namespace:

    SHOW TABLES;

Notebook Zeppelin

  1. No console Google Cloud , acesse a página Clusters do Dataproc.

    Acessar clusters do Dataproc

  2. Clique no nome do cluster que você quer usar.

    A página Detalhes do cluster é aberta.

  3. No menu de navegação, clique em Interfaces da Web.

  4. Em Gateway de componentes, clique em Zeppelin. A página do notebook Zeppelin é aberta.

  5. No menu de navegação, clique em Notebook e em +Criar nova nota.

  6. Na caixa de diálogo, insira um nome para o notebook. Deixe Spark selecionado como o interpretador padrão.

  7. Clique em Criar. Um novo notebook é criado.

  8. No notebook, clique no menu de configurações e em Interpreter.

  9. No campo Pesquisar intérpretes, procure Spark.

  10. Clique em Editar.

  11. No campo Spark.jars, insira o URI do jar do Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Clique em Salvar.

  13. Clique em OK.

  14. Copie o seguinte código PySpark no seu notebook do Zeppelin.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Substitua:

    • CATALOG_NAME: o nome do catálogo do Spark a ser usado no job do SQL.
    • PROJECT_ID: o ID do Google Cloud projeto que contém o cluster do Dataproc.
    • WAREHOUSE_DIRECTORY: a pasta do Cloud Storage que contém seu data warehouse. Esse valor começa com gs://.
    • NAMESPACE_NAME: o nome do namespace que referencia sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que seu data warehouse está armazenado.
    • TABLE_NAME: um nome para sua tabela do Spark.
  15. Clique no ícone de execução ou pressione Shift-Enter para executar o código. Quando o job for concluído, a mensagem de status vai mostrar "Spark Job Finished", e a saída vai exibir o conteúdo da tabela:

As instruções a seguir mostram como conectar o Dataproc ao metastore do BigLake usando o cliente Flink SQL.

Para conectar o metastore do BigLake ao Flink, faça o seguinte:

  1. Crie um cluster do Dataproc com o componente opcional Flink ativado e verifique se você está usando o Dataproc 2.2 ou uma versão mais recente.
  2. No Google Cloud console, acesse a página Instâncias de VM:

    Acessar instâncias de VM

  3. Na lista de instâncias de máquina virtual, clique em SSH para se conectar a uma instância de VM do Dataproc.

  4. Configure o plug-in do catálogo personalizado do Iceberg para o BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicie a sessão do Flink no YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Crie um catálogo no Flink:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Substitua:

    • CATALOG_NAME: o identificador do catálogo do Flink, que está vinculado a um catálogo do BigLake Metastore.
    • WAREHOUSE_DIRECTORY: o caminho base para o diretório do data warehouse (a pasta do Cloud Storage em que o Flink cria arquivos). Esse valor começa com gs://.
    • PROJECT_ID: o ID do projeto do catálogo do BigLake Metastore ao qual o catálogo do Flink está vinculado.
    • LOCATION: o local dos recursos do BigQuery.

Sua sessão do Flink agora está conectada ao metastore do BigLake, e você pode executar comandos SQL do Flink.

Agora que você está conectado ao BigLake Metastore, é possível criar e visualizar recursos com base nos metadados armazenados nele.

Por exemplo, execute os comandos a seguir na sua sessão interativa do Flink SQL para criar um banco de dados e uma tabela do Iceberg.

  1. Use o catálogo personalizado do Iceberg:

    USE CATALOG CATALOG_NAME;

    Substitua CATALOG_NAME pelo identificador do catálogo do Flink.

  2. Crie um banco de dados, que cria um conjunto de dados no BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Substitua DATABASE_NAME pelo nome do novo banco de dados.

  3. Use o banco de dados que você criou:

    USE DATABASE_NAME;
  4. Crie uma tabela do Iceberg. O comando a seguir cria uma tabela de vendas de exemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Substitua ICEBERG_TABLE_NAME por um nome para a nova tabela.

  5. Ver metadados da tabela:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Liste as tabelas no banco de dados:

    SHOW TABLES;

Ingerir dados na tabela

Depois de criar uma tabela do Iceberg na seção anterior, use o Flink DataGen como uma fonte de dados para ingerir dados em tempo real na sua tabela. As etapas a seguir são um exemplo desse fluxo de trabalho:

  1. Crie uma tabela temporária usando o DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Substitua:

    • DATABASE_NAME: o nome do banco de dados para armazenar sua tabela temporária.
    • TEMP_TABLE_NAME: um nome para sua tabela temporária.
    • ICEBERG_TABLE_NAME: o nome da tabela do Iceberg que você criou na seção anterior.
  2. Defina o paralelismo como 1:

    SET 'parallelism.default' = '1';
  3. Defina o intervalo de checkpoint:

    SET 'execution.checkpointing.interval' = '10second';
  4. Defina o checkpoint:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicie o job de transmissão em tempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    O resultado será assim:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para verificar o status do job de streaming, faça o seguinte:

    1. No console Google Cloud , acesse a página Clusters.

      Acessar Clusters

    2. Selecione o cluster.

    3. Clique na guia Interfaces da Web.

    4. Clique no link YARN ResourceManager.

    5. Na interface do YARN ResourceManager, encontre sua sessão do Flink e clique no link ApplicationMaster em IU de rastreamento.

    6. Na coluna Status, confirme se o status do job é Em execução.

  7. Consultar dados de streaming no cliente SQL do Flink:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consultar dados de streaming no BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Encerre o job de streaming no cliente Flink SQL:

    STOP JOB 'JOB_ID';

    Substitua JOB_ID pelo ID do job que foi exibido na saída quando você criou o job de streaming.

A seguir