Créer un job personnalisé avec le générateur de jobs

Le générateur de jobs vous permet de créer des jobs Dataflow par lot et en flux continu personnalisés. Vous pouvez également enregistrer les tâches du générateur de tâches en tant que fichiers YAML Apache Beam à partager et à réutiliser.

Créer un pipeline

Pour créer un pipeline dans le générateur de tâches, procédez comme suit :

  1. Accédez à la page Jobs dans la console Google Cloud.

    Accéder aux tâches

  2. Cliquez sur Créer une tâche à partir du générateur.

  3. Dans le champ Nom du job, saisissez un nom pour la tâche.

  4. Sélectionnez Par lot ou Par flux.

  5. Si vous sélectionnez Streaming, sélectionnez un mode de fenêtrage. Saisissez ensuite une spécification pour la fenêtre, comme suit :

    • Fenêtre fixe : saisissez une taille de fenêtre en secondes.
    • Fenêtre glissante : saisissez une taille et une durée de fenêtre, en secondes.
    • Fenêtre de session : saisissez un intervalle de session en secondes.

    Pour en savoir plus sur le fenêtrage, consultez la page Windows et fonctions de fenêtrage.

Ajoutez ensuite des sources, des transformations et des destinations au pipeline, comme décrit dans les sections suivantes.

Ajouter une source au pipeline

Un pipeline doit comporter au moins une source. Initialement, le générateur de tâches est renseigné avec une source vide. Pour configurer la source, procédez comme suit :

  1. Dans le champ Nom de la source, saisissez un nom pour la source ou utilisez le nom par défaut. Le nom apparaît dans le graphique de la tâche lorsque vous l'exécutez.

  2. Dans la liste Type de source, sélectionnez le type de source de données.

  3. Selon le type de source, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez BigQuery, spécifiez la table à lire.

    Si vous sélectionnez Pub/Sub, spécifiez un schéma de message. Saisissez le nom et le type de données de chaque champ que vous souhaitez lire à partir des messages Pub/Sub. Le pipeline supprime tous les champs qui ne sont pas spécifiés dans le schéma.

  4. Facultatif : Pour certains types de sources, vous pouvez cliquer sur Prévisualiser les données sources pour prévisualiser les données sources.

Pour ajouter une autre source au pipeline, cliquez sur Ajouter une source. Pour combiner des données provenant de plusieurs sources, ajoutez une transformation SQL ou Join à votre pipeline.

Ajouter une transformation au pipeline

Vous pouvez éventuellement ajouter une ou plusieurs transformations au pipeline. Vous pouvez utiliser les transformations suivantes pour manipuler, agréger ou joindre des données à partir de sources et d'autres transformations:

Type de transformation Description Informations sur la transformation YAML Beam
Filtrer (Python) Filtrez les enregistrements avec une expression Python.
Transformation SQL Manipulez des enregistrements ou associez plusieurs entrées à l'aide d'une instruction SQL.
Rejoindre Joignez plusieurs entrées sur des champs égaux.
Mapper les champs (Python) Ajoutez de nouveaux champs ou remappez des enregistrements entiers avec des expressions et des fonctions Python.
Mapper les champs (SQL) Ajoutez ou mappez des champs d'enregistrement avec des expressions SQL.
Grouper par Combinez des enregistrements avec des fonctions telles que count() et sum().
Transformations YAML :
  1. AssertEqual
  2. AssignTimestamps
  3. Combinaison
  4. Fractionner
  5. Filtre
  6. Flatten
  7. Rejoindre
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Utilisez n'importe quelle transformation du SDK Beam YAML.

Configuration de la transformation YAML: indiquez les paramètres de configuration de la transformation YAML sous la forme d'un mappage YAML. Les paires clé-valeur sont utilisées pour renseigner la section de configuration de la transformation Beam YAML obtenue. Pour connaître les paramètres de configuration compatibles pour chaque type de transformation, consultez la documentation sur la transformation Beam YAML. Exemples de paramètres de configuration:

