Usar o conector do BigQuery do Spark

O spark-bigquery-conector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. O conector usa a API BigQuery Storage ao ler dados do BigQuery.

Este tutorial fornece informações sobre a disponibilidade do conector pré-instalado e mostra como disponibilizar uma versão específica do conector para trabalhos do Spark. O exemplo de código mostra como usar o conector do Spark BigQuery em um aplicativo Spark.

Usar o conector pré-instalado

O conector do Spark BigQuery é pré-instalado e está disponível para jobs do Spark executados em clusters do Dataproc criados com versões de imagem 2.1 e mais recentes. A versão do conector pré-instalado é listada na página de lançamento de cada versão da imagem. Por exemplo, a linha BigQuery Connector na página Versões de lançamento da imagem 2.2.x mostra a versão do conector instalada nas versões mais recentes da imagem 2.2.

Disponibilizar uma versão específica do conector para jobs do Spark

Se você quiser usar uma versão de conector diferente da versão pré-instalada em um cluster de versão de imagem 2.1 ou mais recente ou se quiser instalar o conector em um cluster de versão de imagem anterior à 2.1, siga as instruções desta seção.

Importante:a versão do spark-bigquery-connector precisa ser compatível com a versão da imagem do cluster do Dataproc. Consulte a matriz de compatibilidade do conector com a imagem do Dataproc.

Clusters de versão de imagem 2.1 e mais recentes

Ao criar um cluster do Dataproc com uma versão de imagem 2.1 ou mais recente, especifique a versão do conector como metadados do cluster.

Exemplo da CLI gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --image-version=2.2 \
    --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\
    other flags

Observações:

  • SPARK_BQ_CONNECTOR_VERSION: especifica uma versão do conector. As versões do conector Spark BigQuery estão listadas na página spark-bigquery-connector/releases no GitHub.

    Exemplo:

    --metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
    
  • SPARK_BQ_CONNECTOR_URL: especifique um URL que aponte para o JAR no Cloud Storage. É possível especificar o URL de um conector listado na coluna link em Downloading and Using the Connector no GitHub ou o caminho para um local do Cloud Storage em que você colocou um JAR de conector personalizado.

    Exemplos:

    --metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar
    --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
    

2.0 e clusters de versões de imagem anteriores

É possível disponibilizar o conector do Spark BigQuery para seu aplicativo de uma das seguintes maneiras:

  1. Instale o spark-bigquery-connector no diretório jars do Spark de cada nó usando a ação de inicialização dos conectores do Dataproc ao criar o cluster.

  2. Forneça o URL do jar do conector ao enviar o job para o cluster usando o console do Google Cloud, a CLI gcloud ou a API Dataproc.

    Console

    Use o item Jars files do job do Spark na página Enviar um job do Dataproc.

    gcloud

    Use a flag gcloud dataproc jobs submit spark --jars.

    API

    Use o campo SparkJob.jarFileUris.

    Como especificar o jar do conector ao executar jobs do Spark em clusters de versões de imagem anteriores à 2.0

    • Especifique o jar do conector substituindo as informações da versão do Scala e do conector na seguinte string de URI:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
      
    • Usar o Scala 2.12 com as versões de imagem do Dataproc 1.5+
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
      
      Exemplo da CLI gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
          -- job args
      
    • Use o Scala 2.11 com as versões de imagem do Dataproc 1.4 e anteriores:
      gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
      
      Exemplo da CLI gcloud:
      gcloud dataproc jobs submit spark \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
          -- job-args
      
  3. Inclua o jar do conector no aplicativo Scala ou Java Spark como uma dependência. Consulte Como compilar com o conector.

Calcular custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Ler e gravar dados no BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava os dados no BigQuery primeiro armazenando-os em buffer em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado. Se o job falhar, remova todos os arquivos temporários do Cloud Storage restantes. Normalmente, os arquivos temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Configurar o faturamento

Por padrão, o projeto associado às credenciais ou à conta de serviço é cobrado pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Executar o código

Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou mude o conjunto de dados de saída no código para um conjunto de dados do BigQuery no projeto Google Cloud .

Use o comando bq para criar o wordcount_dataset:

bq mk wordcount_dataset

Use o comando Google Cloud CLI para criar um bucket do Cloud Storage, que será usado para exportar para o BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página de clusters do Dataproc no Console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Página de detalhes do cluster do Dataproc no console do Cloud

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano wordcount.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Execute o wordcount.scala com o comando :load wordcount.scala para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página BigQuery, selecione a tabela wordcount_output e clique em Visualizar.
      Visualização da tabela na página do BigQuery Explorer no console do Cloud.

PySpark

  1. Examine o código e substitua o marcador [bucket] pelo bucket do Cloud Storage criado anteriormente.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Execute o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc.
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página &quot;Clusters&quot; no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster
        Selecione SSH na linha do nome do cluster na página &quot;Detalhes do cluster&quot; no Console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó mestre.
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano wordcount.py
      
    3. Execute a contagem de palavras com spark-submit para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página BigQuery, selecione a tabela wordcount_output e clique em Visualizar.
      Visualização da tabela na página do BigQuery Explorer no console do Cloud.

Para mais informações