Créer un job personnalisé avec le générateur de jobs

Le générateur de jobs vous permet de créer des jobs Dataflow par lot et en flux continu personnalisés. Vous pouvez également enregistrer les jobs du générateur de jobs en tant que fichiers Apache Beam YAML pour les partager et les réutiliser.

Créer un pipeline

Pour créer un pipeline dans le générateur de tâches, procédez comme suit :

  1. Accédez à la page Jobs de la console Google Cloud .

    Accéder aux tâches

  2. Cliquez sur Créer un job à partir du générateur.

  3. Dans le champ Nom du job, saisissez un nom pour la tâche.

  4. Sélectionnez Par lot ou Par flux.

  5. Si vous sélectionnez Streaming, choisissez un mode de fenêtrage. Saisissez ensuite une spécification pour la fenêtre, comme suit :

    • Fenêtre fixe : saisissez une taille de fenêtre en secondes.
    • Fenêtre glissante : saisissez une taille et une durée de fenêtre, en secondes.
    • Fenêtre de session : saisissez un intervalle de session en secondes.

    Pour en savoir plus sur le fenêtrage, consultez la page Windows et fonctions de fenêtrage.

Ajoutez ensuite des sources, des transformations et des récepteurs au pipeline, comme décrit dans les sections suivantes.

Ajouter une source au pipeline

Un pipeline doit comporter au moins une source. Initialement, le générateur de tâches est renseigné avec une source vide. Pour configurer la source, procédez comme suit :

  1. Dans le champ Nom de la source, saisissez un nom pour la source ou utilisez le nom par défaut. Le nom apparaît dans le graphique du job lorsque vous l'exécutez.

  2. Dans la liste Type de source, sélectionnez le type de source de données.

  3. En fonction du type de source, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez BigQuery, spécifiez la table à partir de laquelle lire les données.

    Si vous sélectionnez Pub/Sub, spécifiez un schéma de message. Saisissez le nom et le type de données de chaque champ que vous souhaitez lire à partir des messages Pub/Sub. Le pipeline supprime tous les champs qui ne sont pas spécifiés dans le schéma.

  4. Facultatif : Pour certains types de sources, vous pouvez cliquer sur Prévisualiser les données sources pour prévisualiser les données sources.

Pour ajouter une autre source au pipeline, cliquez sur Ajouter une source. Pour combiner des données provenant de plusieurs sources, ajoutez une transformation SQL ou Join à votre pipeline.

Ajouter une transformation au pipeline

Vous pouvez éventuellement ajouter une ou plusieurs transformations au pipeline. Vous pouvez utiliser les transformations suivantes pour manipuler, agréger ou joindre des données provenant de sources et d'autres transformations :

Type de transformation Description Informations sur la transformation YAML Beam
Filtrer (Python) Filtrez les enregistrements avec une expression Python.
Transformation SQL Manipulez des enregistrements ou joignez plusieurs entrées avec une instruction SQL.
Mapper les champs (Python) Ajoutez de nouveaux champs ou remappez des enregistrements entiers avec des expressions et des fonctions Python.
Mapper les champs (SQL) Ajoutez ou mappez des champs d'enregistrement avec des expressions SQL.
Transformations YAML :
  1. AssertEqual
  2. AssignTimestamps
  3. Combinaison
  4. Fractionner
  5. Filtre
  6. Flatten
  7. Rejoindre
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Utilisez n'importe quelle transformation du SDK Beam YAML.

Configuration de la transformation YAML : indiquez les paramètres de configuration de la transformation YAML sous la forme d'un mappage YAML. Les paires clé/valeur sont utilisées pour remplir la section de configuration de la transformation Beam YAML obtenue. Pour connaître les paramètres de configuration compatibles pour chaque type de transformation, consultez la documentation sur la transformation Beam YAML. Exemples de paramètres de configuration :

