Ce tutoriel utilise le modèle abonnement Pub/Sub vers BigQuery pour créer et exécuter un job de modèle Dataflow à l'aide de la console Google Cloud ou de Google Cloud CLI. Il présente un exemple de pipeline de streaming qui lit les messages encodés en JSON à partir de Pub/Sub et les écrit dans une table BigQuery.
Les pipelines d'analyse de flux et d'intégration de données utilisent Pub/Sub pour ingérer et distribuer les données. Pub/Sub vous permet de créer des systèmes de producteurs et de consommateurs d'événements, appelés éditeurs et abonnés. Les éditeurs envoient des événements au service Pub/Sub de manière asynchrone, et Pub/Sub transmet les événements à tous les services qui doivent y réagir.
Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots. Il fournit un environnement de développement de pipeline simplifié qui utilise le SDK Apache Beam pour transformer les données entrantes et générer les données transformées.
L'avantage de ce workflow est que vous pouvez utiliser des UDF pour transformer les données des messages avant de les écrire dans BigQuery.
Avant d'exécuter un pipeline Dataflow pour ce scénario, déterminez si un abonnement Pub/Sub BigQuery avec une UDF répond à vos besoins.
Objectifs
- Créer un sujet Pub/Sub
- Créer un ensemble de données BigQuery avec une table et un schéma
- Utiliser un modèle de streaming fourni par Google pour diffuser des données en streaming de votre abonnement Pub/Sub vers BigQuery à l'aide de Dataflow.
Coûts
Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
Pour obtenir une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.
Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
Cette section explique comment sélectionner un projet, activer des API et attribuer les rôles appropriés au compte utilisateur et au compte de service du nœud de calcul.
Console
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
Pour suivre la procédure décrite dans ce tutoriel, votre compte utilisateur doit disposer du rôle Utilisateur du compte de service. Le compte de service Compute Engine par défaut doit disposer des rôles suivants : Nœud de calcul Dataflow, Administrateur Dataflow, Éditeur Pub/Sub, Administrateur des objets de l'espace de stockage et Éditeur de données BigQuery. Pour ajouter les rôles requis dans la console Google Cloud :
Dans la console Google Cloud , accédez à la page IAM.
Accéder à IAM- Sélectionnez votre projet.
- Sur la ligne contenant votre compte utilisateur, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
- Dans la liste déroulante, sélectionnez le rôle Utilisateur du compte de service.
- Sur la ligne contenant le compte de service Compute Engine par défaut, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
- Dans la liste déroulante, sélectionnez le rôle Nœud de calcul Dataflow.
Répétez la procédure pour les rôles Administrateur Dataflow, Éditeur Pub/Sub, Administrateur des objets de l'espace de stockage et Éditeur de données BigQuery, puis cliquez surEnregistrer.
Pour en savoir plus sur l'attribution de rôles, consultez la page Attribuer un rôle IAM à l'aide de la console.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Attribuez des rôles à votre compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.PROJECT_NUMBER
: votre numéro de projet. Pour trouver votre numéro de projet, utilisez la commandegcloud projects describe
.SERVICE_ACCOUNT_ROLE
: chaque rôle individuel.
Créer un bucket Cloud Storage
Commencez par créer un bucket Cloud Storage à l'aide de la console Google Cloud ou de Google Cloud CLI. Le pipeline Dataflow utilise ce bucket comme emplacement de stockage temporaire.
Console
Dans la console Google Cloud , accédez à la page Buckets Cloud Storage.
Cliquez sur Créer.
Sur la page Créer un bucket, sous Nommer votre bucket, saisissez un nom qui respecte les règles de dénomination des buckets. Les noms des buckets Cloud Storage doivent être uniques. Ne sélectionnez pas les autres options.
Cliquez sur Créer.
gcloud
Exécutez la commande gcloud storage buckets create
:
gcloud storage buckets create gs://BUCKET_NAME
Remplacez BUCKET_NAME
par un nom qui respecte les règles de dénomination des buckets pour votre bucket Cloud Storage.
Les noms des buckets Cloud Storage doivent être uniques.
Créer un sujet et un abonnement Pub/Sub
Créez un sujet Pub/Sub, puis créez un abonnement associé à ce sujet.
Console
Pour créer un sujet, procédez comme suit.
Dans la console Google Cloud , accédez à la page Sujets de Pub/Sub.
Cliquez sur Create topic (Créer un sujet).
Dans le champ ID du sujet, saisissez un ID pour votre sujet. Pour savoir comment nommer un sujet, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.
Conservez l'option Ajouter un abonnement par défaut. Ne sélectionnez pas les autres options.
Cliquez sur Créer.
- Sur la page des détails du sujet, le nom de l'abonnement créé est indiqué sous ID de l'abonnement. Notez cette valeur pour les étapes ultérieures.
gcloud
Pour créer un sujet, exécutez la commande gcloud pubsub topics create
. Pour savoir comment nommer un abonnement, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.
gcloud pubsub topics create TOPIC_ID
Remplacez TOPIC_ID
par le nom de votre sujet Pub/Sub.
Pour créer un abonnement associé à votre sujet, exécutez la commande gcloud pubsub subscriptions create
:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
Remplacez SUBSCRIPTION_ID
par le nom de votre abonnement Pub/Sub.
Créer une table BigQuery
Au cours de cette étape, vous allez créer une table BigQuery avec le schéma suivant :
Nom de la colonne | Type de données |
---|---|
name |
STRING |
customer_id |
INTEGER |
Si vous n'avez pas encore d'ensemble de données BigQuery, commencez par en créer un. Pour en savoir plus, consultez Créer des ensembles de données. Créez ensuite une table vide :
Console
Accédez à la page BigQuery.
Dans le volet Explorateur, développez votre projet, puis sélectionnez un ensemble de données.
Dans la section Informations sur l'ensemble de données, cliquez sur
Créer une table.Dans la liste Créer une table à partir de, sélectionnez Table vide.
Dans le champ Table, saisissez le nom de la table.
Dans la section Schéma, cliquez sur Modifier sous forme de texte.
Collez la définition de schéma suivante :
name:STRING, customer_id:INTEGER
Cliquez sur Créer une table.
gcloud
Exécutez la commande bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projetDATASET_NAME
: nom de l'ensemble de donnéesTABLE_NAME
: nom de la table à créer
Exécuter le pipeline
Exécutez un pipeline de flux de données à l'aide du modèle d'abonnement Pub/Sub vers BigQuery fourni par Google. Le pipeline récupère les données entrantes du sujet Pub/Sub et les génère dans votre ensemble de données BigQuery.
Console
Dans la console Google Cloud , accédez à la page Tâches Dataflow.
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Saisissez un nom de tâche pour votre tâche Dataflow.
Pour le Point de terminaison régional, sélectionnez une région pour votre job Dataflow.
Pour Modèle Dataflow, sélectionnez le modèle Abonnement Pub/Sub vers BigQuery.
Pour Table BigQuery de sortie, sélectionnez Parcourir, puis sélectionnez votre table BigQuery.
Dans la liste Abonnement d'entrée Pub/Sub, sélectionnez l'abonnement Pub/Sub.
Dans le champ Emplacement temporaire, saisissez ce qui suit :
gs://BUCKET_NAME/temp/
Remplacez
BUCKET_NAME
par le nom de votre bucket Cloud Storage. Le dossiertemp
stocke les fichiers temporaires des jobs Dataflow.Cliquez sur Run Job (Exécuter la tâche).
gcloud
Pour exécuter le modèle dans votre shell ou votre terminal, utilisez la commande gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
Remplacez les variables suivantes :
JOB_NAME
: nom de la tâcheDATAFLOW_REGION
: région du jobPROJECT_ID
: nom de votre Google Cloud projetSUBSCRIPTION_ID
: nom de votre abonnement Pub/SubDATASET_NAME
: nom de votre ensemble de données BigQueryTABLE_NAME
: nom de votre table BigQuery
Publier des messages dans Pub/Sub
Une fois la tâche Dataflow lancée, vous pouvez publier des messages dans Pub/Sub. Le pipeline les écrit dans BigQuery.
Console
Dans la console Google Cloud , accédez à la page Pub/Sub > Sujets.
Dans la liste des thèmes, cliquez sur le nom de votre thème.
Cliquez sur Messages.
Cliquez sur Publier des messages.
Dans le champ Nombre de messages, saisissez
10
.Dans le champ Corps du message, saisissez
{"name": "Alice", "customer_id": 1}
.Cliquez sur Publier.
gcloud
Pour publier des messages dans votre sujet, utilisez la commande gcloud pubsub topics publish
.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
Remplacez TOPIC_ID
par le nom de votre sujet.
Afficher les résultats
Affichez les données écrites dans votre table BigQuery. L'affichage des données dans votre tableau peut prendre jusqu'à une minute.
Console
Dans la console Google Cloud , accédez à la page BigQuery.
Accéder à la page BigQueryDans l'éditeur de requête, saisissez la requête suivante :
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
Remplacez les variables suivantes :
PROJECT_ID
: nom de votre Google CloudprojetDATASET_NAME
: nom de votre ensemble de données BigQueryTABLE_NAME
: nom de votre table BigQuery
gcloud
Vérifiez les résultats dans BigQuery en exécutant la requête suivante :
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
Remplacez les variables suivantes :
PROJECT_ID
: nom de votre Google CloudprojetDATASET_NAME
: nom de votre ensemble de données BigQueryTABLE_NAME
: nom de votre table BigQuery
Utiliser une UDF pour transformer les données
Ce tutoriel part du principe que les messages Pub/Sub sont au format JSON et que le schéma de la table BigQuery correspond aux données JSON.
Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) en JavaScript qui transforme les données avant leur écriture dans BigQuery. Cette fonction permet d'effectuer un traitement supplémentaire, tel que le filtrage, la suppression des informations permettant d'identifier personnellement l'utilisateur ou l'enrichissement des données à l'aide de champs supplémentaires.
Pour en savoir plus, consultez Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Utiliser une table de lettres mortes
Pendant l'exécution du job, il est possible que le pipeline ne parvienne pas à écrire des messages individuels dans BigQuery. Voici les erreurs possibles :
- Erreurs de sérialisation, y compris un format JSON mal formaté.
- Erreurs de conversion de type, causées par une incohérence dans le schéma de la table et les données JSON.
- Champs supplémentaires dans les données JSON qui ne sont pas présents dans le schéma de la table.
Le pipeline écrit ces erreurs dans une table de lettres mortes dans BigQuery. Par défaut, le pipeline crée automatiquement une table de lettres mortes nommée TABLE_NAME_error_records
, où TABLE_NAME
correspond au nom de la table de sortie.
Pour utiliser un autre nom, définissez le paramètre de modèle outputDeadletterTable
.
Effectuer un nettoyage
Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
Le moyen le plus simple d'empêcher la facturation est de supprimer le projet Google Cloud que vous avez créé pour ce tutoriel.
Console
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Supprimer les ressources individuelles
Si vous souhaitez réutiliser le projet ultérieurement, vous pouvez le conserver, mais supprimez les ressources que vous avez créées au cours du tutoriel.
Arrêter le pipeline Dataflow
Console
Dans la console Google Cloud , accédez à la page Tâches Dataflow.
Cliquez sur la tâche que vous souhaitez arrêter.
Pour arrêter une tâche, son état doit être en cours d'exécution.
Sur la page des détails de la tâche, cliquez sur Arrêter.
Cliquez sur Annuler.
Pour confirmer votre choix, cliquez sur Arrêter la tâche.
gcloud
Pour annuler votre tâche Dataflow, utilisez la commande gcloud dataflow jobs
.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Nettoyer les ressources du Google Cloud projet
Console
Supprimez le sujet et l'abonnement Pub/Sub.
Accédez à la page Sujets Pub/Sub dans la console Google Cloud .
Sélectionnez le sujet que vous avez créé.
Cliquez sur Delete (Supprimer) pour supprimer définitivement le sujet.
Accédez à la page Abonnements Pub/Sub dans la console Google Cloud .
Sélectionnez l'abonnement créé avec votre sujet.
Cliquez sur Supprimer pour supprimer définitivement l'abonnement.
Supprimez la table et l'ensemble de données BigQuery.
Dans la console Google Cloud , accédez à la page BigQuery.
Sur le panneau Explorateur, développez votre projet.
À côté de l'ensemble de données que vous souhaitez supprimer, cliquez sur
Afficher les actions, puis sur Supprimer.
Supprimez le bucket Cloud Storage.
Dans la console Google Cloud , accédez à la page Buckets Cloud Storage.
Sélectionnez le bucket que vous souhaitez supprimer, cliquez sur
Supprimer, puis suivez les instructions.
gcloud
Pour supprimer l'abonnement et le sujet Pub/Sub, utilisez les commandes
gcloud pubsub subscriptions delete
etgcloud pubsub topics delete
.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
Pour supprimer la table BigQuery, utilisez la commande
bq rm
.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
Supprimez l'ensemble de données BigQuery. L'ensemble de données seul ne génère aucuns frais.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Pour supprimer le bucket Cloud Storage et ses objets, utilisez la commande
gcloud storage rm
. Le bucket seul ne génère aucuns frais.gcloud storage rm gs://BUCKET_NAME --recursive
Révoquer les identifiants
Console
Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut.
- Dans la console Google Cloud , accédez à la page IAM.
Sélectionnez un projet, un dossier ou une organisation.
Recherchez la ligne contenant le compte principal dont vous souhaitez révoquer l'accès. Sur cette ligne, cliquez sur
Modifier le compte principal.Cliquez sur le bouton Supprimer (
) pour chaque rôle que vous souhaitez révoquer, puis cliquez sur Enregistrer.
gcloud
- Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Étapes suivantes
- Étendez votre modèle Dataflow avec des fonctions définies par l'utilisateur.
- Apprenez-en plus sur l'utilisation des modèles Dataflow.
- Affichez tous les modèles fournis par Google.
- Découvrez comment utiliser Pub/Sub pour créer et utiliser des sujets, et comment créer un abonnement pull.
- Découvrez comment créer des ensembles de données à l'aide de BigQuery.
- En savoir plus sur les abonnements Pub/Sub
- Découvrez des architectures de référence, des schémas et des bonnes pratiques concernant Google Cloud. Consultez notre Cloud Architecture Center.