Usar o conector do Spanner com o Spark

Nesta página, mostramos como usar o Conector do Spark Spanner para ler dados do Spanner usando o Apache Spark

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • Spanner
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Antes de começar

Antes de executar o tutorial, verifique se você sabe a versão do conector e conseguir um URI do conector.

Como especificar o URI do arquivo JAR do conector

As versões do conector Spark Spanner estão listadas no repositório do GitHub GoogleCloudDataproc/spark-spanner-connector.

Especifique o arquivo JAR do conector substituindo as informações da versão do conector na seguinte string de URI:

gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar

O conector está disponível para as versões do Spark 3.1+

Exemplo da CLI gcloud:

gcloud dataproc jobs submit spark \
    --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
    -- job-args
  

Preparar o banco de dados do Spanner

Se você não tiver uma tabela do Spanner, siga o tutorial para criar um na tabela do Spanner. Depois disso, você terá um ID de instância, um ID de banco de dados e uma tabela Singers.

Criar cluster do Dataproc

Qualquer cluster do Dataproc que use o conector precisa dos escopos spanner ou cloud-platform. Os clusters do Dataproc têm o escopo padrão cloud-platform para a imagem 2.1 ou mais recente. Se você usa uma versão mais antiga, pode usar o console do Google Cloud, a Google Cloud CLI e a API Dataproc para criar um cluster do Dataproc.

Console

  1. No console do Google Cloud, abra o Dataproc Página Criar um cluster
  2. Na página "Gerenciar segurança", clique em "Ativa o escopo da plataforma de nuvem para este cluster" na seção "Acesso ao projeto" nesta seção.
  3. Preencha ou confirme os outros campos de criação de cluster e, em seguida, clique em "Criar".

Google Cloud CLI

gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
    

API

É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create. Exemplo:
        "serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
    

Garanta que a permissão do Spanner correspondente esteja atribuída à conta de serviço da VM do Dataproc. Se você usar o Data Boost no tutorial, consulte a Permissão do IAM do Data Boost

Ler dados do Spanner

É possível usar Scala e Python para ler dados do Spanner em um DataFrame usando uma fonte de dados do Spark API.

Scala

  1. Analise o código e substitua os marcadores de posição [projectId], [instanceId], [databaseId] e [table] por o ID do projeto, o ID da instância, o ID do banco de dados e a tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem impacto quase zero na instância principal do Spanner.
    object singers {
      def main(): Unit = {
        /*
         * Remove comment if you are not running in spark-shell.
         *
        import org.apache.spark.sql.SparkSession
        val spark = SparkSession.builder()
          .appName("spark-spanner-demo")
          .getOrCreate()
        */
    
        // Load data in from Spanner. See
        // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties
        // for option information.
        val singersDF =
          (spark.read.format("cloud-spanner")
            .option("projectId", "[projectId]")
            .option("instanceId", "[instanceId]")
            .option("databaseId", "[databaseId]")
            .option("enableDataBoost", true)
            .option("table", "[table]")
            .load()
            .cache())
    
        singersDF.createOrReplaceTempView("Singers")
    
        // Load the Singers table.
        val result = spark.sql("SELECT * FROM Singers")
        result.show()
        result.printSchema()
      }
    }
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse o Clusters do Dataproc página no console do Google Cloud e, em seguida, clique no nome do seu cluster
        Página de clusters do Dataproc no console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
        Página de detalhes do cluster do Dataproc no console do Cloud

        Uma janela do navegador é aberta no diretório principal do nó mestre
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano singers.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
      
    4. Executar singers.scala com o comando :load singers.scala para criar a tabela Singers do Spanner. O resultado a listagem exibe exemplos da saída de Singers.
      > :load singers.scala
      Loading singers.scala...
      defined object singers
      > singers.main()
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
       

PySpark

  1. Examine o código e substitua os marcadores [projectId], [instanceId], [databaseId] e [table] pelo ID do projeto, ID da instância, ID do banco de dados e tabela que você criou anteriormente. A opção enableDataBoost ativa o recurso Data Boost do Spanner, que tem quase zero na instância principal do Spanner.
    #!/usr/bin/env python
    
    """Spanner PySpark read example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-spanner-demo') \
      .getOrCreate()
    
    # Load data from Spanner.
    singers = spark.read.format('cloud-spanner') \
      .option("projectId", "[projectId]") \
      .option("instanceId", "[instanceId]") \
      .option("databaseId", "[databaseId]") \
      .option("enableDataBoost", "true") \
      .option("table", "[table]") \
      .load()
    singers.createOrReplaceTempView('Singers')
    
    # Read from Singers
    result = spark.sql('SELECT * FROM Singers')
    result.show()
    result.printSchema()
  2. Executar o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse o Clusters do Dataproc página no console do Google Cloud e, em seguida, clique no nome do seu cluster
        Página "Clusters" no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
        Selecione "SSH na linha do nome do cluster" na página de detalhes do cluster no console do Cloud.

        Uma janela do navegador é aberta no seu diretório principal no nó principal
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie singers.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano singers.py
      
    3. Execute singers.py com spark-submit para criar a tabela Singers do Spanner.
      spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
      
      A saída é:
      ...
      +--------+---------+--------+---------+-----------+
      |SingerId|FirstName|LastName|BirthDate|LastUpdated|
      +--------+---------+--------+---------+-----------+
      |       1|     Marc|Richards|     null|       null|
      |       2| Catalina|   Smith|     null|       null|
      |       3|    Alice| Trentor|     null|       null|
      +--------+---------+--------+---------+-----------+
      
      root
       |-- SingerId: long (nullable = false)
       |-- FirstName: string (nullable = true)
       |-- LastName: string (nullable = true)
       |-- BirthDate: date (nullable = true)
       |-- LastUpdated: timestamp (nullable = true)
      only showing top 20 rows
      

Limpeza

Para limpar e evitar cobranças contínuas na sua conta do Google Cloud pelos recursos criados neste tutorial, siga as etapas a seguir.

gcloud dataproc clusters stop CLUSTER_NAME
gcloud dataproc clusters delete CLUSTER_NAME

Para mais informações