Executar código PySpark em notebooks do BigQuery Studio

Neste documento, mostramos como executar código PySpark em um notebook Python do BigQuery.

Antes de começar

Se ainda não tiver feito isso, crie um Google Cloud projeto e um bucket do Cloud Storage.

  1. Configurar seu projeto

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. Crie um bucket do Cloud Storage no seu projeto se você não tiver um para usar.

    7. Configurar o notebook

      1. Credenciais do notebook: por padrão, sua sessão do notebook usa suas credenciais de usuário. Se você quiser especificar credenciais de conta de serviço para sua sessão, ela precisará ter o papel Worker do Dataproc (roles/dataproc.worker). Para mais informações, consulte Conta de serviço do Dataproc sem servidor.
      2. Ambiente de execução do notebook: seu notebook usa um ambiente de execução padrão do Vertex, a menos que você selecione outro ambiente. Se quiser definir seu próprio ambiente de execução, crie-o na página Ambientes de execução no console do Google Cloud .
    8. Preços

      Para informações sobre preços, consulte Preços de tempo de execução do notebook do BigQuery.

      Abrir um notebook Python do BigQuery Studio

      1. No console Google Cloud , acesse a página BigQuery.

        Acessar o BigQuery

      2. Na barra de guias do painel de detalhes, clique na seta ao lado do sinal + e clique em Notebook.

      Criar uma sessão do Spark em um notebook do BigQuery Studio

      Você pode usar um notebook Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada notebook do BigQuery Studio pode ter apenas uma sessão ativa do Dataproc sem servidor associada a ele.

      É possível criar uma sessão do Spark em um notebook Python do BigQuery Studio das seguintes maneiras:

      • Configure e crie uma única sessão no notebook.
      • Configure uma sessão do Spark em um modelo de sessão interativa do Dataproc sem servidor para Spark e use o modelo para configurar e criar uma sessão no notebook. O BigQuery oferece um recurso Query using Spark que ajuda você a começar a codificar a sessão com modelo, conforme explicado na guia Sessão do Spark com modelo.

      Sessão única

      Para criar uma sessão do Spark em um novo notebook, faça o seguinte:

      1. Na barra de guias do painel do editor, clique na seta suspensa ao lado do sinal + e clique em Notebook.

        Captura de tela mostrando a interface do BigQuery com o botão "+" para criar um notebook.
      2. Copie e execute o código a seguir em uma célula do notebook para configurar e criar uma sessão básica do Spark.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Substitua:

      • APP_NAME: um nome opcional para a sessão.
      • Configurações de sessão opcionais:é possível adicionar configurações da API Dataproc Session para personalizar sua sessão. Confira alguns exemplos:
        • RuntimeConfig:
          Ajuda de código mostrando opções de session.runtime.config.
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          Ajuda de código mostrando opções de configuração de execução, configuração de ambiente de sessão.
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      Sessão do Spark com modelo

      É possível inserir e executar o código em uma célula de notebook para criar uma sessão do Spark com base em um modelo de sessão sem servidor do Dataproc. Todas as configurações de session fornecidas no código do notebook vão substituir as mesmas configurações definidas no modelo de sessão.

      Para começar rapidamente, use o modelo Query using Spark para pré-preencher seu notebook com o código do modelo de sessão do Spark:

      1. Na barra de guias do painel do editor, clique na seta suspensa ao lado do sinal + e clique em Notebook.
        Captura de tela mostrando a interface do BigQuery com o botão "+" para criar um notebook.
      2. Em Começar com um modelo, clique em Consultar usando o Spark e em Usar modelo para inserir o código no notebook.
        Seleções da interface do BigQuery para começar com um modelo
      3. Especifique as variáveis conforme explicado nas Observações.
      4. Você pode excluir outras células de código de exemplo inseridas no notebook.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      Substitua:

      • PROJECT: seu ID do projeto, que está listado na seção Informações do projeto do painel do consoleGoogle Cloud .
      • LOCATION: a região do Compute Engine em que a sessão do notebook será executada. Se não for fornecido, o local padrão será a região da VM que cria o notebook.
      • SESSION_TEMPLATE: o nome de um modelo de sessão interativa sem servidor do Dataproc. As configurações de configuração da sessão são obtidas do modelo. O modelo também precisa especificar as seguintes configurações:

        • Versão do ambiente de execução 2.3+
        • Tipo de notebook: Spark Connect

          Exemplo:

          Captura de tela mostrando as configurações necessárias do Spark Connect.
      • APP_NAME: um nome opcional para a sessão.

      Escrever e executar código PySpark no notebook do BigQuery Studio

      Depois de criar uma sessão do Spark no notebook, use-a para executar o código do notebook do Spark.

      Suporte à API PySpark do Spark Connect:sua sessão do notebook do Spark Connect é compatível com a maioria das APIs PySpark, incluindo DataFrame, Functions, e Column, mas não é compatível com SparkContext e RDD e outras APIs PySpark. Para mais informações, consulte O que é compatível com o Spark 3.5.

      APIs específicas do Dataproc:o Dataproc simplifica a adição dinâmica de pacotes PyPI à sua sessão do Spark estendendo o método addArtifacts. Você pode especificar a lista no formato version-scheme (semelhante a pip install). Isso instrui o servidor do Spark Connect a instalar pacotes e dependências em todos os nós do cluster, disponibilizando-os para os workers das suas UDFs.

      Exemplo que instala a versão textdistance especificada e as bibliotecas random2 compatíveis mais recentes no cluster para permitir que UDFs usando textdistance e random2 sejam executadas em nós de trabalho.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      Ajuda com o código do notebook:o notebook do BigQuery Studio oferece ajuda com o código quando você mantém o ponteiro sobre um nome de classe ou método e ajuda com o preenchimento automático de código enquanto você insere o código.

      No exemplo a seguir, insira DataprocSparkSession. e manter o ponteiro sobre o nome da classe mostra o preenchimento de código e a ajuda da documentação.

      Exemplos de dicas de documentação e conclusão de código.

      Exemplos de PySpark em notebooks do BigQuery Studio

      Esta seção fornece exemplos de notebooks Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:

      • Execute uma contagem de palavras em um conjunto de dados público de Shakespeare.
      • Crie uma tabela do Iceberg com metadados salvos no metastore do BigLake.

      Wordcount

      O exemplo de Pyspark a seguir cria uma sessão do Spark e conta as ocorrências de palavras em um conjunto de dados público bigquery-public-data.samples.shakespeare.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      Substitua:

      • APP_NAME: um nome opcional para a sessão.

      Saída:

      A saída da célula lista uma amostra da saída de wordcount. Para ver os detalhes da sessão no console Google Cloud , clique no link Visualização detalhada da sessão interativa. Para monitorar sua sessão do Spark, clique em Ver interface do Spark na página de detalhes da sessão.

      Botão "Ver interface do Spark" na página de detalhes da sessão no console
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Tabela Iceberg

      Executar código do PySpark para criar uma tabela do Iceberg com metadados do metastore do BigLake

      O exemplo de código a seguir cria um sample_iceberg_table com metadados da tabela armazenados no metastore do BigLake e, em seguida, consulta a tabela.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      Observações:

      • PROJECT: seu ID do projeto, que está listado na seção Informações do projeto do painel do consoleGoogle Cloud .
      • REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Dataproc sem servidor ativa o Acesso privado do Google (PGA) na sub-rede especificada.
      • LOCATION: o BigQuery_metastore_config.location e o spark.sql.catalog.{catalog}.gcp_location padrão são US, mas você pode escolher qualquer local do BigQuery compatível.
      • BUCKET e WAREHOUSE_DIRECTORY: o bucket e a pasta do Cloud Storage usados para o diretório do data warehouse do Iceberg.
      • CATALOG_NAME e NAMESPACE: o nome do catálogo e o namespace do Iceberg se combinam para identificar a tabela do Iceberg (catalog.namespace.table_name).
      • APP_NAME: um nome opcional para a sessão.

      A saída da célula lista o sample_iceberg_table com a coluna adicionada e mostra um link para a página Detalhes da sessão interativa no console do Google Cloud . Clique em Ver interface do Spark na página de detalhes da sessão para monitorar sua sessão do Spark.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      Ver detalhes da tabela no BigQuery

      Siga estas etapas para verificar os detalhes da tabela do Iceberg no BigQuery:

      1. No console Google Cloud , acesse a página BigQuery.

        Acessar o BigQuery

      2. No painel de recursos do projeto, clique no projeto e depois no namespace para listar a tabela sample_iceberg_table. Clique na tabela Detalhes para ver as informações de Abrir configuração da tabela do catálogo.

        Os formatos de entrada e saída são os formatos de classe padrão InputFormat e OutputFormat do Hadoop usados pelo Iceberg.

        Metadados da tabela Iceberg listados na interface do BigQuery

      Outros exemplos

      Crie um Spark DataFrame (sdf) de um DataFrame do Pandas (df).

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Executar agregações no Spark DataFrames.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Leia do BigQuery usando o conector Spark-BigQuery.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Escrever código Spark com o Gemini Code Assist

      Você pode pedir ao Gemini Code Assist para gerar código PySpark no seu notebook. O Gemini Code Assist busca e usa tabelas relevantes do BigQuery e do Dataproc Metastore e os respectivos esquemas para gerar uma resposta de código.

      Para gerar código do Gemini Code Assist no seu notebook, faça o seguinte:

      1. Clique em + Código na barra de ferramentas para inserir uma nova célula de código. A nova célula de código mostra Start coding or generate with AI. Clique em Gerar.

      2. No editor de geração, insira um comando de linguagem natural e clique em enter. Não se esqueça de incluir a palavra-chave spark ou pyspark no comando.

        Exemplo de comando:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        Exemplo de resposta:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Dicas para a geração de código do Gemini Code Assist

      • Para permitir que o Gemini Code Assist busque tabelas e esquemas relevantes, ative a Sincronização do Data Catalog para instâncias do Dataproc Metastore.

      • Verifique se a conta de usuário tem acesso ao Data Catalog e às tabelas de consulta. Para fazer isso, atribua o papel DataCatalog.Viewer.

      Encerrar a sessão do Spark

      Você pode realizar qualquer uma das seguintes ações para interromper a sessão do Spark Connect no notebook do BigQuery Studio:

      • Execute spark.stop() em uma célula do notebook.
      • Encerre o ambiente de execução no notebook:
        1. Clique no seletor de tempo de execução e em Gerenciar sessões.
          Gerenciar a seleção de sessões
        2. Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e em Encerrar.
          Encerrar a seleção de sessão na caixa de diálogo "Sessões ativas"

      Orquestrar o código do notebook do BigQuery Studio

      É possível orquestrar o código do notebook do BigQuery Studio das seguintes maneiras:

      Programar código de notebook no console Google Cloud

      É possível programar o código do notebook das seguintes maneiras:

      • Programe o notebook.
      • Se a execução do código do notebook fizer parte de um fluxo de trabalho, programe o notebook como parte de um pipeline.

      Executar código de notebook como uma carga de trabalho em lote do Dataproc sem servidor

      Siga estas etapas para executar o código do notebook do BigQuery Studio como uma carga de trabalho em lote sem servidor do Dataproc.

      1. Faça o download do código do notebook em um arquivo em um terminal local ou no Cloud Shell.

        1. Abra o notebook no painel Explorer na página do BigQuery Studio no console Google Cloud .

        2. Para fazer o download do código do notebook, selecione Download no menu Arquivo e escolha Download .py.

          Menu "Arquivo" > "Download" na página do Explorer.
      2. Gerar requirements.txt.

        1. Instale o pipreqs no diretório em que você salvou o arquivo .py.
          pip install pipreqs
          
        2. Execute pipreqs para gerar requirements.txt.

          pipreqs filename.py
          

        3. Use a Google Cloud CLI para copiar o arquivo requirements.txt local para um bucket no Cloud Storage.

          gcloud storage cp requirements.txt gs://BUCKET/
          
      3. Atualize o código da sessão do Spark editando o arquivo .py baixado.

        1. Remova ou comente todos os comandos de script shell.

        2. Remova o código que configura a sessão do Spark e especifique os parâmetros de configuração como parâmetros de envio de carga de trabalho em lote. Consulte Enviar uma carga de trabalho em lote do Spark.

          Exemplo:

          • Remova a seguinte linha de configuração de sub-rede da sessão do código:

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • Ao executar sua carga de trabalho em lote, use a flag --subnet para especificar a sub-rede.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. Use um snippet de código simples para criar uma sessão.

          • Exemplo de código de notebook baixado antes da simplificação.

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • Código da carga de trabalho em lote após a simplificação.

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. Execute a carga de trabalho em lote.

        1. Consulte Enviar a carga de trabalho em lote do Spark para instruções.

          • Inclua a flag --deps-bucket para apontar para o bucket do Cloud Storage que contém o arquivo requirements.txt.

            Exemplo:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          Observações:

          • FILENAME: o nome do arquivo de código do notebook baixado e editado.
          • REGION: a região do Compute Engine em que o cluster está localizado.
          • BUCKET O nome do bucket do Cloud Storage que contém o arquivo requirements.txt.
          • --version: a versão 2.3 do ambiente de execução do Spark é selecionada para executar a carga de trabalho em lote.
      5. Confirme o código.

        1. Depois de testar o código da carga de trabalho em lote, faça commit do arquivo .ipynb ou .py no repositório usando o cliente git, como GitHub, GitLab ou Bitbucket, como parte do pipeline de CI/CD.
      6. Programe sua carga de trabalho em lote com o Cloud Composer.

        1. Consulte Executar cargas de trabalho do Dataproc sem servidor com o Cloud Composer para instruções.

      Resolver problemas em notebooks

      Se ocorrer uma falha em uma célula que contém código do Spark, clique no link Visualização detalhada da sessão interativa na saída da célula para resolver o problema. Consulte os exemplos de contagem de palavras e tabela do Iceberg.

      Problemas conhecidos e soluções

      Erro: um ambiente de execução de notebook criado com a versão 3.10 do Python pode causar um erro PYTHON_VERSION_MISMATCH ao tentar se conectar à sessão do Spark.

      Solução: recrie o ambiente de execução com a versão 3.11 do Python.

      A seguir