Utiliser le connecteur Bigtable Spark
Le connecteur Bigtable Spark vous permet de lire et d'écrire des données depuis et vers Bigtable. Vous pouvez lire les données de votre application Spark à l'aide de Spark SQL et de DataFrames. Les opérations Bigtable suivantes sont compatibles avec le connecteur Spark Bigtable:
- Écrire des données
- Lire des données
- Créer une table
Ce document explique comment convertir une table DataFrames SQL Spark en table Bigtable, puis compiler et créer un fichier JAR pour envoyer un job Spark.
État de la compatibilité de Spark et Scala
Le connecteur Bigtable Spark n'est compatible qu'avec la version Scala 2.12 et les versions Spark suivantes:
Le connecteur Bigtable Spark est compatible avec les versions Dataproc suivantes:
- Cluster de version d'image 1.5
- Cluster de version d'image 2.0
- Cluster image-version 2.1
- Cluster image-version 2.2
- Version d'exécution Dataproc sans serveur 1.0
Calculer les coûts
Si vous décidez d'utiliser l'un des composants facturables de Google Cloud suivants, les ressources que vous utilisez vous sont facturées:
- Bigtable (l'utilisation de l'émulateur Bigtable n'est pas facturée)
- Dataproc
- Cloud Storage
La tarification Dataproc s'applique à l'utilisation de Dataproc sur les clusters Compute Engine. Les tarifs Dataproc sans serveur s'appliquent aux charges de travail et aux sessions exécutées sur Dataproc sans serveur pour Spark.
Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût.
Avant de commencer
Avant d'utiliser le connecteur Bigtable Spark, vérifiez que vous disposez des conditions préalables suivantes.
Rôles requis
Pour obtenir les autorisations nécessaires pour utiliser le connecteur Bigtable Spark, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet:
-
Administrateur Bigtable (
roles/bigtable.admin
)(facultatif) : vous permet de lire ou d'écrire des données et de créer une table. -
Utilisateur Bigtable (
roles/bigtable.user
) : permet de lire ou d'écrire des données, mais pas de créer de table.
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Si vous utilisez Dataproc ou Cloud Storage, des autorisations supplémentaires peuvent être requises. Pour en savoir plus, consultez les pages Autorisations Dataproc et Autorisations Cloud Storage.
Configurer Spark
En plus de créer une instance Bigtable, vous devez également configurer votre instance Spark. Vous pouvez le faire localement ou sélectionner l'une des options suivantes pour utiliser Spark avec Dataproc:
- Cluster Dataproc
- Dataproc sans serveur
Pour en savoir plus sur le choix entre un cluster Dataproc ou une option sans serveur, consultez la documentation Dataproc sans serveur pour Spark par rapport à Dataproc sur Compute Engine .
Télécharger le fichier JAR du connecteur
Vous trouverez le code source du connecteur Bigtable Spark avec des exemples dans le dépôt GitHub du connecteur Bigtable Spark.
En fonction de votre configuration Spark, vous pouvez accéder au fichier JAR comme suit:
Si vous exécutez PySpark localement, vous devez télécharger le fichier JAR du connecteur à partir de l'emplacement Cloud Storage
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.Remplacez
SCALA_VERSION
par la version Scala, définissez2.12
comme seule version compatible etCONNECTOR_VERSION
par la version du connecteur que vous souhaitez utiliser.Pour l'option cluster Dataproc ou sans serveur, utilisez le dernier fichier JAR comme artefact pouvant être ajouté à vos applications Spark Scala ou Java. Pour en savoir plus sur l'utilisation du fichier JAR en tant qu'artefact, consultez Gérer les dépendances.
Si vous envoyez votre tâche PySpark à Dataproc, utilisez l'indicateur
gcloud dataproc jobs submit pyspark --jars
pour définir l'URI sur l'emplacement du fichier JAR dans Cloud Storage (par exemple,gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
).
Déterminer le type de calcul
Pour les jobs en lecture seule, vous pouvez utiliser le calcul sans serveur Data Boost (Preview), ce qui vous permet d'éviter d'affecter vos clusters de traitement des applications. Votre application Spark doit utiliser la version 1.1.0 ou ultérieure du connecteur Spark pour utiliser Data Boost.
Pour utiliser Data Boost, vous devez créer un profil d'application Data Boost, puis fournir l'ID du profil d'application pour l'option Spark spark.bigtable.app_profile.id
lorsque vous ajoutez votre configuration Bigtable à votre application Spark. Si vous avez déjà créé un profil d'application pour vos tâches de lecture Spark et que vous souhaitez continuer à l'utiliser sans modifier le code de votre application, vous pouvez le convertir en profil d'application Data Boost. Pour en savoir plus, consultez Convertir un profil d'application.
Pour en savoir plus, consultez la présentation de Data Boost pour Bigtable.
Pour les tâches impliquant des lectures et des écritures, vous pouvez utiliser les nœuds de cluster de votre instance pour le calcul en spécifiant un profil d'application standard avec votre requête.
Identifier ou créer un profil d'application à utiliser
Si vous ne spécifiez pas d'ID de profil d'application, le connecteur utilise le profil d'application par défaut.
Nous vous recommandons d'utiliser un profil d'application unique pour chaque application que vous exécutez, y compris votre application Spark. Pour en savoir plus sur les types et les paramètres de profil d'application, consultez la présentation des profils d'application. Pour obtenir des instructions, consultez la section Créer et configurer des profils d'application.
Ajouter une configuration Bigtable à votre application Spark
Dans votre application Spark, ajoutez les options Spark qui vous permettent d'interagir avec Bigtable.
Options Spark acceptées
Utilisez les options Spark disponibles dans le package com.google.cloud.spark.bigtable
.
Nom de l'option | Obligatoire | Valeur par défaut | Signification |
---|---|---|---|
spark.bigtable.project.id |
Oui | N/A | Définissez l'ID du projet Bigtable. |
spark.bigtable.instance.id |
Oui | N/A | Définissez l'ID de l'instance Bigtable. |
catalog |
Oui | N/A | Définissez le format JSON qui spécifie le format de conversion entre le schéma semblable à SQL du DataFrame et le schéma du tableau Bigtable. Pour en savoir plus, consultez Créer des métadonnées de table au format JSON. |
spark.bigtable.app_profile.id |
Non | default |
Définissez l'ID du profil d'application Bigtable. |
spark.bigtable.write.timestamp.milliseconds |
Non | Heure système actuelle | Définissez le code temporel en millisecondes à utiliser lors de l'écriture d'un DataFrame dans Bigtable. Notez que, comme toutes les lignes du DataFrame utilisent le même code temporel, les lignes ayant la même colonne de clé de ligne dans le DataFrame persistent en tant que version unique dans Bigtable, car elles partagent le même code temporel. |
spark.bigtable.create.new.table |
Non | false |
Définissez cette valeur sur true pour créer une table avant d'écrire dans Bigtable. |
spark.bigtable.read.timerange.start.milliseconds ou spark.bigtable.read.timerange.end.milliseconds |
Non | N/A | Définissez des codes temporels (en millisecondes depuis l'epoch) pour filtrer les cellules avec une date de début et de fin spécifiques, respectivement. |
spark.bigtable.push.down.row.key.filters |
Non | true |
Définissez cette valeur sur true pour autoriser un filtrage simple de la clé de ligne côté serveur. Le filtrage sur des clés de ligne composées est implémenté côté client.Pour en savoir plus, consultez Lire une ligne DataFrame spécifique à l'aide d'un filtre. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
Non | 30 min | Définissez la durée du temps limite pour une tentative de lecture de lignes correspondant à une partition DataFrame dans le client Bigtable pour Java. |
spark.bigtable.read.rows.total.timeout.milliseconds |
Non | 12 h | Définissez la durée du temps limite total pour une tentative de lecture de lignes correspondant à une partition DataFrame dans le client Bigtable pour Java. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
Non | 1 min | Définissez la durée du temps limite pour une tentative de modification des lignes correspondant à une partition DataFrame dans le client Bigtable pour Java. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
Non | 10 min | Définissez la durée du temps limite total pour une tentative de modification de lignes correspondant à une partition DataFrame dans le client Bigtable pour Java. |
spark.bigtable.batch.mutate.size |
Non | 100 |
Défini sur le nombre de mutations dans chaque lot. La valeur maximale que vous pouvez définir est 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
Non | false |
Définissez la valeur sur true pour activer le contrôle de flux pour les mutations par lot. |
Créer des métadonnées de table au format JSON
Le format de table DataFrames Spark SQL doit être converti en table Bigtable à l'aide d'une chaîne au format JSON. Ce format JSON de chaîne rend le format de données compatible avec Bigtable. Vous pouvez transmettre le format JSON dans le code de votre application à l'aide de l'option .option("catalog", catalog_json_string)
.
Prenons l'exemple de la table DataFrame suivante et de la table Bigtable correspondante.
Dans cet exemple, les colonnes name
et birthYear
du DataFrame sont regroupées sous la famille de colonnes info
et rebaptisées name
et birth_year
, respectivement. De même, la colonne address
est stockée dans la famille de colonnes location
avec le même nom de colonne. La colonne id
du DataFrame est convertie en clé de ligne Bigtable.
Les clés de ligne n'ont pas de nom de colonne dédié dans Bigtable. Dans cet exemple, id_rowkey
n'est utilisé que pour indiquer au connecteur qu'il s'agit de la colonne de clé de ligne. Vous pouvez utiliser n'importe quel nom pour la colonne de clé de ligne et vous assurer d'utiliser le même nom lorsque vous déclarez le champ "rowkey":"column_name"
au format JSON.
DataFrame | Table Bigtable = t1 | |||||||
Colonnes | Clé de ligne | Familles de colonnes | ||||||
infos | position | |||||||
Colonnes | Colonnes | |||||||
id | name | birthYear | adresse | id_rowkey | name | birth_year | adresse |
Le format JSON du catalogue est le suivant:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Les clés et valeurs utilisées au format JSON sont les suivantes:
Clé de catalogue | Valeur du catalogue | Format JSON |
---|---|---|
table | Nom de la table Bigtable. | "table":{"name":"t1"} Si la table n'existe pas, utilisez .option("spark.bigtable.create.new.table", "true") pour en créer une. |
rowkey | Nom de la colonne qui sera utilisée comme clé de ligne Bigtable. Assurez-vous que le nom de la colonne du DataFrame est utilisé comme clé de ligne (par exemple, id_rowkey ). Les clés composées sont également acceptées comme clés de ligne. Par exemple, "rowkey":"name:address" . Cette approche peut entraîner des clés de ligne qui nécessitent un balayage complet de la table pour toutes les requêtes de lecture. |
"rowkey":"id_rowkey" , |
colonnes | Mappage de chaque colonne DataFrame dans la famille de colonnes Bigtable ("cf" ) et le nom de colonne ("col" ) correspondants. Le nom de colonne peut être différent de celui de la table DataFrame. Les types de données acceptés incluent string , long et binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" Dans cet exemple, id_rowkey est la clé de ligne, et info et location sont les familles de colonnes. |
Types de données acceptés
Le connecteur prend en charge les types string
, long
et binary
(tableau d'octets) dans le catalogue. Tant que la prise en charge d'autres types tels que int
et float
n'est pas ajoutée, vous pouvez convertir manuellement ces types de données en tableaux d'octets (BinaryType
de Spark SQL) avant d'utiliser le connecteur pour les écrire dans Bigtable.
De plus, vous pouvez utiliser Avro pour sérialiser des types complexes, tels que ArrayType
. Pour en savoir plus, consultez Sérialiser des types de données complexes à l'aide d'Apache Avro.
Écrire dans Bigtable
Utilisez la fonction .write()
et les options compatibles pour écrire vos données dans Bigtable.
Java
Le code suivant du dépôt GitHub utilise Java et Maven pour écrire dans Bigtable.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
Le code suivant du dépôt GitHub utilise Python pour écrire dans Bigtable.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Lire à partir de Bigtable
Utilisez la fonction .read()
pour vérifier si la table a bien été importée dans Bigtable.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Compiler votre projet
Générez le fichier JAR utilisé pour exécuter une tâche dans un cluster Dataproc, Dataproc sans serveur ou une instance Spark locale. Vous pouvez compiler le fichier JAR localement, puis l'utiliser pour envoyer une tâche. Le chemin d'accès au fichier JAR compilé est défini comme variable d'environnement PATH_TO_COMPILED_JAR
lorsque vous envoyez une tâche.
Cette étape ne s'applique pas aux applications PySpark.
Gérer les dépendances
Le connecteur Bigtable Spark est compatible avec les outils de gestion des dépendances suivants:
Compiler le fichier JAR
Maven
Ajoutez la dépendance
spark-bigtable
à votre fichier pom.xml.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Ajoutez le plug-in Maven Shade à votre fichier
pom.xml
pour créer un fichier Uber JAR:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Exécutez la commande
mvn clean install
pour générer un fichier JAR.
sbt
Ajoutez la dépendance
spark-bigtable
à votre fichierbuild.sbt
:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Ajoutez le plug-in
sbt-assembly
à votre fichierproject/plugins.sbt
ouproject/assembly.sbt
pour créer un fichier Uber JAR.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Exécutez la commande
sbt clean assembly
pour générer le fichier JAR.
Gradle
Ajoutez la dépendance
spark-bigtable
à votre fichierbuild.gradle
.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Ajoutez le plug-in Shadow dans votre fichier
build.gradle
pour créer un fichier JAR Uber:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Pour en savoir plus sur la configuration et la compilation JAR, consultez la documentation du plug-in Shadow.
Envoyer une tâche
Envoyez une tâche Spark à l'aide de Dataproc, de Dataproc sans serveur ou d'une instance Spark locale pour lancer votre application.
Définir l'environnement d'exécution
Définissez les variables d'environnement suivantes.
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Remplacez les éléments suivants :
- PROJECT_ID: identifiant permanent du projet Bigtable.
- INSTANCE_ID: identifiant permanent de l'instance Bigtable.
- TABLE_NAME: identifiant permanent de la table.
- DATAPROC_CLUSTER: identifiant permanent du cluster Dataproc.
- DATAPROC_REGION: région Dataproc contenant l'un des clusters de votre instance Dataproc (par exemple,
northamerica-northeast2
). - DATAPROC_ZONE: zone dans laquelle le cluster Dataproc s'exécute.
- SUBNET: chemin d'accès complet de la ressource du sous-réseau.
- GCS_BUCKET_NAME: bucket Cloud Storage permettant d'importer les dépendances de la charge de travail Spark.
- PATH_TO_COMPILED_JAR: chemin d'accès complet ou relatif au fichier JAR compilé (par exemple,
/path/to/project/root/target/<compiled_JAR_name>
pour Maven). - GCS_PATH_TO_CONNECTOR_JAR: bucket Cloud Storage
gs://spark-lib/bigtable
, dans lequel se trouve le fichierspark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar
. - PATH_TO_PYTHON_FILE: pour les applications PySpark, chemin d'accès au fichier Python qui sera utilisé pour écrire et lire des données dans Bigtable.
- LOCAL_PATH_TO_CONNECTOR_JAR: pour les applications PySpark, chemin d'accès au fichier JAR du connecteur Spark Bigtable téléchargé.
Envoyer une tâche Spark
Pour les instances Dataproc ou votre configuration Spark locale, exécutez un job Spark pour importer des données dans Bigtable.
Cluster Dataproc
Utilisez le fichier JAR compilé et créez une tâche de cluster Dataproc qui lit et écrit des données depuis et vers Bigtable.
Créez un cluster Dataproc. L'exemple suivant montre une commande permettant de créer un cluster Dataproc v2.0 avec Debian 10, deux nœuds de calcul et des configurations par défaut.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Envoyez une tâche.
Scala/Java
L'exemple suivant montre la classe
spark.bigtable.example.WordCount
qui inclut la logique permettant de créer une table de test dans DataFrame, d'écrire la table dans Bigtable, puis de compter le nombre de mots dans la table.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc sans serveur
Utilisez le fichier JAR compilé et créez une tâche Dataproc qui lit et écrit des données depuis et vers Bigtable avec une instance Dataproc Serverless.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Étincelle locale
Utilisez le fichier JAR téléchargé et créez un job Spark qui lit et écrit des données à partir de Bigtable et vers Bigtable avec une instance Spark locale. Vous pouvez également utiliser l'émulateur Bigtable pour envoyer la tâche Spark.
Utiliser l'émulateur Bigtable
Si vous décidez d'utiliser l'émulateur Bigtable, procédez comme suit:
Exécutez la commande suivante pour démarrer l'émulateur :
gcloud beta emulators bigtable start
Par défaut, l'émulateur choisit la paire hôte/port
localhost:8086
.Définissez la variable d'environnement
BIGTABLE_EMULATOR_HOST
:export BIGTABLE_EMULATOR_HOST=localhost:8086
Pour en savoir plus sur l'utilisation de l'émulateur Bigtable, consultez Effectuer des tests à l'aide de l'émulateur.
Envoyer une tâche Spark
Utilisez la commande spark-submit
pour envoyer une tâche Spark, que vous utilisiez ou non un émulateur Bigtable local.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Vérifier les données de la table
Exécutez la commande CLI cbt
suivante pour vérifier que les données sont écrites dans Bigtable. La CLI cbt
est un composant de la Google Cloud CLI. Pour en savoir plus, consultez la
présentation de la CLI cbt
.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Solutions supplémentaires
Utilisez le connecteur Bigtable Spark pour des solutions spécifiques, telles que la sérialisation de types Spark SQL complexes, la lecture de lignes spécifiques et la génération de métriques côté client.
Lire une ligne de DataFrame spécifique à l'aide d'un filtre
Lorsque vous utilisez des DataFrames pour lire à partir de Bigtable, vous pouvez spécifier un filtre pour ne lire que des lignes spécifiques. Les filtres simples tels que ==
, <=
et startsWith
sur la colonne de clé de ligne sont appliqués côté serveur pour éviter une analyse complète de la table. Les filtres sur les clés de ligne composées ou les filtres complexes tels que le filtre LIKE
sur la colonne de clé de ligne sont appliqués côté client.
Si vous lisez de grandes tables, nous vous recommandons d'utiliser des filtres de clé de ligne simples pour éviter d'effectuer une analyse complète de la table. L'exemple d'instruction suivant montre comment lire à l'aide d'un filtre simple. Assurez-vous que, dans votre filtre Spark, vous utilisez le nom de la colonne DataFrame convertie en clé de ligne:
dataframe.filter("id == 'some_id'").show()
Lorsque vous appliquez un filtre, utilisez le nom de la colonne DataFrame au lieu du nom de la colonne de la table Bigtable.
Sérialiser des types de données complexes à l'aide d'Apache Avro
Le connecteur Bigtable Spark permet d'utiliser Apache Avro pour sérialiser des types SQL Spark complexes, tels que ArrayType
, MapType
ou StructType
. Apache Avro fournit une sérialisation des données pour les données d'enregistrement couramment utilisées pour traiter et stocker des structures de données complexes.
Utilisez une syntaxe telle que "avro":"avroSchema"
pour spécifier qu'une colonne de Bigtable doit être encodée à l'aide d'Avro. Vous pouvez ensuite utiliser .option("avroSchema", avroSchemaString)
lors de la lecture ou de l'écriture dans Bigtable pour spécifier le schéma Avro correspondant à cette colonne au format de chaîne. Vous pouvez utiliser différents noms d'options, par exemple "anotherAvroSchema"
pour différentes colonnes et transmettre des schémas Avro pour plusieurs colonnes.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Utiliser des métriques côté client
Étant donné que le connecteur Bigtable Spark est basé sur le client Bigtable pour Java, les métriques côté client sont activées par défaut dans le connecteur. Pour en savoir plus sur l'accès et l'interprétation de ces métriques, consultez la documentation sur les métriques côté client.
Utiliser le client Bigtable pour Java avec des fonctions RDD de bas niveau
Étant donné que le connecteur Bigtable Spark est basé sur le client Bigtable pour Java, vous pouvez utiliser directement le client dans vos applications Spark et effectuer des requêtes de lecture ou d'écriture distribuées dans les fonctions RDD de bas niveau telles que mapPartitions
et foreachPartition
.
Pour utiliser le client Bigtable pour les classes Java, ajoutez le préfixe com.google.cloud.spark.bigtable.repackaged
aux noms de package. Par exemple, au lieu d'utiliser le nom de classe com.google.cloud.bigtable.data.v2.BigtableDataClient
, utilisez com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Pour en savoir plus sur le client Bigtable pour Java, consultez la page Client Bigtable pour Java.
Étape suivante
- Découvrez comment optimiser votre tâche Spark dans Dataproc.
- Utilisez les classes du client Bigtable pour Java avec le connecteur Bigtable Spark.