Nesta página, mostramos como criar um cluster do Dataproc que usa o conector do Spark Spanner para ler dados do Spanner usando o Apache Spark.
O conector do Spanner funciona com o Spark para ler dados do banco de dados do Spanner usando a biblioteca Java do Spanner. O conector do Spanner permite ler tabelas e gráficos do Spanner em DataFrames e GraphFrames do Spark.
Custos
Neste documento, você vai usar os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Antes de começar
Antes de usar o conector do Spanner neste tutorial, configure um cluster do Dataproc e uma instância e um banco de dados do Spanner.
Configurar um cluster do Dataproc
Crie um cluster do Dataproc ou use um cluster atual com as seguintes configurações:
Permissões da conta de serviço da VM. A conta de serviço da VM do cluster precisa receber as permissões do Spanner adequadas. Se você usar o Data Boost (ativado no exemplo de código em Exportar tabelas do Spanner), a conta de serviço da VM também precisará ter as permissões do IAM do Data Boost necessárias.
Escopo de acesso. O cluster precisa ser criado com o escopo
cloud-platform
ou o escopospanner
apropriado ativado. O escopocloud-platform
é ativado por padrão para clusters criados com a versão da imagem 2.1 ou mais recente.As instruções a seguir mostram como definir o
cloud-platform
escopo como parte de uma solicitação de criação de cluster que usa o console Google Cloud , a CLI gcloud ou a API Dataproc. Para mais instruções sobre como criar um cluster, consulte Criar um cluster.Google Cloud console
- No console do Google Cloud , abra a página Criar um cluster do Dataproc.
- No painel Gerenciar segurança, na seção Acesso ao projeto, clique em "Ativa o escopo da plataforma de nuvem para este cluster".
- Preencha ou confirme os outros campos de criação de cluster e clique em Criar.
CLI da gcloud
Execute o comando
gcloud dataproc clusters create
a seguir para criar um cluster com o escopocloud-platform
ativado.gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
É possível especificar o GceClusterConfig.serviceAccountScopes como parte de uma solicitação clusters.create.
"serviceAccountScopes": "https://www.googleapis.com/auth/cloud-platform"
Configurar uma instância do Spanner com uma tabela de banco de dados Singers
Crie uma instância do Spanner
com um banco de dados que contenha uma tabela Singers
. Anote o ID da instância e do banco de dados do Spanner.
Usar o conector do Spanner com o Spark
O conector do Spanner está disponível para as versões 3.1+
do Spark.
Você especifica a versão do conector como parte da especificação do arquivo JAR do conector do Cloud Storage ao enviar um job para um cluster do Dataproc.
Exemplo:envio de job do Spark da CLI gcloud com o conector do Spanner.
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar \ ... [other job submission flags]
Substitua:
CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector do Spanner na lista de versões do repositório GoogleCloudDataproc/spark-spanner-connector
do GitHub.
Ler tabelas do Spanner
É possível usar Python ou Scala para ler dados de tabelas do Spanner em um DataFrame do Spark usando a API de fonte de dados do Spark.
PySpark
É possível executar o exemplo de código PySpark nesta seção no cluster enviando o job ao serviço do Dataproc ou executando o job no REPL spark-submit
no nó mestre do cluster.
Job do Dataproc
- Crie um arquivo
singers.py
usando um editor de texto local ou no Cloud Shell com o editor de texto pré-instaladovi
,vim
ounano
. - Depois de preencher as variáveis de marcador de posição, cole o código a seguir no arquivo
singers.py
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua:
- PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console Google Cloud .
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.py
. - Envie o job para o serviço do Dataproc usando o console Google Cloud , CLI gcloud ou a API Dataproc.
Exemplo:envio de jobs da CLI gcloud com o conector do Spanner.
gcloud dataproc jobs submit pyspark singers.py \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Substitua:
- CLUSTER_NAME: o nome do novo cluster.
- REGION: uma região do Compute Engine disponível para executar a carga de trabalho.
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector do Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub.
Job spark-submit
- Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
- Acesse a página Clusters do Dataproc no console Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.Uma janela de navegador é aberta no diretório inicial do nó mestre.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um arquivo
singers.py
no nó mestre usando o editor de texto pré-instaladovi
,vim
ounano
.- Cole o código a seguir no arquivo
singers.py
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "PROJECT_ID") \ .option("instanceId", "INSTANCE_ID") \ .option("databaseId", "DATABASE_ID") \ .option("table", "TABLE_NAME") \ .option("enableDataBoost", "true") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
Substitua:
- PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console Google Cloud .
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.py
.
- Cole o código a seguir no arquivo
- Execute
singers.py
comspark-submit
para criar a tabelaSingers
do Spanner.spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
Substitua:
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector do Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub.
A resposta é:
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector do Spanner na lista de versões do repositório
Scala
Para executar o exemplo de código Scala no cluster, siga estas etapas:
- Conecte-se ao nó mestre do cluster do Dataproc usando SSH.
- Acesse a página Clusters do Dataproc no console Google Cloud e clique no nome do cluster.
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.Uma janela de navegador é aberta no diretório inicial do nó mestre.
Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie um arquivo
singers.scala
no nó mestre usando o editor de texto pré-instaladovi
,vim
ounano
.- Cole o código a seguir no arquivo
singers.scala
. O recurso Data Boost do Spanner está ativado, o que tem impacto quase zero na instância principal do Spanner.object singers { def main(): Unit = { /* * Uncomment (use the following code) if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "PROJECT_ID") .option("instanceId", "INSTANCE_ID") .option("databaseId", "DATABASE_ID") .option("table", "TABLE_NAME") .option("enableDataBoost", true) .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
Substitua:
- PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no painel do console Google Cloud .
- INSTANCE_ID, DATABASE_ID e TABLE_NAME : consulte Configurar uma instância do Spanner com a tabela de banco de dados
Singers
.
- Salve o arquivo
singers.scala
.
- Cole o código a seguir no arquivo
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Substitua:
CONNECTOR_VERSION: versão do conector do Spanner. Escolha a versão do conector do Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub. - Execute
singers.scala
com o comando:load singers.scala
para criar a tabelaSingers
do Spanner. A listagem de saída mostra exemplos da saída "Singers".> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
Ler gráficos do Spanner
O conector do Spanner permite exportar o gráfico para DataFrames de nós e arestas separados, além de exportar diretamente para GraphFrames
.
O exemplo a seguir exporta um Spanner para um GraphFrame
.
Ele usa a classe SpannerGraphConnector
do Python, incluída no jar do conector do Spanner, para ler o Spanner Graph.
from pyspark.sql import SparkSession connector_jar = "gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar" spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example") .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12") .config("spark.jars", connector_jar) .getOrCreate()) spark.sparkContext.addPyFile(connector_jar) from spannergraph import SpannerGraphConnector connector = (SpannerGraphConnector() .spark(spark) .project("PROJECT_ID") .instance("INSTANCE_ID") .database("DATABASE_ID") .graph("GRAPH_ID")) g = connector.load_graph() g.vertices.show() g.edges.show()
Substitua:
- CONNECTOR_VERSION: versão do conector do Spanner.
Escolha a versão do conector do Spanner na lista de versões do repositório
GoogleCloudDataproc/spark-spanner-connector
do GitHub. - PROJECT_ID: o ID do projeto do Google Cloud . Os IDs do projeto estão listados na seção Informações do projeto no Google Cloud painel do console.
- INSTANCE_ID, DATABASE_ID e TABLE_NAME Insira os IDs da instância, do banco de dados e do gráfico.
Para exportar nós e arestas DataFrames
em vez de GraphFrames, use
load_dfs
:
df_vertices, df_edges, df_id_map = connector.load_dfs()
Limpar
Para evitar cobranças contínuas na sua conta do Google Cloud , pare ou exclua o cluster do Dataproc e exclua a instância do Spanner.
A seguir
- Consulte os
exemplos de
pyspark.sql.DataFrame
. - Para informações sobre o suporte a idiomas do DataFrame do Spark, consulte:
- Consulte o repositório do conector do Spark Spanner no GitHub.
- Confira as dicas de ajuste de jobs do Spark.