Diffuser des données en streaming depuis Pub/Sub vers BigQuery


Ce tutoriel utilise le modèle abonnement Pub/Sub vers BigQuery pour créer et exécuter un job de modèle Dataflow à l'aide de la console Google Cloud ou de Google Cloud CLI. Il présente un exemple de pipeline de streaming qui lit les messages encodés en JSON à partir de Pub/Sub et les écrit dans une table BigQuery.

Les pipelines d'analyse de flux et d'intégration de données utilisent Pub/Sub pour ingérer et distribuer les données. Pub/Sub vous permet de créer des systèmes de producteurs et de consommateurs d'événements, appelés éditeurs et abonnés. Les éditeurs envoient des événements au service Pub/Sub de manière asynchrone, et Pub/Sub transmet les événements à tous les services qui doivent y réagir.

Dataflow est un service entièrement géré permettant de transformer et d'enrichir les données par flux (en temps réel) et par lots. Il fournit un environnement de développement de pipeline simplifié qui utilise le SDK Apache Beam pour transformer les données entrantes et générer les données transformées.

L'avantage de ce workflow est que vous pouvez utiliser des UDF pour transformer les données des messages avant de les écrire dans BigQuery.

Avant d'exécuter un pipeline Dataflow pour ce scénario, déterminez si un abonnement Pub/Sub BigQuery avec une UDF répond à vos besoins.

Objectifs

  • Créer un sujet Pub/Sub
  • Créer un ensemble de données BigQuery avec une table et un schéma
  • Utiliser un modèle de streaming fourni par Google pour diffuser des données en streaming de votre abonnement Pub/Sub vers BigQuery à l'aide de Dataflow.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Pour obtenir une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.

Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

Cette section explique comment sélectionner un projet, activer des API et attribuer les rôles appropriés au compte utilisateur et au compte de service du nœud de calcul.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. Pour suivre la procédure décrite dans ce tutoriel, votre compte utilisateur doit disposer du rôle Utilisateur du compte de service. Le compte de service Compute Engine par défaut doit disposer des rôles suivants : Nœud de calcul Dataflow, Administrateur Dataflow, Éditeur Pub/Sub, Administrateur des objets de l'espace de stockage et Éditeur de données BigQuery. Pour ajouter les rôles requis dans la console Google Cloud  :

    1. Dans la console Google Cloud , accédez à la page IAM.

      Accéder à IAM
    2. Sélectionnez votre projet.
    3. Sur la ligne contenant votre compte utilisateur, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
    4. Dans la liste déroulante, sélectionnez le rôle Utilisateur du compte de service.
    5. Sur la ligne contenant le compte de service Compute Engine par défaut, cliquez sur Modifier le compte principal, puis sur Ajouter un autre rôle.
    6. Dans la liste déroulante, sélectionnez le rôle Nœud de calcul Dataflow.
    7. Répétez la procédure pour les rôles Administrateur Dataflow, Éditeur Pub/Sub, Administrateur des objets de l'espace de stockage et Éditeur de données BigQuery, puis cliquez surEnregistrer.

      Pour en savoir plus sur l'attribution de rôles, consultez la page Attribuer un rôle IAM à l'aide de la console.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Attribuez des rôles à votre compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet.
    • PROJECT_NUMBER : votre numéro de projet. Pour trouver votre numéro de projet, utilisez la commande gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE : chaque rôle individuel.

Créer un bucket Cloud Storage

Commencez par créer un bucket Cloud Storage à l'aide de la console Google Cloud ou de Google Cloud CLI. Le pipeline Dataflow utilise ce bucket comme emplacement de stockage temporaire.

Console

  1. Dans la console Google Cloud , accédez à la page Buckets Cloud Storage.

    Accéder à la page "Buckets"

  2. Cliquez sur Créer.

  3. Sur la page Créer un bucket, sous Nommer votre bucket, saisissez un nom qui respecte les règles de dénomination des buckets. Les noms des buckets Cloud Storage doivent être uniques. Ne sélectionnez pas les autres options.

  4. Cliquez sur Créer.

gcloud

Exécutez la commande gcloud storage buckets create :

gcloud storage buckets create gs://BUCKET_NAME

Remplacez BUCKET_NAME par un nom qui respecte les règles de dénomination des buckets pour votre bucket Cloud Storage. Les noms des buckets Cloud Storage doivent être uniques.

Créer un sujet et un abonnement Pub/Sub

Créez un sujet Pub/Sub, puis créez un abonnement associé à ce sujet.

Console

Pour créer un sujet, procédez comme suit.

  1. Dans la console Google Cloud , accédez à la page Sujets de Pub/Sub.

    Accéder aux sujets

  2. Cliquez sur Create topic (Créer un sujet).

  3. Dans le champ ID du sujet, saisissez un ID pour votre sujet. Pour savoir comment nommer un sujet, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.

  4. Conservez l'option Ajouter un abonnement par défaut. Ne sélectionnez pas les autres options.

  5. Cliquez sur Créer.

  6. Sur la page des détails du sujet, le nom de l'abonnement créé est indiqué sous ID de l'abonnement. Notez cette valeur pour les étapes ultérieures.

