Usar o conector Spark do Bigtable
Com o conector Bigtable Spark, é possível ler e gravar dados no Bigtable. É possível ler dados do seu aplicativo Spark usando o Spark SQL e o DataFrames. As seguintes operações do Bigtable são compatíveis com o uso do conector Bigtable Spark:
- Gravar dados
- Ler dados
- Criar uma tabela
Neste documento, mostramos como converter uma tabela de DataFrames SQL do Spark em uma tabela do Bigtable e, em seguida, compilar e criar um arquivo JAR para enviar um job do Spark.
Status de suporte do Spark e do Scala
O conector do Bigtable Spark é compatível apenas com a versão do Scala 2.12 e com as seguintes versões do Spark:
O conector do Bigtable Spark é compatível com as seguintes versões do Dataproc:
- Cluster de versão de imagem 1.5
- Cluster da versão da imagem 2.0
- Cluster de versão de imagem 2.1
- Cluster de versão de imagem 2.2
- Ambiente de execução sem servidor do Dataproc versão 1.0
Calcular custos
Se você decidir usar qualquer um dos seguintes componentes faturáveis do Google Cloud, será cobrado pelos recursos que usar:
- Bigtable (você não é cobrado por usar o emulador desse serviço)
- Dataproc
- Cloud Storage
Os preços do Dataproc se aplicam ao uso dos Dataproc em clusters do Compute Engine. Dataproc sem servidor preços se aplicam a cargas de trabalho e sessões executadas, no Dataproc sem servidor para Spark.
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Antes de começar
Atenda aos pré-requisitos a seguir antes de usar o conector Spark do Bigtable.
Funções exigidas
Para ter as permissões necessárias para usar o conector Bigtable Spark, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:
-
Administrador do Bigtable (
roles/bigtable.admin
)(opcional): permite ler ou gravar dados e criar uma nova tabela. -
Usuário do Bigtable (
roles/bigtable.user
): permite ler ou gravar dados, mas não permite criar uma nova tabela.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias com papéis personalizados ou outros papéis predefinidos.
Se você estiver usando o Dataproc ou o Cloud Storage, talvez outras permissões sejam necessárias. Para mais informações, consulte Permissões do Dataproc e do Cloud Storage.
Configurar o Spark
Além de criar uma instância do Bigtable, você também precisa configurar sua instância do Spark. É possível fazer isso localmente ou selecionar uma destas opções para usar o Spark com o Dataproc:
- Cluster do Dataproc
- Dataproc sem servidor
Para mais informações sobre como escolher entre um cluster do Dataproc ou uma opção sem servidor, consulte Dataproc sem servidor para Spark em comparação com o Dataproc no Compute Engine. documentação.
Baixe o arquivo JAR do conector
Você pode encontrar o código-fonte do conector do Bigtable Spark com exemplos no repositório GitHub do conector do Bigtable Spark.
Com base na sua configuração do Spark, acesse o arquivo JAR da seguinte maneira:
Se você estiver executando o PySpark localmente, faça o download do arquivo JAR do conector no local
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
do Cloud Storage.Substitua
SCALA_VERSION
pela versão do Scala, definida como2.12
como a única versão compatível, eCONNECTOR_VERSION
pela versão do conector que você quer usar.Para cluster do Dataproc ou opção sem servidor, use o arquivo JAR mais recente como um artefato que pode ser adicionado aos seus aplicativos Scala ou Java Spark. Para mais informações sobre como usar o arquivo JAR como um artefato, consulte Gerenciar dependências.
Se você estiver enviando seu job do PySpark para o Dataproc, use a sinalização
gcloud dataproc jobs submit pyspark --jars
para definir o URI do local do arquivo JAR no Cloud Storage, por exemplo,gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Determinar o tipo de computação
Para jobs somente leitura, use a computação sem servidor do Data Boost (pré-lançamento), que evita o impacto nos clusters de serviço de aplicativos. Sua faísca aplicativo deve usar a versão 1.1.0 ou posterior do conector do Spark para usar Data Boost.
Para usar o Data Boost, você precisa criar um perfil de app e depois
forneça o ID do perfil de aplicativo para o spark.bigtable.app_profile.id
Spark
ao adicionar o Bigtable
no aplicativo Spark. Se você já criou um app
para seus jobs de leitura do Spark e quiser continuar usando sem
altera o código do aplicativo, você pode converter o perfil do aplicativo em um
Perfil do app Data Boost. Para mais informações, consulte Converter um aplicativo
de usuário.
Para mais informações, consulte a documentação do Bigtable Data Boost geral.
Para jobs que envolvem leituras e gravações, use o cluster da instância nós para computação especificando um perfil de aplicativo padrão com sua solicitação.
Identifique ou crie um perfil de aplicativo para usar
Se você não especificar um ID de perfil de app, o conector vai usar o app padrão. perfil.
Recomendamos que você use um perfil de aplicativo único para cada aplicativo que você em execução, incluindo seu aplicativo Spark. Para mais informações sobre o perfil de aplicativo tipos e configurações, consulte a seção Perfis de aplicativo geral. Para instruções, consulte Criar e configurar perfis de app.
Adicionar a configuração do Bigtable ao aplicativo Spark
No aplicativo Spark, adicione as opções do Spark que permitem interagir com o Bigtable.
Opções do Spark com suporte
Use as opções do Spark disponíveis como parte do pacote com.google.cloud.spark.bigtable
.
Nome da opção | Obrigatório | Valor padrão | Significado |
---|---|---|---|
spark.bigtable.project.id |
Sim | N/A | Defina o ID do projeto do Bigtable. |
spark.bigtable.instance.id |
Sim | N/A | Defina o ID da instância do Bigtable. |
catalog |
Sim | N/A | Defina o formato JSON que especifica o formato de conversão entre o esquema semelhante a SQL do DataFrame e o esquema da tabela do Bigtable. Consulte Criar metadados de tabela no formato JSON para mais informações. |
spark.bigtable.app_profile.id |
Não | default |
Defina o ID do perfil do app do Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
Não | Hora atual do sistema | Defina o carimbo de data/hora em milissegundos para usar ao gravar um DataFrame no Bigtable. Observe que, como todas as linhas no DataFrame usam o mesmo carimbo de data/hora, as linhas com a mesma coluna de chave de linha no DataFrame são mantidas como uma única versão no Bigtable, porque compartilham o mesmo carimbo de data/hora. |
spark.bigtable.create.new.table |
Não | false |
Defina como true para criar uma nova tabela antes de gravar no Bigtable. |
spark.bigtable.read.timerange.start.milliseconds ou spark.bigtable.read.timerange.end.milliseconds |
Não | N/A | Defina carimbos de data/hora (em milissegundos desde o horário da época) para filtrar células com datas específicas de início e término, respectivamente. |
spark.bigtable.push.down.row.key.filters |
Não | true |
Defina como true para permitir a filtragem simples de chave de linha no lado do servidor. A filtragem em chaves de linha compostas é implementada no lado do cliente.Consulte Ler uma linha específica do DataFrame usando um filtro para mais informações. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
Não | 30min | Defina a duração do tempo limite para uma tentativa de linhas de leitura correspondente a uma partição do DataFrame no cliente do Bigtable para Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
Não | 12h | Defina a duração do tempo limite total de uma tentativa de linhas de leitura correspondente a uma partição do DataFrame no cliente do Bigtable para Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
Não | 1 min | Defina a duração de timeout para uma tentativa de mutate de linhas correspondente a uma partição do DataFrame no cliente do Bigtable para Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
Não | 10 min | Defina a duração do tempo limite total de uma tentativa de mutação de linhas correspondente a uma partição do DataFrame no cliente do Bigtable para Java. |
spark.bigtable.batch.mutate.size |
Não | 100 |
Defina conforme o número de mutações em cada lote. O valor máximo que você pode definir é 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
Não | false |
Defina como true para ativar o controle de fluxo para mutações em lote. |
Criar metadados de tabela no formato JSON
O formato de tabela DataFrames SQL do Spark precisa ser convertido em uma tabela do Bigtable usando uma string com formato JSON. Esse formato de string JSON torna o formato de dados compatível com o Bigtable. É possível transmitir o formato JSON no código do aplicativo usando a opção .option("catalog", catalog_json_string)
.
Por exemplo, considere a tabela do DataFrame a seguir e a tabela correspondente do Bigtable.
Neste exemplo, as colunas name
e birthYear
no DataFrame são agrupadas no grupo de colunas info
e renomeadas como name
e birth_year
, respectivamente. Da mesma forma, a coluna address
é armazenada no grupo de colunas location
com o mesmo nome de coluna. A coluna id
do DataFrame é convertida na chave de linha do Bigtable.
As chaves de linha não têm um nome de coluna dedicado no Bigtable. Neste exemplo, id_rowkey
é usada apenas para indicar ao conector que essa é a coluna de chave de linha. É possível usar qualquer nome para a coluna de chave de linha, e o mesmo nome deve ser usado ao declarar o campo "rowkey":"column_name"
no formato JSON.
DataFrame | Tabela do Bigtable = t1 | |||||||
Colunas | Chave de linha | Grupos de colunas | ||||||
informações | local | |||||||
Colunas | Colunas | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
O formato JSON para o catálogo é o seguinte:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
As chaves e os valores usados no formato JSON são os seguintes:
Chave do catálogo | Valor do catálogo | Formato JSON |
---|---|---|
tabela | Nome da tabela do Bigtable. | "table":{"name":"t1"} Se a tabela não existe, use .option("spark.bigtable.create.new.table", "true") para criá-la. |
rowkey | Nome da coluna que será usada como a chave de linha do Bigtable. Verifique se o nome da coluna do DataFrame é usado como a chave de linha, por exemplo, id_rowkey . Chaves compostas também são aceitas como chaves de linha. Por exemplo, "rowkey":"name:address" . Essa abordagem pode resultar em chaves de linha que exigem uma verificação completa da tabela para todas as solicitações de leitura. |
"rowkey":"id_rowkey" , |
colunas | O mapeamento de cada coluna do DataFrame para o grupo de colunas ("cf" ) e o nome da coluna ("col" ) correspondentes do Bigtable. O nome da coluna pode ser diferente do nome da coluna na tabela do DataFrame. Os tipos de dados aceitos incluem string , long e binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" Neste exemplo, id_rowkey é a chave de linha, e info e location são os grupos de colunas. |
Tipos de dados compatíveis
O conector oferece suporte ao uso dos tipos string
, long
e binary
(matriz de bytes)
no catálogo. Até que suporte para outros tipos, como int
e float
seja adicionado,
é possível converter manualmente esses tipos de dados em matrizes de bytes (a função
BinaryType
) antes de usar o conector para fazer a gravação.
Bigtable.
Além disso, é possível usar o Avro para serializar
tipos, como ArrayType
. Para mais informações, consulte Serializar dados complexos
tipos usando o Apache Avro.
Gravar no Bigtable
Use a função .write()
e as opções compatíveis para gravar seus dados no Bigtable.
Java
O código a seguir do repositório do GitHub usa Java e Maven para gravar no Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
O código a seguir do repositório do GitHub usa Python para gravar no Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Ler do Bigtable
Use a função .read()
para verificar se a tabela foi importada com sucesso para o Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compilar seu projeto
Gere o arquivo JAR usado para executar um job em um cluster do Dataproc, o Dataproc sem servidor ou uma instância local do Spark. É possível compilar o arquivo JAR localmente e usá-lo para enviar um job. O caminho para o JAR compilado é definido como a variável de ambiente PATH_TO_COMPILED_JAR
quando você envia um job.
Esta etapa não se aplica aos aplicativos PySpark.
Gerenciar dependências
O conector Bigtable Spark é compatível com as seguintes ferramentas de gerenciamento de dependências:
Compilar o arquivo JAR
Maven
Adicione a dependência
spark-bigtable
ao seu arquivo pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Adicione o plug-in Maven Shade ao arquivo
pom.xml
para criar um JAR uber:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Execute o comando
mvn clean install
para gerar um arquivo JAR.
sbt
Adicione a dependência
spark-bigtable
ao arquivobuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Adicione o plug-in
sbt-assembly
ao arquivoproject/plugins.sbt
ouproject/assembly.sbt
para criar um arquivo Uber JAR.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Execute o comando
sbt clean assembly
para gerar o arquivo JAR.
Gradle
Adicione a dependência
spark-bigtable
ao arquivobuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Adicione o plug-in Shadow ao arquivo
build.gradle
para criar um arquivo JAR uber:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Consulte a documentação do plug-in Shadow para ver mais informações sobre configuração e compilação de JAR.
Envie um job
Envie um job do Spark usando o Dataproc, o Dataproc sem servidor ou uma instância local do Spark para iniciar o aplicativo.
Definir o ambiente de execução
Defina as seguintes variáveis de ambiente.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Substitua:
- PROJECT_ID: o identificador permanente do projeto do Bigtable.
- INSTANCE_ID: o identificador permanente da instância do Bigtable.
- TABLE_NAME: identificador permanente da tabela.
- DATAPROC_CLUSTER: o identificador permanente do cluster do Dataproc.
- DATAPROC_REGION: a região do Dataproc que contém um dos clusters em sua instância do Dataproc, por exemplo,
northamerica-northeast2
. - DATAPROC_ZONE: a zona em que o cluster do Dataproc é executado.
- SUBNET: o caminho completo do recurso da sub-rede.
- GCS_BUCKET_NAME: o bucket do Cloud Storage para fazer upload das dependências de carga de trabalho do Spark.
- PATH_TO_COMPILED_JAR: o caminho completo ou relativo para o JAR compilado. Por exemplo,
/path/to/project/root/target/<compiled_JAR_name>
para Maven. - GCS_PATH_TO_CONNECTOR_JAR: o bucket do Cloud Storage
gs://spark-lib/bigtable
, em que o arquivospark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
está localizado. - PATH_TO_PYTHON_FILE: para aplicativos PySpark, o caminho para o arquivo Python que será usado para gravar e ler dados do Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: para aplicativos PySpark, caminho para o arquivo JAR do conector do Bigtable Spark transferido por download.
Enviar um job do Spark
Para instâncias do Dataproc ou sua configuração local do Spark, execute um job do Spark para fazer upload dos dados para o Bigtable.
Cluster do Dataproc
Usar o arquivo JAR compilado e criar um job de cluster do Dataproc que leia e grave dados no Bigtable e no Bigtable.
Criar um cluster de Dataproc. O exemplo a seguir mostra um comando de amostra para criar um cluster do Dataproc v2.0 com o Debian 10, dois nós de trabalho e configurações padrão.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Enviar um job.
Scala/Java
O exemplo a seguir mostra a classe
spark.bigtable.example.WordCount
, que inclui a lógica para criar uma tabela de teste no DataFrame, gravar a tabela no Bigtable e contar o número de palavras na tabela.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc sem servidor
Usar o arquivo JAR compilado e criar um job do Dataproc que leia e grave dados no Bigtable com uma instância do Dataproc sem servidor.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Spark local
Use o arquivo JAR baixado e crie um job do Spark que leia e grave dados no Bigtable e no Bigtable com uma instância local do Spark. Também é possível usar o emulador do Bigtable para enviar o job do Spark.
Usar o emulador do Bigtable
Se você decidir usar o emulador do Bigtable, siga estas etapas:
Execute o seguinte comando para iniciar o emulador:
gcloud beta emulators bigtable start
Por padrão, o emulador escolhe
localhost:8086
.Defina a variável de ambiente
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Para mais informações sobre como usar o emulador do Bigtable, consulte Testar usando o emulador.
Enviar um job do Spark
Use o comando spark-submit
para enviar um job do Spark, independentemente de estar usando um emulador local do Bigtable.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Verificar os dados da tabela
Execute o comando a seguir:
CLI cbt
para verificar se os dados foram gravados no Bigtable. O
CLI cbt
é um componente da Google Cloud CLI. Para mais informações, consulte a
CLI cbt
informações gerais.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Soluções adicionais
Use o conector Spark do Bigtable para soluções específicas, como serialização de tipos complexos do Spark SQL, leitura de linhas específicas e geração de métricas do lado do cliente.
Ler uma linha específica do DataFrame usando um filtro
Ao usar o DataFrames para ler no Bigtable, especifique um filtro para ler somente linhas específicas. Filtros simples, como ==
, <=
e startsWith
na coluna da chave de linha, são aplicados no lado do servidor para evitar uma verificação completa da tabela. Os filtros em chaves de linha compostas ou complexos, como LIKE
na coluna da chave de linha, são aplicados no lado do cliente.
Se você estiver lendo tabelas grandes, recomendamos o uso de filtros de chave de linha simples para evitar a realização de uma verificação completa da tabela. O exemplo de instrução a seguir mostra como ler usando um filtro simples. No filtro do Spark, use o nome da coluna do DataFrame convertida na chave de linha:
dataframe.filter("id == 'some_id'").show()
Ao aplicar um filtro, use o nome da coluna do DataFrame em vez do nome da coluna da tabela do Bigtable.
Serializar tipos de dados complexos usando o Apache Avro
O conector Bigtable Spark permite usar o Apache Avro para serializar tipos complexos do Spark SQL, como ArrayType
, MapType
ou StructType
. O Apache Avro oferece serialização de dados para registro de dados que são normalmente usados para processar e armazenar estruturas de dados complexas.
Use uma sintaxe como "avro":"avroSchema"
para especificar que uma coluna no Bigtable precisa ser codificada usando Avro. Assim, é possível usar .option("avroSchema", avroSchemaString)
ao ler ou gravar no Bigtable para especificar o esquema Avro correspondente a essa coluna no formato de string. É possível usar nomes de opções diferentes, por exemplo, "anotherAvroSchema"
para colunas diferentes e transmitir esquemas Avro para várias colunas.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Usar métricas do lado do cliente
Como o conector Spark do Bigtable é baseado no cliente do Bigtable para Java, as métricas do lado do cliente são ativadas dentro do conector por padrão. Consulte a documentação sobre métricas do lado do cliente para encontrar mais detalhes sobre como acessar e interpretar essas métricas.
Usar o cliente Bigtable para Java com funções RDD de baixo nível
Como o conector Spark do Bigtable é baseado no cliente do Bigtable para Java, é possível usar o cliente diretamente nos aplicativos Spark e realizar solicitações de leitura ou gravação distribuídas nas funções de RDD de baixo nível, como mapPartitions
e foreachPartition
.
Para usar o cliente Bigtable em classes Java, anexe o prefixo com.google.cloud.spark.bigtable.repackaged
aos nomes dos pacotes. Por exemplo, em vez de usar o nome da classe como com.google.cloud.bigtable.data.v2.BigtableDataClient
, use com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Para mais informações sobre o cliente Bigtable para Java, consulte Cliente Bigtable para Java.
A seguir
- Saiba como ajustar seu job do Spark no Dataproc.
- Use as classes do cliente Bigtable para Java com o conector do Bigtable Spark.