Utilizzare il connettore Bigtable Spark
Il connettore Bigtable Spark ti consente di leggere e scrivere dati da e verso Bigtable. Puoi leggere i dati dall'applicazione Spark utilizzando Spark SQL e DataFrames. Le seguenti operazioni Bigtable sono supportate utilizzando il connettore Bigtable Spark:
- Scrivi dati
- Lettura di dati
- crea una nuova tabella
Questo documento illustra come convertire una tabella DataFrame di Spark SQL in una tabella Bigtable, quindi compilare e creare un file JAR per inviare un job Spark.
Stato del supporto di Spark e Scala
Il connettore Bigtable Spark supporta solo la versione Scala 2.12 e le seguenti versioni di Spark:
Il connettore Bigtable Spark supporta le seguenti versioni di Dataproc:
- Cluster image-version 1.5
- Cluster di versioni immagine 2.0
- 2.1 Cluster di versioni delle immagini
- 2.2 Cluster di versioni delle immagini
- Versione 1.0 del runtime Dataproc Serverless
Calcolare i costi
Se decidi di utilizzare uno dei seguenti componenti fatturabili di Google Cloud, ti vengono addebitate le risorse che utilizzi:
- Bigtable (non ti viene addebitato alcun costo per l'utilizzo dell'emulatore Bigtable)
- Dataproc
- Cloud Storage
I prezzi di Dataproc si applicano all'utilizzo di Dataproc sui cluster Compute Engine. I prezzi di Dataproc Serverless si applicano ai carichi di lavoro e alle sessioni eseguiti su Dataproc Serverless per Spark.
Per generare una stima dei costi in base all'utilizzo previsto, utilizza il Calcolatore prezzi.
Prima di iniziare
Completa i seguenti prerequisiti prima di utilizzare il connettore Bigtable Spark.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per utilizzare il connettore Bigtable Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:
-
Amministratore Bigtable (
roles/bigtable.admin
)(Facoltativo): ti consente di leggere o scrivere dati e creare una nuova tabella. -
Utente Bigtable (
roles/bigtable.user
): ti consente di leggere o scrivere dati, ma non di creare una nuova tabella.
Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Se utilizzi Dataproc o Cloud Storage, potrebbero essere necessarie autorizzazioni aggiuntive. Per ulteriori informazioni, consulta le autorizzazioni Dataproc e Cloud Storage.
Configurare Spark
Oltre a creare un'istanza Bigtable, devi anche configurare l'istanza Spark. Puoi farlo localmente o selezionare una delle seguenti opzioni per utilizzare Spark con Dataproc:
- Cluster Dataproc
- Dataproc Serverless
Per ulteriori informazioni sulla scelta tra un cluster Dataproc o un'opzione serverless, consulta la documentazione Confronto tra Dataproc Serverless per Spark e Dataproc su Compute Engine .
Scarica il file JAR del connettore
Puoi trovare il codice sorgente del connettore Bigtable Spark con esempi nel repository GitHub del connettore Bigtable Spark.
In base alla configurazione di Spark, puoi accedere al file JAR nel seguente modo:
Se esegui PySpark localmente, devi scaricare il file JAR del connettore dalla posizione Cloud Storage
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.Sostituisci
SCALA_VERSION
con la versione di Scala, imposta2.12
come unica versione supportata eCONNECTOR_VERSION
con la versione del connettore che vuoi utilizzare.Per l'opzione cluster o serverless Dataproc, utilizza il file JAR più recente come elemento che può essere aggiunto alle applicazioni Spark Scala o Java. Per ulteriori informazioni sull'utilizzo del file JAR come elemento, consulta Gestire le dipendenze.
Se invii il job PySpark a Dataproc, utilizza il flag
gcloud dataproc jobs submit pyspark --jars
per impostare l'URI della posizione del file JAR in Cloud Storage, ad esempiogs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Determina il tipo di calcolo
Per i job di sola lettura, puoi utilizzare il serverless computing di Data Boost (anteprima), che consente di evitare ripercussioni sui cluster di servizio delle applicazioni. Per utilizzare Data Boost, l'applicazione Spark deve utilizzare la versione 1.1.0 o successive del connettore Spark.
Per utilizzare Data Boost, devi creare un profilo dell'app Data Boost e poi fornire l'ID profilo dell'app per l'spark.bigtable.app_profile.id
opzione Spark quando aggiungi la configurazione di Bigtable all'applicazione Spark. Se hai già creato un profilo dell'app per i tuoi job di lettura Spark e vuoi continuare a utilizzarlo senza modificare il codice dell'applicazione, puoi convertirlo in un profilo dell'app Data Boost. Per scoprire di più, consulta Convertire un profilo
dell'app.
Per ulteriori informazioni, consulta la panoramica di Bigtable Data Boost.
Per i job che prevedono letture e scritture, puoi utilizzare i nodi del cluster della tua istanza per il calcolo specificando un profilo dell'app standard con la richiesta.
Identifica o crea un profilo dell'app da utilizzare
Se non specifichi un ID profilo dell'app, il connettore utilizza il profilo dell'app predefinito.
Ti consigliamo di utilizzare un profilo dell'app univoco per ogni applicazione in esecuzione, inclusa l'applicazione Spark. Per ulteriori informazioni sui tipi e sulle impostazioni dei profili delle app, consulta la Panoramica dei profili delle app. Per le istruzioni, vedi Creare e configurare i profili di app.
Aggiungi la configurazione di Bigtable all'applicazione Spark
Nell'applicazione Spark, aggiungi le opzioni Spark che ti consentono di interagire con Bigtable.
Opzioni Spark supportate
Utilizza le opzioni di Spark disponibili nel pacchetto com.google.cloud.spark.bigtable
.
Nome opzione | Obbligatorio | Valore predefinito | Significato |
---|---|---|---|
spark.bigtable.project.id |
Sì | N/D | Imposta l'ID progetto Bigtable. |
spark.bigtable.instance.id |
Sì | N/D | Imposta l'ID istanza Bigtable. |
catalog |
Sì | N/D | Imposta il formato JSON che specifica il formato di conversione tra lo schema simile a SQL del DataFrame e lo schema della tabella Bigtable. Per ulteriori informazioni, consulta Creare metadati della tabella in formato JSON. |
spark.bigtable.app_profile.id |
No | default |
Imposta l'ID profilo dell'app Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
No | Ora di sistema attuale | Imposta il timestamp in millisecondi da utilizzare durante la scrittura di un DataFrame in Bigtable. Tieni presente che, poiché tutte le righe del DataFrame utilizzano lo stesso timestamp, le righe con la stessa colonna della chiave di riga nel DataFrame vengono conservate come una singola versione in Bigtable poiché condividono lo stesso timestamp. |
spark.bigtable.create.new.table |
No | false |
Imposta su true per creare una nuova tabella prima di scrivere in Bigtable. |
spark.bigtable.read.timerange.start.milliseconds o spark.bigtable.read.timerange.end.milliseconds |
No | N/D | Imposta i timestamp (in millisecondi dal tempo di epoch) per filtrare le celle con una data di inizio e una di fine specifiche. |
spark.bigtable.push.down.row.key.filters |
No | true |
Imposta su true per consentire un semplice filtro delle chiave di riga lato server. Il filtro in base alle chiavi di riga composte è implementato lato client.Per ulteriori informazioni, consulta Leggere una riga specifica del DataFrame utilizzando un filtro. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
No | 30 min | Imposta la durata del timeout per un tentativo di lettura delle righe corrispondente a una partizione DataFrame nel client Bigtable per Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
No | 12 ore | Imposta la durata del timeout totale per un tentativo di lettura delle righe corrispondente a una partizione DataFrame nel client Bigtable per Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
No | 1 min | Imposta la durata del timeout per un tentativo di mutazione delle righe corrispondente a una partizione del DataFrame nel client Bigtable per Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
No | 10 min | Imposta la durata del timeout totale per un tentativo di mutazione delle righe corrispondente a una partizione del DataFrame nel client Bigtable per Java. |
spark.bigtable.batch.mutate.size |
No | 100 |
Impostato sul numero di mutazioni in ogni batch. Il valore massimo che puoi impostare è 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
No | false |
Imposta su true per attivare il controllo flusso per le mutazioni collettive. |
Creare i metadati delle tabelle in formato JSON
Il formato della tabella DataFrame di Spark SQL deve essere convertito in una tabella Bigtable utilizzando una stringa in formato JSON. Questo formato JSON di stringa rende il formato dei dati compatibile con Bigtable. Puoi passare il formato JSON nel codice dell'applicazione utilizzando l'opzione .option("catalog", catalog_json_string)
.
Ad esempio, considera la seguente tabella DataFrame e la tabella Bigtable corrispondente.
In questo esempio, le colonne name
e birthYear
nel DataFrame vengono raggruppate nella famiglia di colonne info
e rinominate rispettivamente in name
e birth_year
. Analogamente, la colonna address
viene archiviata nella famiglia di colonne location
con lo stesso nome. La colonna id
del DataFrame viene convertita nella chiave di riga Bigtable.
Le chiavi di riga non hanno un nome di colonna dedicato in Bigtable e in questo esempio id_rowkey
viene utilizzato solo per indicare al connettore che si tratta della colonna della chiave di riga. Puoi utilizzare un nome qualsiasi per la colonna della chiave di riga e assicurarti di utilizzare lo stesso nome quando dichiari il campo "rowkey":"column_name"
in formato JSON.
DataFrame | Tabella Bigtable = t1 | |||||||
Colonne | Chiave di riga | Famiglie di colonne | ||||||
informazioni | location | |||||||
Colonne | Colonne | |||||||
id | name | birthYear | address | id_rowkey | name | birth_year | address |
Il formato JSON del catalogo è il seguente:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Le chiavi e i valori utilizzati nel formato JSON sono i seguenti:
Chiave del catalogo | Valore del catalogo | Formato JSON |
---|---|---|
tabella | Nome della tabella Bigtable. | "table":{"name":"t1"} Se la tabella non esiste, utilizza .option("spark.bigtable.create.new.table", "true") per crearne una. |
rowkey | Nome della colonna che verrà utilizzata come chiave di riga Bigtable. Assicurati che il nome della colonna del DataFrame venga utilizzato come chiave di riga, ad esempio id_rowkey . Sono accettate anche chiavi composte come chiavi di riga. Ad esempio: "rowkey":"name:address" . Questo approccio potrebbe comportare chiavi di riga che richiedono una scansione completa della tabella per tutte le richieste di lettura. |
"rowkey":"id_rowkey" , |
colonne | Mappatura di ogni colonna del DataFrame alla famiglia di colonne ("cf" ) e al nome della colonna ("col" ) Bigtable corrispondenti. Il nome della colonna può essere diverso da quello della colonna nella tabella del DataFrame. I tipi di dati supportati includono string , long e binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" In questo esempio, id_rowkey è la chiave di riga e info e location sono le famiglie di colonne. |
Tipi di dati supportati
Il connettore supporta l'utilizzo di tipi string
, long
e binary
(array di byte) nel catalogo. Fino a quando non verrà aggiunto il supporto di altri tipi, come int
e float
,
puoi convertire manualmente questi tipi di dati in array di byte (BinaryType
di Spark SQL) prima di utilizzare il connettore per scriverli in
Bigtable.
Inoltre, puoi utilizzare Avro per serializzare tipi complessi, come ArrayType
. Per ulteriori informazioni, consulta Eseguire la serializzazione di tipi di dati complessi utilizzando Apache Avro.
Scrivere in Bigtable
Utilizza la funzione .write()
e le opzioni supportate per scrivere i dati in Bigtable.
Java
Il seguente codice del repository GitHub utilizza Java e Maven per scrivere in Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
Il seguente codice del repository GitHub utilizza Python per scrivere in Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Leggere da Bigtable
Utilizza la funzione .read()
per verificare se la tabella è stata importata correttamente in Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compila il progetto
Genera il file JAR utilizzato per eseguire un job in un cluster Dataproc, in Dataproc Serverless o in un'istanza Spark locale. Puoi compilare il file JAR localmente e utilizzarlo per inviare un job. Il percorso del file JAR compilato viene impostato come variabile di ambiente PATH_TO_COMPILED_JAR
quando invii un job.
Questo passaggio non si applica alle applicazioni PySpark.
Gestisci dipendenze
Il connettore Bigtable Spark supporta i seguenti strumenti di gestione delle dipendenze:
Compila il file JAR
Maven
Aggiungi la dipendenza
spark-bigtable
al file pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Aggiungi il plug-in Maven Shade al file
pom.xml
per creare un uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Esegui il comando
mvn clean install
per generare un file JAR.
sbt
Aggiungi la dipendenza
spark-bigtable
al filebuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Aggiungi il plug-in
sbt-assembly
al fileproject/plugins.sbt
oproject/assembly.sbt
per creare un file JAR Uber.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Esegui il comando
sbt clean assembly
per generare il file JAR.
Gradle
Aggiungi la dipendenza
spark-bigtable
al filebuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Aggiungi il plug-in Shadow al file
build.gradle
per creare un file JAR uber:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Per ulteriori informazioni sulla configurazione e sulla compilazione del JAR, consulta la documentazione del plug-in Shadow.
Invia un job
Invia un job Spark utilizzando Dataproc, Dataproc Serverless o un'istanza Spark locale per avviare l'applicazione.
Imposta l'ambiente di runtime
Imposta le seguenti variabili di ambiente.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Sostituisci quanto segue:
- PROJECT_ID: l'identificatore permanente del progetto Bigtable.
- INSTANCE_ID: l'identificatore permanente dell'istanza Bigtable.
- TABLE_NAME: l'identificatore permanente della tabella.
- DATAPROC_CLUSTER: l'identificatore permanente del cluster Dataproc.
- DATAPROC_REGION: la regione Dataproc che contiene uno dei cluster nell'istanza Dataproc, ad esempio
northamerica-northeast2
. - DATAPROC_ZONE: la zona in cui viene eseguito il cluster Dataproc.
- SUBNET: il percorso completo della risorsa della subnet.
- GCS_BUCKET_NAME: il bucket Cloud Storage per caricare le dipendenze del carico di lavoro Spark.
- PATH_TO_COMPILED_JAR: il percorso completo o relativo al file JAR compilato, ad esempio
/path/to/project/root/target/<compiled_JAR_name>
per Maven. - GCS_PATH_TO_CONNECTOR_JAR: il bucket Cloud Storage
gs://spark-lib/bigtable
in cui si trova il filespark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
. - PATH_TO_PYTHON_FILE: per le applicazioni PySpark, il percorso del file Python che verrà utilizzato per scrivere e leggere i dati da Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: per le applicazioni PySpark, il percorso del file JAR del connettore Bigtable Spark scaricato.
Invia un job Spark
Per le istanze Dataproc o la configurazione locale di Spark, esegui un job Spark per caricare i dati in Bigtable.
Cluster Dataproc
Utilizza il file JAR compilato e crea un job del cluster Dataproc che legga e scriva dati da e verso Bigtable.
Crea un cluster Dataproc. L'esempio seguente mostra un comando di esempio per creare un cluster Dataproc 2.0 con Debian 10, due nodi worker e configurazioni predefinite.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Invia un job.
Scala/Java
L'esempio seguente mostra la classe
spark.bigtable.example.WordCount
che include la logica per creare una tabella di test in DataFrame, scrivere la tabella in Bigtable e conteggiare il numero di parole nella tabella.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
Utilizza il file JAR compilato e crea un job Dataproc che legga e scriva dati da e in Bigtable con un'istanza Dataproc Serverless.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Spark locale
Utilizza il file JAR scaricato e crea un job Spark che legga e scriva dati da e verso Bigtable con un'istanza Spark locale. Puoi anche utilizzare l'emulatore Bigtable per inviare il job Spark.
Utilizzare l'emulatore Bigtable
Se decidi di utilizzare l'emulatore Bigtable, segui questi passaggi:
Esegui il seguente comando per avviare l'emulatore:
gcloud beta emulators bigtable start
Per impostazione predefinita, l'emulatore sceglie
localhost:8086
.Imposta la variabile di ambiente
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Per ulteriori informazioni sull'utilizzo dell'emulatore Bigtable, consulta Eseguire test utilizzando l'emulatore.
Invia un job Spark
Utilizza il comando spark-submit
per inviare un job Spark indipendentemente dal fatto che tu stia utilizzando un emulatore Bigtable locale.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Verificare i dati della tabella
Esegui il seguente comando cbt
CLI
per verificare che i dati vengano scritti in Bigtable. L'cbt
CLI
è un componente di Google Cloud CLI. Per ulteriori informazioni, consulta la
panoramica della CLIcbt
.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Soluzioni aggiuntive
Utilizza il connettore Bigtable Spark per soluzioni specifiche, come la serializzazione di tipi Spark SQL complessi, la lettura di righe specifiche e la generazione di metriche lato client.
Leggere una riga specifica del DataFrame utilizzando un filtro
Quando utilizzi i DataFrame per leggere da Bigtable, puoi specificare un filtro per leggere solo righe specifiche. Filtri semplici come ==
, <=
e startsWith
nella colonna della chiave di riga vengono applicati lato server per evitare una scansione completa della tabella. I filtri sulle chiavi di riga composte o filtri complessi come il filtro LIKE
nella colonna della chiave di riga vengono applicati lato client.
Se stai leggendo tabelle di grandi dimensioni, ti consigliamo di utilizzare filtri semplici per chiave di riga per evitare di eseguire una scansione completa della tabella. L'istruzione di esempio seguente mostra come leggere utilizzando un semplice filtro. Assicurati di utilizzare nel filtro Spark il nome della colonna del DataFrame che viene convertita in chiave di riga:
dataframe.filter("id == 'some_id'").show()
Quando applichi un filtro, utilizza il nome della colonna del DataFrame anziché il nome della colonna della tabella Bigtable.
Serializzare tipi di dati complessi utilizzando Apache Avro
Il connettore Bigtable Spark supporta l'utilizzo di Apache Avro per serializzare tipi Spark SQL complessi, come ArrayType
, MapType
o StructType
. Apache Avro fornisce la serializzazione dei dati per i dati dei record che vengono comunemente utilizzati per l'elaborazione e l'archiviazione di strutture di dati complesse.
Utilizza una sintassi come "avro":"avroSchema"
per specificare che una colonna in Bigtable deve essere codificata utilizzando Avro. Puoi quindi utilizzare .option("avroSchema", avroSchemaString)
durante la lettura o la scrittura in Bigtable per specificare lo schema Avro corrispondente a quella colonna in formato stringa. Puoi utilizzare nomi di opzioni diversi, ad esempio "anotherAvroSchema"
per colonne diverse, e passare schemi Avro per più colonne.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Utilizzare le metriche lato client
Poiché il connettore Bigtable Spark si basa sul client Bigtable per Java, le metriche lato client sono abilitate all'interno del connettore per impostazione predefinita. Per ulteriori dettagli su come accedere a queste metriche e interpretarle, consulta la documentazione relativa alle metriche lato client.
Utilizzare il client Bigtable per Java con funzioni RDD di basso livello
Poiché il connettore Bigtable Spark si basa sul client Bigtable per Java, puoi utilizzare direttamente il client nelle tue applicazioni Spark ed eseguire richieste di lettura o scrittura distribuite all'interno delle funzioni RDD di basso livello come mapPartitions
e foreachPartition
.
Per utilizzare il client Bigtable per le classi Java, aggiungi il prefisso com.google.cloud.spark.bigtable.repackaged
ai nomi dei pacchetti. Ad esempio, anziché utilizzare il nome della classe come com.google.cloud.bigtable.data.v2.BigtableDataClient
, utilizza com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Per ulteriori informazioni sul client Bigtable per Java, consulta il client Bigtable per Java.
Passaggi successivi
- Scopri come ottimizzare il job Spark in Dataproc.
- Utilizza le classi del client Bigtable per Java con il connettore Bigtable Spark.