gcloud

Pour créer un sujet, exécutez la commande gcloud pubsub topics create. Pour savoir comment nommer un abonnement, consultez la section Consignes de dénomination d'un sujet ou d'un abonnement.

gcloud pubsub topics create TOPIC_ID

Remplacez TOPIC_ID par le nom de votre sujet Pub/Sub.

Pour créer un abonnement associé à votre sujet, exécutez la commande gcloud pubsub subscriptions create :

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Remplacez SUBSCRIPTION_ID par le nom de votre abonnement Pub/Sub.

Créer une table BigQuery

Au cours de cette étape, vous allez créer une table BigQuery avec le schéma suivant :

Nom de la colonne Type de données
name STRING
customer_id INTEGER

Si vous n'avez pas encore d'ensemble de données BigQuery, commencez par en créer un. Pour en savoir plus, consultez Créer des ensembles de données. Créez ensuite une table vide :

Console

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans le volet Explorateur, développez votre projet, puis sélectionnez un ensemble de données.

  3. Dans la section Informations sur l'ensemble de données, cliquez sur Créer une table.

  4. Dans la liste Créer une table à partir de, sélectionnez Table vide.

  5. Dans le champ Table, saisissez le nom de la table.

  6. Dans la section Schéma, cliquez sur Modifier sous forme de texte.

  7. Collez la définition de schéma suivante :

    name:STRING,
    customer_id:INTEGER
    
  8. Cliquez sur Créer une table.

gcloud

Exécutez la commande bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet
  • DATASET_NAME : nom de l'ensemble de données
  • TABLE_NAME : nom de la table à créer

Exécuter le pipeline

Exécutez un pipeline de flux de données à l'aide du modèle d'abonnement Pub/Sub vers BigQuery fourni par Google. Le pipeline récupère les données entrantes du sujet Pub/Sub et les génère dans votre ensemble de données BigQuery.

Console

  1. Dans la console Google Cloud , accédez à la page Tâches Dataflow.

    Accéder aux tâches

  2. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).

  3. Saisissez un nom de tâche pour votre tâche Dataflow.

  4. Pour le Point de terminaison régional, sélectionnez une région pour votre job Dataflow.

  5. Pour Modèle Dataflow, sélectionnez le modèle Abonnement Pub/Sub vers BigQuery.

  6. Pour Table BigQuery de sortie, sélectionnez Parcourir, puis sélectionnez votre table BigQuery.

  7. Dans la liste Abonnement d'entrée Pub/Sub, sélectionnez l'abonnement Pub/Sub.

  8. Dans le champ Emplacement temporaire, saisissez ce qui suit :

    gs://BUCKET_NAME/temp/
    

    Remplacez BUCKET_NAME par le nom de votre bucket Cloud Storage. Le dossier temp stocke les fichiers temporaires des jobs Dataflow.

  9. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Pour exécuter le modèle dans votre shell ou votre terminal, utilisez la commande gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Remplacez les variables suivantes :

  • JOB_NAME : nom de la tâche
  • DATAFLOW_REGION : région du job
  • PROJECT_ID : nom de votre Google Cloud projet
  • SUBSCRIPTION_ID : nom de votre abonnement Pub/Sub
  • DATASET_NAME : nom de votre ensemble de données BigQuery
  • TABLE_NAME : nom de votre table BigQuery

Publier des messages dans Pub/Sub

Une fois la tâche Dataflow lancée, vous pouvez publier des messages dans Pub/Sub. Le pipeline les écrit dans BigQuery.

Console

  1. Dans la console Google Cloud , accédez à la page Pub/Sub > Sujets.

    Accéder aux sujets

  2. Dans la liste des thèmes, cliquez sur le nom de votre thème.

  3. Cliquez sur Messages.

  4. Cliquez sur Publier des messages.

  5. Dans le champ Nombre de messages, saisissez 10.

  6. Dans le champ Corps du message, saisissez {"name": "Alice", "customer_id": 1}.

  7. Cliquez sur Publier.

gcloud

Pour publier des messages dans votre sujet, utilisez la commande gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Remplacez TOPIC_ID par le nom de votre sujet.

Afficher les résultats

Affichez les données écrites dans votre table BigQuery. L'affichage des données dans votre tableau peut prendre jusqu'à une minute.

Console

  1. Dans la console Google Cloud , accédez à la page BigQuery.
    Accéder à la page BigQuery

  2. Dans l'éditeur de requête, saisissez la requête suivante :

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Remplacez les variables suivantes :

    • PROJECT_ID : nom de votre Google Cloudprojet
    • DATASET_NAME : nom de votre ensemble de données BigQuery
    • TABLE_NAME : nom de votre table BigQuery

gcloud