Combinaison
group_by:
combine:
Rejoindre
type:
equalities:
fields:
Journal Enregistre les journaux dans les journaux des nœuds de calcul du job.
Grouper par Combiner des enregistrements avec des fonctions comme count() et sum().
Rejoindre Joignez plusieurs entrées sur des champs égaux.
Fractionner Fractionnez les enregistrements en aplatissant les champs du tableau.

Pour ajouter une transformation, procédez comme suit :

  1. Cliquez sur Ajouter une transformation.

  2. Dans le champ Transformation, saisissez un nom pour la transformation ou utilisez le nom par défaut. Le nom apparaît dans le graphique du job lorsque vous l'exécutez.

  3. Dans la liste Type de transformation, sélectionnez le type de transformation.

  4. En fonction du type de transformation, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez Filtrer (Python), saisissez une expression Python à utiliser comme filtre.

  5. Sélectionnez l'étape d'entrée de la transformation. L'étape d'entrée est la source ou la transformation dont la sortie fournit l'entrée pour cette transformation.

Ajouter un récepteur au pipeline

Un pipeline doit comporter au moins un récepteur. Initialement, le générateur de tâches est renseigné avec un récepteur vide. Pour configurer le récepteur, procédez comme suit :

  1. Dans le champ Nom du récepteur, saisissez un nom pour le récepteur ou utilisez le nom par défaut. Le nom apparaît dans le graphique du job lorsque vous l'exécutez.

  2. Dans la liste Type de récepteur, sélectionnez le type de récepteur.

  3. En fonction du type de récepteur, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez le récepteur BigQuery, sélectionnez la table BigQuery dans laquelle écrire.

  4. Sélectionnez l'étape d'entrée du récepteur. L'étape d'entrée est la source ou la transformation dont la sortie fournit l'entrée pour cette transformation.

  5. Pour ajouter un autre récepteur au pipeline, cliquez sur Ajouter un récepteur.

Exécuter le pipeline

Pour exécuter un pipeline à partir du générateur de tâches, procédez comme suit :

  1. Facultatif : Définissez les options de la tâche Dataflow. Pour développer la section "Options du Dataflow", cliquez sur la flèche de développement .

  2. Cliquez sur Run Job (Exécuter la tâche). Le générateur de jobs accède au graphique de job pour le job envoyé. Vous pouvez utiliser le graphique de job pour surveiller l'état du job.

Valider le pipeline avant de le lancer

Pour les pipelines avec une configuration complexe, tels que les filtres Python et les expressions SQL, il peut être utile de vérifier la configuration du pipeline pour détecter les erreurs de syntaxe avant de le lancer. Pour valider la syntaxe du pipeline, procédez comme suit :

  1. Cliquez sur Valider pour ouvrir Cloud Shell et démarrer le service de validation.
  2. Cliquez sur Commencer la validation.
  3. Si une erreur est détectée lors de la validation, un point d'exclamation rouge s'affiche.
  4. Corrigez les erreurs détectées, puis validez les corrections en cliquant sur Valider. Si aucune erreur n'est détectée, une coche verte s'affiche.

Exécuter avec gcloud CLI

Vous pouvez également exécuter des pipelines Beam YAML à l'aide de la gcloud CLI. Pour exécuter un pipeline de job builder avec gcloud CLI :

  1. Cliquez sur Enregistrer le fichier YAML pour ouvrir la fenêtre Enregistrer le fichier YAML.

  2. Effectuez l'une des actions suivantes :

    • Pour enregistrer le fichier dans Cloud Storage, saisissez un chemin d'accès Cloud Storage, puis cliquez sur Enregistrer.
    • Pour télécharger un fichier local, cliquez sur Télécharger.
  3. Exécutez la commande suivante dans votre interface système ou votre terminal :

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    Remplacez YAML_FILE_PATH par le chemin d'accès à votre fichier YAML, en local ou dans Cloud Storage.

Étapes suivantes