Usar a metastore do BigQuery com o Spark no BigQuery Studio

Este documento explica como usar a metastore do BigQuery com o Apache Spark no BigQuery Studio.

É possível usar o Spark no BigQuery Studio para criar uma tabela do Apache Iceberg com o Spark no BigQuery Studio. Depois de criar a tabela, você pode consultar os dados do Spark. Também é possível consultar os mesmos dados no console do BigQuery usando SQL.

Antes de começar

  1. Solicite acesso ao Spark no BigQuery Studio pelo formulário de inscrição abaixo.
  2. Ative o faturamento do projeto Google Cloud . Saiba como verificar se o faturamento está ativado em um projeto.
  3. Ative as APIs BigQuery e Dataproc.

    Ativar as APIs

  4. Opcional: entenda como a metastore do BigQuery funciona e por que você deve usá-la.

Funções exigidas

Para receber as permissões necessárias para usar os notebooks do Spark no BigQuery Studio, peça ao administrador para conceder a você os seguintes papéis do IAM:

  • Crie tabelas de metastore no Spark: Editor de dados do BigQuery (roles/bigquery.dataEditor) no projeto
  • Crie uma sessão do Spark com base nas tabelas do metastore do notebook no Spark: Dataproc Worker (roles/dataproc.serverlessEditor) na conta de usuário

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Conectar com um notebook

O exemplo a seguir mostra como configurar um caderno do Spark para interagir com tabelas do Iceberg armazenadas na metastore do BigQuery.

Neste exemplo, você configura uma sessão do Spark, cria um namespace e uma tabela, adiciona alguns dados à tabela e consulta os dados no BigQuery Studio.

  1. Crie um notebook do Spark no BigQuery Studio.

  2. Defina um catálogo, um namespace e um diretório de repositório.

    catalog = "CATALOG_NAME"
    namespace = "NAMESPACE_NAME"
    warehouse_dir = "gs://WAREHOUSE_DIRECTORY"

    Substitua:

    • CATALOG_NAME: um nome de catálogo para referenciar sua tabela do Spark.
    • NAMESPACE_NAME: um rótulo de namespace para referenciar sua tabela do Spark.
    • WAREHOUSE_DIRECTORY: o URI da pasta do Cloud Storage em que o data warehouse é armazenado.
  3. Configure uma sessão do Spark.

    from google.cloud.dataproc_v1 import Session
    
    session = Session()
    session.runtime_config.properties = {
      f"spark.sql.catalog.CATALOG_NAME": "org.apache.iceberg.spark.SparkCatalog",
      f"spark.sql.catalog.CATALOG_NAME.catalog-impl": "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
      f"spark.sql.catalog.CATALOG_NAME.gcp_project": "PROJECT_ID",
      f"spark.sql.catalog.CATALOG_NAME.gcp_location": "LOCATION",
      f"spark.sql.catalog.CATALOG_NAME.warehouse": warehouse_dir,
    }

    Substitua:

    • PROJECT_ID: o ID do projeto Google Cloud que está executando o código do Spark.
    • LOCATION: o local para executar o job do Spark.
  4. Crie uma sessão do Spark.

    from google.cloud.dataproc_spark_connect import DataprocSparkSession
    
    spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
  5. Crie um catálogo e um namespace.

    spark.sql(f"USE `CATALOG_NAME`;")
    spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `NAMESPACE_NAME`;")
    spark.sql(f"USE `NAMESPACE_NAME`;")
  6. Crie uma tabela.

    spark.sql("CREATE OR REPLACE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME ;")

    Substitua:

    • TABLE_NAME: um nome para a tabela Iceberg.
  7. Execute uma linguagem de manipulação de dados (DML) no Spark.

    spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"Hello BigQuery and Spark\");")
    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  8. Execute uma linguagem de definição de dados (DDL) no Spark.

    spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (temperature_fahrenheit int);")
    spark.sql("DESCRIBE TABLE_NAME ;")
  9. Insira dados na tabela.

    spark.sql("INSERT INTO TABLE_NAME  VALUES (1, \"It's a sunny day!\", 83);")
  10. Consulte a tabela no Spark.

    df = spark.sql("SELECT * from TABLE_NAME ;")
    df.show()
  11. Consultar a tabela no console do Google Cloud em um novo conjunto de dados.

    SELECT * FROM `PROJECT_ID.NAMESPACE_NAME.TABLE_NAME` LIMIT 100

A seguir