Charger des données de Cloud Storage vers BigQuery à l'aide de Workflows

Last reviewed 2021-05-12 UTC

Ce tutoriel explique comment exécuter des workflows sans serveur de manière fiable à l'aide de Workflows, de fonctions Cloud Run et de Firestore pour charger des données brutes, telles que les journaux des événements, de Cloud Storage vers BigQuery. Les plates-formes d'analyse disposent généralement d'un outil d'orchestration permettant de charger périodiquement des données dans BigQuery à l'aide de tâches BigQuery, puis de les transformer pour fournir des métriques commerciales à l'aide d'instructions SQL, y compris d'instructions de langage procédural BigQuery. Ce tutoriel s'adresse aux développeurs et aux architectes qui souhaitent créer des pipelines de traitement des données sans serveur basés sur des événements. Pour ce tutoriel, nous partons du principe que vous connaissez bien YAML, SQL et Python.

Architecture

Le schéma suivant illustre l'architecture de haut niveau d'un pipeline d'extraction, chargement et transformation sans serveur utilisant Workflows.

Extraction, chargement et transformation du pipeline.

Dans le schéma précédent, considérons une plate-forme de vente au détail qui collecte régulièrement des événements de vente sous forme de fichiers depuis divers magasins, puis écrit les fichiers dans un bucket Cloud Storage. Les événements permettent de fournir des métriques commerciales via l'importation et le traitement des données dans BigQuery. Cette architecture fournit un système d'orchestration fiable et sans serveur pour importer vos fichiers dans BigQuery. Elle se divise en deux modules :

  • Liste de fichiers : gère la liste des fichiers non traités ajoutés à un bucket Cloud Storage dans une collection Firestore. Ce module fonctionne via une fonction Cloud Run déclenchée par un événement de stockage Object Finalize (Finalisation de l'objet), lequel est généré lorsqu'un nouveau fichier est ajouté au bucket Cloud Storage. Le nom de fichier est ajouté au tableau files de la collection nommée new dans Firestore.
  • Workflow : exécute les workflows planifiés. Cloud Scheduler déclenche un workflow qui exécute une série d'étapes selon une syntaxe basée sur YAML pour orchestrer le chargement, puis transformer les données dans BigQuery en appelant des fonctions Cloud Run. Les étapes du workflow appellent des fonctions Cloud Run pour exécuter les tâches suivantes:

    • Créer et démarrer une tâche de chargement BigQuery.
    • Interroger l'état de la tâche de chargement.
    • Créer et démarrer la tâche de requête de transformation.
    • Interroger l'état de la tâche de transformation.

L'utilisation de transactions pour gérer la liste des nouveaux fichiers dans Firestore permet de garantir qu'aucun fichier ne manque lorsqu'un workflow les importe dans BigQuery. Les exécutions distinctes du workflow deviennent idempotentes lorsque vous stockez les métadonnées et l'état de la tâche dans Firestore.

Objectifs

  • Créez une base de données Firestore.
  • Configurer un déclencheur de fonction Cloud Run pour suivre les fichiers ajoutés au bucket Cloud Storage dans Firestore.
  • Déployez des fonctions Cloud Run pour exécuter et surveiller les tâches BigQuery.
  • Déployer et exécuter un workflow pour automatiser le processus.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Une fois que vous avez terminé les tâches décrites dans ce document, vous pouvez éviter de continuer à payer des frais en supprimant les ressources que vous avez créées. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Accédez à la page Bienvenue et notez l'ID du projet à utiliser dans une étape ultérieure.

    Accéder à la page d'accueil

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

Préparer votre environnement

Pour préparer votre environnement, créez une base de données Firestore, clonez les exemples de code du dépôt GitHub, créez des ressources à l'aide de Terraform, modifiez le fichier YAML des workflows et installez les exigences du générateur de fichiers.

  1. Pour créer une base de données Firestore, procédez comme suit :

    1. Dans Google Cloud Console, accédez à la page Firestore.

      Accéder à Firestore

    2. Cliquez sur Sélectionner le mode natif.

    3. Dans le menu Sélectionner un emplacement, sélectionnez la région dans laquelle vous souhaitez héberger la base de données Firestore. Nous vous recommandons de choisir une région proche de votre emplacement physique.

    4. Cliquez sur Créer une base de données.

  2. Dans Cloud Shell, clonez le dépôt source :

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Dans Cloud Shell, créez les ressources suivantes à l'aide de Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet Google Cloud
    • REGION : emplacement géographique Google Cloud spécifique dans lequel héberger vos ressources, par exemple us-central1
    • ZONE : emplacement d'une région dans laquelle héberger vos ressources (par exemple, us-central1-b)

    Un message semblable au suivant doit s'afficher : Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform peut vous aider à créer, modifier et mettre à niveau une infrastructure à grande échelle de manière sécurisée et prévisible. Les ressources suivantes sont créées dans votre projet :

    • Des comptes de service disposant des droits requis pour garantir un accès sécurisé à vos ressources
    • Un ensemble de données BigQuery nommé serverless_elt_dataset et une table nommée word_count pour charger les fichiers entrants
    • Un bucket Cloud Storage nommé ${project_id}-ordersbucket pour les fichiers d'entrée de préproduction
    • Les cinq fonctions Cloud Run suivantes :
      • file_add_handler ajoute le nom des fichiers intégrés au bucket Cloud Storage dans la collection Firestore.
      • create_job crée une tâche de chargement BigQuery et associe les fichiers de la collection Firebase avec la tâche.
      • create_query crée une requête BigQuery.
      • poll_bigquery_job obtient l'état d'une tâche BigQuery.
      • run_bigquery_job démarre une tâche BigQuery.
  4. Récupérez les URL des fonctions Cloud Run create_job, create_query, poll_job et run_bigquery_job que vous avez déployées à l'étape précédente.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    Le résultat ressemble à ce qui suit :

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Notez ces URL, car vous en aurez besoin lorsque vous déploierez votre workflow.

Créer et déployer un workflow

  1. Dans Cloud Shell, ouvrez le fichier source du workflow, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Remplacez les éléments suivants :

    • CREATE_JOB_URL : URL de la fonction permettant de créer un job
    • POLL_BIGQUERY_JOB_URL : URL de la fonction permettant d'interroger l'état d'une tâche en cours d'exécution
    • RUN_BIGQUERY_JOB_URL : URL de la fonction permettant de démarrer une tâche de chargement BigQuery
    • CREATE_QUERY_URL : URL de la fonction permettant de démarrer une requête BigQuery
    • BQ_REGION: région BigQuery dans laquelle les données sont stockées, par exemple US.
    • BQ_DATASET_TABLE_NAME: nom de la table de l'ensemble de données BigQuery au format PROJECT_ID.serverless_elt_dataset.word_count
  2. Déployez le fichier workflow :

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    Remplacez les éléments suivants :

    • WORKFLOW_NAME : nom unique du workflow
    • WORKFLOW_REGION: région dans laquelle le workflow est déployé (par exemple, us-central1)
    • WORKFLOW_DESCRIPTION : description du workflow.
  3. Créez un environnement virtuel Python 3 et installez les éléments requis pour le générateur de fichiers :

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Générer des fichiers à importer

Le script Python gen.py génère un contenu aléatoire au format Avro. Le schéma est identique à la table BigQuery word_count. Ces fichiers Avro sont copiés dans le bucket Cloud Storage spécifié.

Dans Cloud Shell, générez les fichiers :

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Remplacez les éléments suivants :

  • RECORDS_PER_FILE : nombre d'enregistrements dans un seul fichier
  • NUM_FILES : nombre total de fichiers à importer
  • FILE_PREFIX : préfixe des noms des fichiers générés

Afficher les entrées de fichier dans Firestore

Lorsque les fichiers sont copiés dans Cloud Storage, la fonction Cloud Run handle_new_file est déclenchée. Cette fonction ajoute la liste de fichiers au tableau de liste de fichiers dans le document new de la collection jobs dans Firestore.

Pour afficher la liste des fichiers, accédez à la page Données de Firestore dans la console Google Cloud.

Accéder aux données

Liste des fichiers ajoutés à la collection

Déclencher le workflow

Workflows associe une série de tâches sans serveur provenant de Google Cloud et de services d'API. Les étapes individuelles de ce workflow s'exécutent en tant que fonctions Cloud Run et l'état est stocké dans Firestore. Tous les appels aux fonctions Cloud Run sont authentifiés à l'aide du compte de service du workflow.

Dans Cloud Shell, exécutez le workflow suivant :

gcloud workflows execute WORKFLOW_NAME

Le schéma suivant illustre les étapes utilisées dans le workflow :

Étapes utilisées dans le workflow principal et le sous-workflow.

Le workflow se divise en deux parties : le workflow principal et le sous-workflow. Le workflow principal gère la création de tâches et l'exécution conditionnelle, tandis que le sous-workflow exécute une tâche BigQuery. Le workflow effectue les opérations suivantes :

  • La fonction Cloud Run create_job crée un objet de tâche, récupère la liste des fichiers ajoutés à Cloud Storage à partir du document Firestore, puis associe les fichiers à la tâche de chargement. S'il n'y a pas de fichiers à charger, la fonction ne crée pas de job.
  • La fonction Cloud Run create_query associe la requête devant être exécutée avec la région BigQuery dans laquelle la requête doit être exécutée. La fonction crée la tâche dans Firestore et renvoie l'ID de la tâche.
  • La fonction Cloud Run run_bigquery_job obtient l'ID de la tâche qui doit être exécutée, puis appelle l'API BigQuery pour envoyer la tâche.
  • Au lieu d'attendre la fin de la tâche dans la fonction Cloud Run, vous pouvez interroger régulièrement son état.
    • La fonction Cloud Run poll_bigquery_job fournit l'état de la tâche. Elle est appelée à plusieurs reprises jusqu'à la fin de la tâche.
    • Pour ajouter un délai entre les appels à la fonction Cloud Run poll_bigquery_job, une routine sleep est appelée à partir des Workflows.

Afficher l'état de la tâche

Vous pouvez afficher la liste des fichiers et l'état de la tâche.

  1. Dans la console Google Cloud, accédez à la page Données de Firestore.

    Accéder aux données

  2. Un identifiant unique (UUID) est généré pour chaque tâche. Pour afficher les en-têtes job_type et status, cliquez sur l'ID de la tâche. Une tâche peut être de l'un des types suivants, et se trouver dans l'un des états suivants :

    • job_type : type de tâche en cours d'exécution par le workflow avec l'une des valeurs suivantes :

      • 0 : charger les données dans BigQuery.
      • 1 : exécuter une requête dans BigQuery.
    • status : état actuel de la tâche avec l'une des valeurs suivantes :

      • 0 : la tâche a été créée, mais n'a pas démarré.
      • 1 : la tâche est en cours d'exécution.
      • 2 : la tâche a bien été exécutée.
      • 3: une erreur s'est produite et la tâche n'a pas abouti.

    L'objet de tâche contient également des attributs de métadonnées tels que la région de l'ensemble de données BigQuery, le nom de la table BigQuery et, s'il s'agit d'une tâche de requête, la chaîne de requête en cours d'exécution.

Liste des fichiers avec l'état de la tâche mis en évidence

Consulter les données dans BigQuery

Pour vérifier que la tâche ELT a abouti, vérifiez que les données apparaissent dans la table.

  1. Dans la console Google Cloud, accédez à la page Éditeur de BigQuery.

    Accéder à l'éditeur

  2. Cliquez sur la table serverless_elt_dataset.word_count.

  3. Cliquez sur l'onglet Preview (Aperçu).

    Onglet Aperçu affichant les données de la table

Planifier le workflow

Pour exécuter régulièrement le workflow de manière planifiée, vous pouvez utiliser Cloud Scheduler.

Effectuer un nettoyage

Le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Google Cloud que vous avez créé pour le tutoriel. Vous pouvez également supprimer les différentes ressources.

Supprimer les ressources individuelles

  1. Dans Cloud Shell, supprimez toutes les ressources créées à l'aide de Terraform :

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Dans la console Google Cloud, accédez à la page Données de Firestore.

    Accéder aux données

  3. À côté de Tâches, cliquez sur Menu, puis sélectionnez Supprimer.

    Chemin de menu permettant de supprimer une collection.

Supprimer le projet

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Étapes suivantes