Vérifiez les résultats dans BigQuery en exécutant la requête suivante :

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Remplacez les variables suivantes :

  • PROJECT_ID : nom de votre Google Cloudprojet
  • DATASET_NAME : nom de votre ensemble de données BigQuery
  • TABLE_NAME : nom de votre table BigQuery

Utiliser une UDF pour transformer les données

Ce tutoriel part du principe que les messages Pub/Sub sont au format JSON et que le schéma de la table BigQuery correspond aux données JSON.

Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) en JavaScript qui transforme les données avant leur écriture dans BigQuery. Cette fonction permet d'effectuer un traitement supplémentaire, tel que le filtrage, la suppression des informations permettant d'identifier personnellement l'utilisateur ou l'enrichissement des données à l'aide de champs supplémentaires.

Pour en savoir plus, consultez Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Utiliser une table de lettres mortes

Pendant l'exécution du job, il est possible que le pipeline ne parvienne pas à écrire des messages individuels dans BigQuery. Voici les erreurs possibles :

  • Erreurs de sérialisation, y compris un format JSON mal formaté.
  • Erreurs de conversion de type, causées par une incohérence dans le schéma de la table et les données JSON.
  • Champs supplémentaires dans les données JSON qui ne sont pas présents dans le schéma de la table.

Le pipeline écrit ces erreurs dans une table de lettres mortes dans BigQuery. Par défaut, le pipeline crée automatiquement une table de lettres mortes nommée TABLE_NAME_error_records, où TABLE_NAME correspond au nom de la table de sortie. Pour utiliser un autre nom, définissez le paramètre de modèle outputDeadletterTable.

Effectuer un nettoyage

Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet Google Cloud que vous avez créé pour ce tutoriel.

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Supprimer les ressources individuelles

Si vous souhaitez réutiliser le projet ultérieurement, vous pouvez le conserver, mais supprimez les ressources que vous avez créées au cours du tutoriel.

Arrêter le pipeline Dataflow

Console

  1. Dans la console Google Cloud , accédez à la page Tâches Dataflow.

    Accéder aux tâches

  2. Cliquez sur la tâche que vous souhaitez arrêter.

    Pour arrêter une tâche, son état doit être en cours d'exécution.

  3. Sur la page des détails de la tâche, cliquez sur Arrêter.

  4. Cliquez sur Annuler.

  5. Pour confirmer votre choix, cliquez sur Arrêter la tâche.

gcloud

Pour annuler votre tâche Dataflow, utilisez la commande gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Nettoyer les ressources du Google Cloud projet

Console

  1. Supprimez le sujet et l'abonnement Pub/Sub.

    1. Accédez à la page Sujets Pub/Sub dans la console Google Cloud .

      Accéder aux sujets

    2. Sélectionnez le sujet que vous avez créé.

    3. Cliquez sur Delete (Supprimer) pour supprimer définitivement le sujet.

    4. Accédez à la page Abonnements Pub/Sub dans la console Google Cloud .

      Accéder aux abonnements

    5. Sélectionnez l'abonnement créé avec votre sujet.

    6. Cliquez sur Supprimer pour supprimer définitivement l'abonnement.

  2. Supprimez la table et l'ensemble de données BigQuery.

    1. Dans la console Google Cloud , accédez à la page BigQuery.

      Accéder à BigQuery

    2. Sur le panneau Explorateur, développez votre projet.

    3. À côté de l'ensemble de données que vous souhaitez supprimer, cliquez sur Afficher les actions, puis sur Supprimer.

  3. Supprimez le bucket Cloud Storage.

    1. Dans la console Google Cloud , accédez à la page Buckets Cloud Storage.

      Accéder à la page "Buckets"

    2. Sélectionnez le bucket que vous souhaitez supprimer, cliquez sur Supprimer, puis suivez les instructions.

gcloud

  1. Pour supprimer l'abonnement et le sujet Pub/Sub, utilisez les commandes gcloud pubsub subscriptions delete et gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Pour supprimer la table BigQuery, utilisez la commande bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Supprimez l'ensemble de données BigQuery. L'ensemble de données seul ne génère aucuns frais.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Pour supprimer le bucket Cloud Storage et ses objets, utilisez la commande gcloud storage rm. Le bucket seul ne génère aucuns frais.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Révoquer les identifiants

Console

Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut.

  1. Dans la console Google Cloud , accédez à la page IAM.

Accéder à IAM

  1. Sélectionnez un projet, un dossier ou une organisation.

  2. Recherchez la ligne contenant le compte principal dont vous souhaitez révoquer l'accès. Sur cette ligne, cliquez sur Modifier le compte principal.

  3. Cliquez sur le bouton Supprimer () pour chaque rôle que vous souhaitez révoquer, puis cliquez sur Enregistrer.

gcloud

  • Si vous conservez votre projet, révoquez les rôles que vous avez accordés au compte de service Compute Engine par défaut. Exécutez la commande suivante une fois pour chacun des rôles IAM suivants :
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Étapes suivantes