O spark-bigquery-conector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. O conector usa a API BigQuery Storage ao ler dados do BigQuery.
Este tutorial fornece informações sobre a disponibilidade do conector pré-instalado e mostra como disponibilizar uma versão específica do conector para trabalhos do Spark. O exemplo de código mostra como usar o conector do Spark BigQuery em um aplicativo Spark.
Usar o conector pré-instalado
O conector do Spark BigQuery é pré-instalado e está disponível para
jobs do Spark executados em clusters do Dataproc criados com versões de imagem
2.1
e mais recentes. A versão do conector pré-instalado é listada na página de lançamento de cada versão
da imagem. Por exemplo, a linha BigQuery Connector na página
Versões de lançamento da imagem 2.2.x
mostra a versão do conector instalada nas versões
mais recentes da imagem 2.2.
Disponibilizar uma versão específica do conector para jobs do Spark
Se você quiser usar uma versão de conector diferente da versão pré-instalada em um cluster de versão de imagem 2.1
ou mais recente ou se quiser instalar o conector em um cluster de versão de imagem anterior à 2.1
, siga as instruções desta seção.
Importante:a versão do spark-bigquery-connector
precisa ser compatível com a versão da imagem do cluster do Dataproc. Consulte a
matriz de compatibilidade do conector com a imagem do Dataproc.
Clusters de versão de imagem 2.1
e mais recentes
Ao criar um cluster do Dataproc
com uma versão de imagem 2.1
ou mais recente, especifique a
versão do conector como metadados do cluster.
Exemplo da CLI gcloud:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
Observações:
SPARK_BQ_CONNECTOR_VERSION: especifica uma versão do conector. As versões do conector Spark BigQuery estão listadas na página spark-bigquery-connector/releases no GitHub.
Exemplo:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: especifique um URL que aponte para o JAR no Cloud Storage. É possível especificar o URL de um conector listado na coluna link em Downloading and Using the Connector no GitHub ou o caminho para um local do Cloud Storage em que você colocou um JAR de conector personalizado.
Exemplos:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
2.0
e clusters de versões de imagem anteriores
É possível disponibilizar o conector do Spark BigQuery para seu aplicativo de uma das seguintes maneiras:
Instale o spark-bigquery-connector no diretório jars do Spark de cada nó usando a ação de inicialização dos conectores do Dataproc ao criar o cluster.
Forneça o URL do jar do conector ao enviar o job para o cluster usando o console do Google Cloud, a CLI gcloud ou a API Dataproc.
Console
Use o item Jars files do job do Spark na página Enviar um job do Dataproc.
gcloud
API
Use o campo
SparkJob.jarFileUris
.Como especificar o jar do conector ao executar jobs do Spark em clusters de versões de imagem anteriores à 2.0
- Especifique o jar do conector substituindo as informações da versão do Scala e do conector
na seguinte string de URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Usar o Scala
2.12
com as versões de imagem do Dataproc1.5+
Exemplo da CLI gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Use o Scala
2.11
com as versões de imagem do Dataproc1.4
e anteriores: Exemplo da CLI gcloud:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- Especifique o jar do conector substituindo as informações da versão do Scala e do conector
na seguinte string de URI:
Inclua o jar do conector no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.
Calcular custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Ler e gravar dados no BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado.
Se o job falhar, remova todos os arquivos temporários
do Cloud Storage restantes. Normalmente, os arquivos temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Configurar o faturamento
Por padrão, o projeto associado às credenciais ou à conta de serviço é
cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Executar o código
Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou mude o conjunto de dados de saída no código para um conjunto de dados do BigQuery no projeto Google Cloud .
Use o comando bq para criar o wordcount_dataset
:
bq mk wordcount_dataset
Use o comando Google Cloud CLI para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Executar o código no seu cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster
Uma janela do navegador é aberta no diretório principal do nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Crie
wordcount.scala
com o editor de textovi
,vim
ounano
pré-instalado e cole o código da lista de códigos Scalanano wordcount.scala
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Execute o wordcount.scala com o comando
:load wordcount.scala
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e clique em Visualizar.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
PySpark
- Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Execute o código no cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster
Uma janela do navegador é aberta no diretório principal do nó mestre.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Crie
wordcount.py
com o editor de textovi
,vim
ounano
pré-instalado e cole o código PySpark da lista de códigos PySparknano wordcount.py
- Execute a contagem de palavras com
spark-submit
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e clique em Visualizar.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
Para mais informações
- Armazenamento do BigQuery e Spark SQL: Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Como consultar dados particionados externamente
- Dicas de ajuste de jobs do Spark