Executar código PySpark em notebooks do BigQuery Studio
Neste documento, mostramos como executar código PySpark em um notebook Python do BigQuery.
Antes de começar
Se ainda não tiver feito isso, crie um Google Cloud projeto e um bucket do Cloud Storage.
Configurar seu projeto
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
Crie um bucket do Cloud Storage no seu projeto se você não tiver um para usar.
Configurar o notebook
- Credenciais do notebook: por padrão, sua sessão do notebook usa suas credenciais de usuário. Se você quiser especificar credenciais de conta de serviço para sua sessão, ela precisará ter o papel Worker do Dataproc (
roles/dataproc.worker
). Para mais informações, consulte Conta de serviço do Dataproc sem servidor. - Ambiente de execução do notebook: seu notebook usa um ambiente de execução padrão do Vertex, a menos que você selecione outro ambiente. Se quiser definir seu próprio ambiente de execução, crie-o na página Ambientes de execução no console do Google Cloud .
- Credenciais do notebook: por padrão, sua sessão do notebook usa suas credenciais de usuário. Se você quiser especificar credenciais de conta de serviço para sua sessão, ela precisará ter o papel Worker do Dataproc (
No console Google Cloud , acesse a página BigQuery.
Na barra de guias do painel de detalhes, clique na seta
ao lado do sinal + e clique em Notebook.- Configure e crie uma única sessão no notebook.
- Configure uma sessão do Spark em um
modelo de sessão interativa do Dataproc sem servidor para Spark
e use o modelo para configurar e criar uma sessão no notebook.
O BigQuery oferece um recurso
Query using Spark
que ajuda você a começar a codificar a sessão com modelo, conforme explicado na guia Sessão do Spark com modelo. Na barra de guias do painel do editor, clique na
seta suspensa ao lado do sinal + e clique em Notebook.Copie e execute o código a seguir em uma célula do notebook para configurar e criar uma sessão básica do Spark.
- APP_NAME: um nome opcional para a sessão.
- Configurações de sessão opcionais:é possível adicionar configurações da API Dataproc
Session
para personalizar sua sessão. Confira alguns exemplos:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
- Na barra de guias do painel do editor, clique na
- Em Começar com um modelo, clique em Consultar usando o Spark e em
Usar modelo para inserir o código no notebook.
- Especifique as variáveis conforme explicado nas Observações.
- Você pode excluir outras células de código de exemplo inseridas no notebook.
- PROJECT: seu ID do projeto, que está listado na seção Informações do projeto do painel do consoleGoogle Cloud .
- LOCATION: a região do Compute Engine em que a sessão do notebook será executada. Se não for fornecido, o local padrão será a região da VM que cria o notebook.
SESSION_TEMPLATE: o nome de um modelo de sessão interativa sem servidor do Dataproc. As configurações de configuração da sessão são obtidas do modelo. O modelo também precisa especificar as seguintes configurações:
- Versão do ambiente de execução
2.3
+ Tipo de notebook:
Spark Connect
Exemplo:
- Versão do ambiente de execução
APP_NAME: um nome opcional para a sessão.
- Execute uma contagem de palavras em um conjunto de dados público de Shakespeare.
- Crie uma tabela do Iceberg com metadados salvos no metastore do BigLake.
- APP_NAME: um nome opcional para a sessão.
- PROJECT: seu ID do projeto, que está listado na seção Informações do projeto do painel do consoleGoogle Cloud .
- REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Dataproc sem servidor ativa o Acesso privado do Google (PGA) na sub-rede especificada.
- LOCATION: o
BigQuery_metastore_config.location
e ospark.sql.catalog.{catalog}.gcp_location
padrão sãoUS
, mas você pode escolher qualquer local do BigQuery compatível. - BUCKET e WAREHOUSE_DIRECTORY: o bucket e a pasta do Cloud Storage usados para o diretório do data warehouse do Iceberg.
- CATALOG_NAME e NAMESPACE: o nome do catálogo e o namespace do Iceberg se combinam para identificar a tabela do Iceberg (
catalog.namespace.table_name
). - APP_NAME: um nome opcional para a sessão.
No console Google Cloud , acesse a página BigQuery.
No painel de recursos do projeto, clique no projeto e depois no namespace para listar a tabela
sample_iceberg_table
. Clique na tabela Detalhes para ver as informações de Abrir configuração da tabela do catálogo.Os formatos de entrada e saída são os formatos de classe padrão
InputFormat
eOutputFormat
do Hadoop usados pelo Iceberg.Clique em + Código na barra de ferramentas para inserir uma nova célula de código. A nova célula de código mostra
Start coding or generate with AI
. Clique em Gerar.No editor de geração, insira um comando de linguagem natural e clique em
enter
. Não se esqueça de incluir a palavra-chavespark
oupyspark
no comando.Exemplo de comando:
create a spark dataframe from order_items and filter to orders created in 2024
Exemplo de resposta:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Para permitir que o Gemini Code Assist busque tabelas e esquemas relevantes, ative a Sincronização do Data Catalog para instâncias do Dataproc Metastore.
Verifique se a conta de usuário tem acesso ao Data Catalog e às tabelas de consulta. Para fazer isso, atribua o papel
DataCatalog.Viewer
.- Execute
spark.stop()
em uma célula do notebook. - Encerre o ambiente de execução no notebook:
- Clique no seletor de tempo de execução e em Gerenciar sessões.
- Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e em Encerrar.
- Clique no seletor de tempo de execução e em Gerenciar sessões.
Programe o código do notebook no console Google Cloud (preços dos notebooks aplicáveis).
Execute o código do notebook como uma carga de trabalho em lote sem servidor do Dataproc (os preços do Dataproc sem servidor se aplicam).
- Programe o notebook.
- Se a execução do código do notebook fizer parte de um fluxo de trabalho, programe o notebook como parte de um pipeline.
Faça o download do código do notebook em um arquivo em um terminal local ou no Cloud Shell.
Abra o notebook no painel Explorer na página do BigQuery Studio no console Google Cloud .
Para fazer o download do código do notebook, selecione Download no menu Arquivo e escolha
Download .py
.
Gerar
requirements.txt
.- Instale o
pipreqs
no diretório em que você salvou o arquivo.py
.pip install pipreqs
Execute
pipreqs
para gerarrequirements.txt
.pipreqs filename.py
Use a Google Cloud CLI para copiar o arquivo
requirements.txt
local para um bucket no Cloud Storage.gcloud storage cp requirements.txt gs://BUCKET/
- Instale o
Atualize o código da sessão do Spark editando o arquivo
.py
baixado.Remova ou comente todos os comandos de script shell.
Remova o código que configura a sessão do Spark e especifique os parâmetros de configuração como parâmetros de envio de carga de trabalho em lote. Consulte Enviar uma carga de trabalho em lote do Spark.
Exemplo:
Remova a seguinte linha de configuração de sub-rede da sessão do código:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
Ao executar sua carga de trabalho em lote, use a flag
--subnet
para especificar a sub-rede.gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
Use um snippet de código simples para criar uma sessão.
Exemplo de código de notebook baixado antes da simplificação.
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
Código da carga de trabalho em lote após a simplificação.
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
Execute a carga de trabalho em lote.
Consulte Enviar a carga de trabalho em lote do Spark para instruções.
Inclua a flag --deps-bucket para apontar para o bucket do Cloud Storage que contém o arquivo
requirements.txt
.Exemplo:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
Observações:
- FILENAME: o nome do arquivo de código do notebook baixado e editado.
- REGION: a região do Compute Engine em que o cluster está localizado.
- BUCKET O nome do bucket do Cloud Storage
que contém o arquivo
requirements.txt
. --version
: a versão 2.3 do ambiente de execução do Spark é selecionada para executar a carga de trabalho em lote.
Confirme o código.
- Depois de testar o código da carga de trabalho em lote, faça commit do arquivo
.ipynb
ou.py
no repositório usando o clientegit
, como GitHub, GitLab ou Bitbucket, como parte do pipeline de CI/CD.
- Depois de testar o código da carga de trabalho em lote, faça commit do arquivo
Programe sua carga de trabalho em lote com o Cloud Composer.
- Consulte Executar cargas de trabalho do Dataproc sem servidor com o Cloud Composer para instruções.
- Demonstração em vídeo do YouTube: Como aproveitar o poder do Apache Spark integrado ao BigQuery.
- Usar o metastore do BigLake com o Dataproc
- Usar o metastore BigLake com o Dataproc sem servidor
Preços
Para informações sobre preços, consulte Preços de tempo de execução do notebook do BigQuery.
Abrir um notebook Python do BigQuery Studio
Criar uma sessão do Spark em um notebook do BigQuery Studio
Você pode usar um notebook Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada notebook do BigQuery Studio pode ter apenas uma sessão ativa do Dataproc sem servidor associada a ele.
É possível criar uma sessão do Spark em um notebook Python do BigQuery Studio das seguintes maneiras:
Sessão única
Para criar uma sessão do Spark em um novo notebook, faça o seguinte:
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Substitua:
Sessão do Spark com modelo
É possível inserir e executar o código em uma célula de notebook para criar uma sessão do Spark com base em um modelo de sessão sem servidor do Dataproc. Todas as configurações de
session
fornecidas no código do notebook vão substituir as mesmas configurações definidas no modelo de sessão.Para começar rapidamente, use o modelo
Query using Spark
para pré-preencher seu notebook com o código do modelo de sessão do Spark:from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Configure the session with an existing session template. session_template = "SESSION_TEMPLATE" session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}" # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() )
Escrever e executar código PySpark no notebook do BigQuery Studio
Depois de criar uma sessão do Spark no notebook, use-a para executar o código do notebook do Spark.
Suporte à API PySpark do Spark Connect:sua sessão do notebook do Spark Connect é compatível com a maioria das APIs PySpark, incluindo DataFrame, Functions, e Column, mas não é compatível com SparkContext e RDD e outras APIs PySpark. Para mais informações, consulte O que é compatível com o Spark 3.5.
APIs específicas do Dataproc:o Dataproc simplifica a adição dinâmica de pacotes
PyPI
à sua sessão do Spark estendendo o métodoaddArtifacts
. Você pode especificar a lista no formatoversion-scheme
(semelhante apip install
). Isso instrui o servidor do Spark Connect a instalar pacotes e dependências em todos os nós do cluster, disponibilizando-os para os workers das suas UDFs.Exemplo que instala a versão
textdistance
especificada e as bibliotecasrandom2
compatíveis mais recentes no cluster para permitir que UDFs usandotextdistance
erandom2
sejam executadas em nós de trabalho.spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Ajuda com o código do notebook:o notebook do BigQuery Studio oferece ajuda com o código quando você mantém o ponteiro sobre um nome de classe ou método e ajuda com o preenchimento automático de código enquanto você insere o código.
No exemplo a seguir, insira
DataprocSparkSession
. e manter o ponteiro sobre o nome da classe mostra o preenchimento de código e a ajuda da documentação.Exemplos de PySpark em notebooks do BigQuery Studio
Esta seção fornece exemplos de notebooks Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:
Wordcount
O exemplo de Pyspark a seguir cria uma sessão do Spark e conta as ocorrências de palavras em um conjunto de dados público
bigquery-public-data.samples.shakespeare
.# Basic wordcount example from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f session = Session() # Create the Spark session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Run a wordcount on the public Shakespeare dataset. df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load() words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word")) word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word") word_counts_df.show()
Substitua:
Saída:
A saída da célula lista uma amostra da saída de wordcount. Para ver os detalhes da sessão no console Google Cloud , clique no link Visualização detalhada da sessão interativa. Para monitorar sua sessão do Spark, clique em Ver interface do Spark na página de detalhes da sessão.
Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Tabela Iceberg
Executar código do PySpark para criar uma tabela do Iceberg com metadados do metastore do BigLake
O exemplo de código a seguir cria um
sample_iceberg_table
com metadados da tabela armazenados no metastore do BigLake e, em seguida, consulta a tabela.from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session import pyspark.sql.connect.functions as f # Create the Dataproc Serverless session. session = Session() # Set the session configuration for BigLake Metastore with the Iceberg environment. project = "PROJECT" region = "REGION" subnet_name = "SUBNET_NAME" location = "LOCATION" session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}" warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY" catalog = "CATALOG_NAME" namespace = "NAMESPACE" session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}" session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}" # Create the Spark Connect session. spark = ( DataprocSparkSession.builder .appName("APP_NAME") .dataprocSessionConfig(session) .getOrCreate() ) # Create the namespace in BigQuery. spark.sql(f"USE `{catalog}`;") spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;") spark.sql(f"USE `{namespace}`;") # Create the Iceberg table. spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`"); spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE sample_iceberg_table;") # Insert table data and query the table. spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");") # Alter table, then query and display table data and schema. spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE sample_iceberg_table;") df = spark.sql("SELECT * FROM sample_iceberg_table") df.show() df.printSchema()
Observações:
A saída da célula lista o
sample_iceberg_table
com a coluna adicionada e mostra um link para a página Detalhes da sessão interativa no console do Google Cloud . Clique em Ver interface do Spark na página de detalhes da sessão para monitorar sua sessão do Spark.Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
Ver detalhes da tabela no BigQuery
Siga estas etapas para verificar os detalhes da tabela do Iceberg no BigQuery:
Outros exemplos
Crie um Spark
DataFrame
(sdf
) de um DataFrame do Pandas (df
).sdf = spark.createDataFrame(df) sdf.show()
Executar agregações no Spark
DataFrames
.from pyspark.sql import functions as F sdf.groupby("segment").agg( F.mean("total_spend_per_user").alias("avg_order_value"), F.approx_count_distinct("user_id").alias("unique_customers") ).show()
Leia do BigQuery usando o conector Spark-BigQuery.
spark.conf.set("viewsEnabled","true") spark.conf.set("materializationDataset","my-bigquery-dataset") sdf = spark.read.format('bigquery') \ .load(query)
Escrever código Spark com o Gemini Code Assist
Você pode pedir ao Gemini Code Assist para gerar código PySpark no seu notebook. O Gemini Code Assist busca e usa tabelas relevantes do BigQuery e do Dataproc Metastore e os respectivos esquemas para gerar uma resposta de código.
Para gerar código do Gemini Code Assist no seu notebook, faça o seguinte:
Dicas para a geração de código do Gemini Code Assist
Encerrar a sessão do Spark
Você pode realizar qualquer uma das seguintes ações para interromper a sessão do Spark Connect no notebook do BigQuery Studio:
Orquestrar o código do notebook do BigQuery Studio
É possível orquestrar o código do notebook do BigQuery Studio das seguintes maneiras:
Programar código de notebook no console Google Cloud
É possível programar o código do notebook das seguintes maneiras:
Executar código de notebook como uma carga de trabalho em lote do Dataproc sem servidor
Siga estas etapas para executar o código do notebook do BigQuery Studio como uma carga de trabalho em lote sem servidor do Dataproc.
Resolver problemas em notebooks
Se ocorrer uma falha em uma célula que contém código do Spark, clique no link Visualização detalhada da sessão interativa na saída da célula para resolver o problema. Consulte os exemplos de contagem de palavras e tabela do Iceberg.
Problemas conhecidos e soluções
Erro: um ambiente de execução de notebook criado com a versão
3.10
do Python pode causar um erroPYTHON_VERSION_MISMATCH
ao tentar se conectar à sessão do Spark.Solução: recrie o ambiente de execução com a versão
3.11
do Python.A seguir