Combinaison
group_by:
combine:
Rejoindre
type:
equalities:
fields:
Fractionner Fractionnez les enregistrements en aplatissant les champs du tableau.

Pour ajouter une transformation, procédez comme suit :

  1. Cliquez sur Ajouter une transformation.

  2. Dans le champ Transformation, saisissez un nom pour la transformation ou utilisez le nom par défaut. Le nom apparaît dans le graphique de la tâche lorsque vous l'exécutez.

  3. Dans la liste Type de transformation, sélectionnez le type de transformation.

  4. Selon le type de transformation, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez Filtre (Python), saisissez une expression Python à utiliser comme filtre.

  5. Sélectionnez l'étape d'entrée pour la transformation. L'étape d'entrée est la source ou la transformation dont la sortie fournit l'entrée pour cette transformation.

Ajouter un récepteur au pipeline

Un pipeline doit comporter au moins un sink. Initialement, le générateur de tâches est renseigné avec un récepteur vide. Pour configurer le récepteur, procédez comme suit :

  1. Dans le champ Nom du récepteur, saisissez un nom pour le récepteur ou utilisez le nom par défaut. Le nom apparaît dans le graphique de la tâche lorsque vous l'exécutez.

  2. Dans la liste Type de lavabo, sélectionnez le type de lavabo.

  3. Selon le type de sink, fournissez des informations de configuration supplémentaires. Par exemple, si vous sélectionnez le récepteur BigQuery, sélectionnez la table BigQuery dans laquelle écrire.

  4. Sélectionnez l'étape d'entrée du récepteur. L'étape d'entrée est la source ou la transformation dont la sortie fournit l'entrée pour cette transformation.

  5. Pour ajouter un autre entonnoir au pipeline, cliquez sur Ajouter un entonnoir.

Exécuter le pipeline

Pour exécuter un pipeline à partir du générateur de tâches, procédez comme suit :

  1. Facultatif : Définissez les options de la tâche Dataflow. Pour développer la section "Options de flux de données", cliquez sur la flèche de développement .

  2. Cliquez sur Run Job (Exécuter la tâche). Le générateur de jobs accède au graphique de job pour le job envoyé. Vous pouvez utiliser le graphique de la tâche pour surveiller son état.

Valider le pipeline avant de le lancer

Pour les pipelines dont la configuration est complexe, tels que les filtres Python et les expressions SQL, il peut être utile de vérifier la configuration du pipeline pour détecter les erreurs de syntaxe avant de le lancer. Pour valider la syntaxe du pipeline, procédez comme suit:

  1. Cliquez sur Valider pour ouvrir Cloud Shell et démarrer le service de validation.
  2. Cliquez sur Démarrer la validation.
  3. Si une erreur est détectée lors de la validation, un point d'exclamation rouge s'affiche.
  4. Corrigez les erreurs détectées et vérifiez les corrections en cliquant sur Valider. Si aucune erreur n'est détectée, une coche verte s'affiche.

Exécuter avec la CLI gcloud

Vous pouvez également exécuter des pipelines YAML Beam à l'aide de la CLI gcloud. Pour exécuter un pipeline de création de tâches avec la CLI gcloud:

  1. Cliquez sur Enregistrer le fichier YAML pour ouvrir la fenêtre Enregistrer le fichier YAML.

  2. Effectuez l'une des actions suivantes :

    • Pour enregistrer dans Cloud Storage, saisissez un chemin d'accès Cloud Storage, puis cliquez sur Enregistrer.
    • Pour télécharger un fichier local, cliquez sur Télécharger.
  3. Exécutez la commande suivante dans votre interface système ou votre terminal:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    Remplacez YAML_FILE_PATH par le chemin d'accès à votre fichier YAML, localement ou dans Cloud Storage.

Étape suivante