Écrire des messages Pub/Sub Lite à l'aide d'Apache Spark
Le connecteur Spark Pub/Sub Lite est une bibliothèque cliente Java Open Source qui permet d'utiliser Pub/Sub Lite en tant que source d'entrée et de sortie pour Apache Spark Structured Streaming. Le connecteur fonctionne dans toutes les distributions Apache Spark, y compris Dataproc.
Ce guide de démarrage rapide vous montre comment :
- Lire les messages de Pub/Sub Lite
- Écrire des messages dans Pub/Sub Lite
Utiliser PySpark à partir d'un cluster Spark Dataproc
Avant de commencer
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataproc, Cloud Storage, Logging APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
Prérequis
Créez des variables pour votre projet.
export PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list \ --filter="projectId:$PROJECT_ID" \ --format="value(PROJECT_NUMBER)")
Créez un bucket Cloud Storage. Les noms des buckets Cloud Storage doivent être uniques.
export BUCKET=your-bucket-name
gcloud storage buckets create gs://$BUCKET
Créez un sujet et un abonnement Pub/Sub Lite dans un emplacement compatible. Consultez Créer un sujet si vous utilisez une réservation Pub/Sub Lite.
export TOPIC=your-lite-topic-id
export SUBSCRIPTION=your-lite-subscription-id
export PUBSUBLITE_LOCATION=your-lite-location
gcloud pubsub lite-topics create $TOPIC \ --location=$PUBSUBLITE_LOCATION \ --partitions=2 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$PUBSUBLITE_LOCATION \ --topic=$TOPIC
Créez un cluster Dataproc.
export DATAPROC_REGION=your-dataproc-region
export CLUSTER_ID=your-dataproc-cluster-id
gcloud dataproc clusters create $CLUSTER_ID \ --region $DATAPROC_REGION \ --image-version 2.1 \ --scopes 'https://www.googleapis.com/auth/cloud-platform' \ --enable-component-gateway \ --bucket $BUCKET
--region
: région Dataproc compatible avec votre sujet et votre abonnement Pub/Sub Lite--image-version
: version d'image du cluster, qui détermine la version d'Apache Spark installée sur le cluster. Choisissez des versions de version d'image 2.x.x, car le connecteur Spark/Pub/Sub Lite est actuellement compatible avec Apache Spark 3.x.x.--scopes
: active l'accès API aux services Google Cloud dans le même projet.--enable-component-gateway
: permet d'accéder à l'interface utilisateur Web d'Apache Spark.--bucket
: bucket Cloud Storage de préproduction utilisé pour stocker les dépendances de tâches du cluster, les résultats du pilote et les fichiers de configuration du cluster.
Clonez le dépôt du guide de démarrage rapide et accédez au répertoire de l'exemple de code :
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
cd python-docs-samples/pubsublite/spark-connector/
Écrire dans Pub/Sub Lite
L'exemple ci-dessous permet d'effectuer les opérations suivantes :
- Créer une source de débit qui génère des nombres consécutifs et des horodatages au format
spark.sql.Row
- Transformer les données pour qu'elles correspondent au schéma de table requis par l'API
writeStream
du connecteur Spark Pub/Sub Lite - Écrire les données dans un sujet Pub/Sub Lite existant
Pour envoyer la tâche d'écriture à Dataproc, procédez comme suit :
Console
- Importez le script PySpark dans votre bucket Cloud Storage.
- Accédez à la console Cloud Storage.
- Sélectionnez votre bucket.
- Utilisez l'option Importer des fichiers pour importer le script PySpark que vous souhaitez utiliser.
- Envoyez la tâche à votre cluster Dataproc :
- Accédez à la console Dataproc.
- Accédez aux tâches.
- Cliquez sur Submit job (Envoyer la tâche).
- Renseignez les détails de la tâche.
- Sous Cluster, choisissez votre cluster.
- Sous Job (Tâche), attribuez un nom à l'ID de tâche.
- Dans le champ Job Type (Type de tâche), sélectionnez PySpark.
- Dans le champ Main python file (Fichier Python principal), indiquez l'URI gcloud Storage du script PySpark importé qui commence par
gs://
. - Pour les fichiers JAR, choisissez la dernière version du connecteur Spark dans Maven, recherchez le fichier JAR avec des dépendances dans les options de téléchargement, puis copiez son lien.
- Pour Arguments, si vous utilisez le script PySpark complet de GitHub, saisissez
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--topic_id=
TOPIC_ID. Si vous copiez le script PySpark ci-dessus avec les tâches terminées, laissez le champ vide. - Sous Properties (Propriétés), saisissez la clé
spark.master
et la valeuryarn
. - Cliquez sur Envoyer.
gcloud
Exécutez la commande gcloud dataproc jobs submit pyspark pour envoyer la tâche à Dataproc :
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --topic_id=$TOPIC
--region
: région Dataproc présélectionnée--cluster
: nom du cluster Dataproc.--jars
: fichier Uber du connecteur Pub/Sub Spark Spark avec des dépendances dans un bucket Cloud Storage public. Vous pouvez également cliquer sur ce lien pour télécharger le fichier JAR Uber avec les dépendances de Maven.--driver-log-levels
: définissez le niveau de journalisation sur INFO au niveau racine.--properties
: utilisez le gestionnaire de ressources YARN pour le maître Spark.--
: fournit les arguments requis par le script.
Si l'opération writeStream
aboutit, vous devriez voir les messages de journal tels que le suivant en local et sur la page des détails de la tâche dans la console Google Cloud:
INFO com.google.cloud.pubsublite.spark.PslStreamWriter: Committed 1 messages for epochId ..
Lire à partir de Pub/Sub Lite
L'exemple suivant lit les messages d'un abonnement Pub/Sub Lite existant à l'aide de l'API readStream
. Le connecteur affichera les messages conformes au schéma de la table fixe au format spark.sql.Row
.
Pour envoyer la tâche de lecture à Dataproc, procédez comme suit :
Console
- Importez le script PySpark dans votre bucket Cloud Storage.
- Accédez à la console Cloud Storage.
- Sélectionnez votre bucket.
- Utilisez l'option Importer des fichiers pour importer le script PySpark que vous souhaitez utiliser.
- Envoyez la tâche à votre cluster Dataproc :
- Accédez à la console Dataproc.
- Accédez aux tâches.
- Cliquez sur Submit job (Envoyer la tâche).
- Renseignez les détails de la tâche.
- Sous Cluster, choisissez votre cluster.
- Sous Job (Tâche), attribuez un nom à l'ID de tâche.
- Dans le champ Job Type (Type de tâche), sélectionnez PySpark.
- Dans le champ Main python file (Fichier Python principal), indiquez l'URI gcloud Storage du script PySpark importé qui commence par
gs://
. - Pour les fichiers JAR, choisissez la dernière version du connecteur Spark dans Maven, recherchez le fichier JAR avec des dépendances dans les options de téléchargement, puis copiez son lien.
- Pour Arguments, si vous utilisez le script PySpark complet de GitHub, saisissez
--project_number=
PROJECT_NUMBER,--location=
PUBSUBLITE_LOCATION,--subscription_id=
SUBSCRIPTION_ID. Si vous copiez le script PySpark ci-dessus avec les tâches terminées, laissez le champ vide. - Sous Properties (Propriétés), saisissez la clé
spark.master
et la valeuryarn
. - Cliquez sur Envoyer.
gcloud
Exécutez la commande gcloud dataproc jobs submit pyspark pour envoyer à nouveau la tâche à Dataproc :
gcloud dataproc jobs submit pyspark spark_streaming_to_pubsublite_example.py \
--region=$DATAPROC_REGION \
--cluster=$CLUSTER_ID \
--jars=gs://spark-lib/pubsublite/pubsublite-spark-sql-streaming-LATEST-with-dependencies.jar \
--driver-log-levels=root=INFO \
--properties=spark.master=yarn \
-- --project_number=$PROJECT_NUMBER --location=$PUBSUBLITE_LOCATION --subscription_id=$SUBSCRIPTION
--region
: région Dataproc présélectionnée--cluster
: nom du cluster Dataproc.--jars
: fichier Uber du connecteur Pub/Sub Spark Spark avec des dépendances dans un bucket Cloud Storage public. Vous pouvez également cliquer sur ce lien pour télécharger le fichier JAR Uber avec les dépendances de Maven.--driver-log-levels
: définissez le niveau de journalisation sur INFO au niveau racine.--properties
: utilisez le gestionnaire de ressources YARN pour le maître Spark.--
: fournit les arguments requis pour le script.
Si l'opération readStream
aboutit, vous devriez voir les messages de journal tels que le suivant en local et sur la page des détails de la tâche dans la console Google Cloud:
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
| subscription|partition|offset|key|data| publish_timestamp| event_timestamp|attributes|
+--------------------+---------+------+---+----+--------------------+--------------------+----------+
|projects/50200928...| 0| 89523| 0| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89524| 1| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
|projects/50200928...| 0| 89525| 2| .|2021-09-03 23:01:...|2021-09-03 22:56:...| []|
Rouvrir et supprimer définitivement des messages de Pub/Sub Lite
Les opérations de recherche ne fonctionnent pas lors de la lecture à partir de Pub/Sub Lite à l'aide du connecteur Spark Pub/Sub Lite, car les systèmes Apache Spark effectuent leur propre suivi des décalages au sein des partitions. La solution consiste à vider, rechercher et redémarrer les workflows.
Effectuer un nettoyage
Pour éviter que les ressources utilisées sur cette page soient facturées sur votre compte Google Cloud, procédez comme suit :
Supprimez le sujet et l'abonnement.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
Supprimez le cluster Dataproc.
gcloud dataproc clusters delete $CLUSTER_ID --region=$DATAPROC_REGION
Supprimez le bucket Cloud Storage.
gcloud storage rm gs://$BUCKET
Étape suivante
Consultez l'exemple de comptage de mots en Java pour le connecteur Spark Pub/Sub Lite.
Découvrez comment accéder aux résultats du pilote de tâches Dataproc.
Découvrez d'autres connecteurs Spark par produits Google Cloud : connecteur BigQuery, connecteur Bigtable, connecteur Cloud Storage.