Mode de flexibilité améliorée de Dataproc

Le mode de flexibilité améliorée (EFM) de Dataproc gère les données de brassage afin de minimiser les retards de progression de tâche causés par la suppression de nœuds d'un cluster en cours d'exécution. EFM décharge les données de brassage en les écrivant sur les nœuds de calcul principaux. Les nœuds de calcul sont extraits de ces nœuds distants pendant la phase de réduction. Ce mode n'est disponible que pour les tâches Spark.

Étant donné qu'EFM ne stocke pas de données de brassage intermédiaires sur les nœuds de calcul secondaires, EFM convient bien aux clusters utilisant des VM préemptives ou procédant uniquement à un autoscaling du groupe de nœuds de calcul secondaires.

L'EFM est compatible avec les versions d'image Dataproc 2.0.31+, 2.1.6+, 2.2.50+ et ultérieures.

Limites :

  • Les tâches Apache Hadoop YARN qui ne sont pas compatibles avec la relocalisation d'AppMaster peuvent échouer dans le mode de flexibilité améliorée (consultez la section Quand attendre qu'AppMasters se termine).
  • Le mode de flexibilité améliorée n'est pas recommandé dans les cas suivants :
    • sur un cluster ne comportant que des nœuds de calcul principaux
    • sur les tâches de streaming, car le nettoyage des données de brassage intermédiaires peut prendre jusqu'à 30 minutes après la fin de la tâche.
    • sur un cluster qui exécute des notebooks, car les données de mélange peuvent ne pas être nettoyées pendant la durée de la session.
    • Lorsque les tâches Spark s'exécutent sur un cluster avec mise hors service concertée activée. La mise hors service concertée et l'EFM peuvent être contradictoires, car le mécanisme de mise hors service concertée YARN conserve les nœuds DECOMMISSIONING jusqu'à ce que toutes les applications concernées soient terminées.
    • sur un cluster qui exécute à la fois des tâches Spark et des tâches non Spark.
  • Le mode de flexibilité améliorée n'est pas compatible :
    • Lorsque l'autoscaling du nœud de calcul principal est activé. Dans la plupart des cas, les nœuds de calcul principaux continuent de stocker les données de brassage qui ne sont pas automatiquement migrées. La réduction de la capacité du groupe de nœuds de calcul principaux annule les avantages EFM.

Utilisation du mode de flexibilité amélioré

La flexibilité améliorée est activée lorsque vous créez un cluster en définissant la propriété du cluster dataproc:efm.spark.shuffle sur primary-worker.

Exemple :

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
     other flags ...

Exemple pour Apache Spark

  1. Exécutez une tâche WordCount sur du texte Shakespeare public à l'aide du fichier jar d'exemples Spark sur le cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Configurer des disques SSD locaux

Étant donné que l'EFM écrit des données de brassage intermédiaires sur les disques associés aux VM, il bénéficie du débit et des IOPS supplémentaires fournis par les disques SSD locaux. Pour faciliter l'allocation des ressources, ciblez un objectif d'environ une partition SSD locale par 4 processeurs virtuels lors de la configuration des machines de nœud de calcul principal.

Pour associer des SSD locaux, transmettez l'option --num-worker-local-ssds à la commande gcloud dataproc clusters create.

En général, vous n'avez pas besoin de disques SSD locaux sur les nœuds de calcul secondaires. L'ajout de disques SSD locaux aux nœuds de calcul secondaires d'un cluster (à l'aide de l'option --num-secondary-worker-local-ssds) est souvent moins important, car les nœuds de calcul secondaires n'écrivent pas de données de brassage localement. Toutefois, comme les SSD locaux améliorent les performances des disques locaux, vous pouvez décider d'ajouter des SSD locaux aux nœuds de calcul secondaires si vous prévoyez que les tâches seront liées aux E/S en raison de l'utilisation du disque local: votre tâche utilise une partie importante du disque local pour l'espace de travail temporaire ou vos partitions sont trop volumineuses pour tenir en mémoire et seront transférées sur le disque.

Ratio de nœuds de calcul secondaire

Étant donné que les nœuds de calcul secondaires écrivent leurs données de brassage dans les nœuds de calcul principaux, votre cluster doit contenir un nombre suffisant de nœuds de calcul principaux disposant de ressources de processeur, de mémoire et de disque suffisantes pour prendre en charge la charge de brassage de votre tâche. Pour les clusters d'autoscaling, pour empêcher le groupe principal de procéder à un scaling et de provoquer un comportement indésirable, définissez minInstances sur la valeur maxInstances dans la stratégie d'autoscaling du groupe de nœuds de calcul principaux.

Si vous avez un ratio nœuds de calcul principaux/nœud de calculs secondaires élevé (par exemple, 10:1), surveillez l'utilisation du processeur, du réseau et de l'espace disque des nœuds de calcul principaux pour déterminer s'ils sont surchargés. Procédez comme suit :

  1. Accédez à la page Instances de VM de la console Google Cloud.

  2. Cochez la case située à gauche du nœud de calcul principal.

  3. Cliquez sur l'onglet SURVEILLANCE pour afficher l'utilisation du processeur, les IOPS du disque, les octets du réseau et d'autres métriques du nœud de calcul principal.

Si les nœuds de calcul principaux sont surchargés, envisagez d'effectuer un scaling manuel des nœuds de calcul primaires.

Redimensionner le groupe de nœuds de calcul principaux

Le groupe de nœuds de calcul principaux peut faire l'objet d'un scaling à la hausse en toute sécurité, mais sa réduction peut affecter négativement la progression de la tâche. Les opérations qui réduisent le groupe de nœuds de calcul principaux doivent utiliser la mise hors service concertée, qui est activée en définissant l'option --graceful-decommission-timeout.

Clusters avec autoscaling : le scaling du groupe de nœuds de calcul principaux est désactivé sur les clusters EFM avec des stratégies d'autoscaling. Pour redimensionner le groupe de nœuds de calcul principaux sur un cluster avec autoscaling, procédez comme suit :

  1. Désactivez l'autoscaling.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Effectuez un scaling du groupe principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Réactivez l'autoscaling :

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Surveiller l'utilisation du disque de nœuds de calcul principaux

Les nœuds de calcul principaux doivent disposer d'un espace disque suffisant pour les données de brassage du cluster. Vous pouvez surveiller surveiller cela indirectement via la métrique remaining HDFS capacity. Lorsque le disque local se remplit, une partie de l'espace disque devient indisponible pour HDFS et la capacité restante diminue.

Par défaut, lorsque le disque local d'un nœud de calcul principal dépasse 90 % de la capacité, le nœud est marqué comme UNHEALTHY dans l'UI du nœud YARN. Si vous rencontrez des problèmes de capacité de disque, vous pouvez supprimer les données inutilisées de HDFS ou faire évoluer à la hausse le pool de nœuds de calcul principaux.

Configuration avancée

Partitionnement et parallélisme

Lorsque vous envoyez une tâche Spark, configurez un niveau de partitionnement approprié. Choisir le nombre de partitions d'entrée et de sortie pour une étape de brassage affecte les caractéristiques de performances différentes. Il est préférable de tester les valeurs qui fonctionnent pour vos formes de tâches.

Partitions d'entrée

Le partitionnement d'entrée Spark et MapReduce est déterminé par l'ensemble de données d'entrée. Lors de la lecture de fichiers depuis Cloud Storage, chaque tâche traite environ une "taille de bloc" de données.

  • Pour les tâches Spark SQL, la taille de partition maximale se contrôle avec la propriété spark.sql.files.maxPartitionBytes. Nous vous conseillons de l'augmenter à 1 Go en spécifiant spark.sql.files.maxPartitionBytes=1073741824.

  • Pour les RDD Spark, la taille de partition est généralement contrôlée avec fs.gs.block.size, qui est de 128 Mo par défaut. Nous vous conseillons de l'augmenter à 1 Go. Exemple : --properties spark.hadoop.fs.gs.block.size=1073741824

Partitions de sortie

Plusieurs propriétés servent à contrôler le nombre de tâches aux étapes suivantes. Pour les tâches volumineuses traitant plus de 1 To, nous vous conseillons de prévoir au moins 1 Go par partition.

  • Pour Spark SQL, le nombre de partitions de sortie est contrôlé par spark.sql.shuffle.partitions.

  • Pour les tâches Spark utilisant l'API RDD, vous pouvez spécifier le nombre de partitions de sortie ou définir spark.default.parallelism.

Réglages du brassage pour le brassage de nœuds de calcul principaux

La propriété la plus importante est --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Notez qu'il s'agit d'une propriété YARN au niveau du cluster, car le serveur de brassage Spark s'exécute sur le gestionnaire de nœuds. Le nombre de threads par défaut est de deux fois (2x) le nombre de cœurs de la machine (par exemple, 16 threads sur un n1-highmem-8). Si la valeur "Shuffle Read Blocked Time" (durée du blocage en lecture du brassage) est supérieure à une seconde et que les nœuds de calcul principaux n'ont pas atteint les limites de réseau, de processeur ou de disque, nous vous conseillons d'augmenter le nombre de threads du serveur de brassage.

Sur les types de machines plus volumineux, envisagez d'augmenter spark.shuffle.io.numConnectionsPerPeer, dont la valeur par défaut est 1. (Par exemple, définissez-le sur 5 connexions par paire d'hôtes).

Augmenter le nombre de tentatives

Le nombre maximal de tentatives autorisées pour les applications maîtres, les tâches et les étapes peut être configuré en définissant les propriétés suivantes :

yarn:yarn.resourcemanager.am.max-attempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Comme les applications maîtres et les tâches sont arrêtées plus fréquemment dans les clusters utilisant de nombreuses VM préemptives ou l'autoscaling sans mise hors service concertée, augmenter les valeurs des propriétés ci-dessus dans ces clusters peut aider (notez qu'utiliser EFM avec Spark et la mise hors service concertée n'est pas pris en charge).

Mise hors service concertée YARN sur les clusters EFM

La mise hors service concertée YARN peut être utilisée pour supprimer rapidement des nœuds avec un impact minimal sur l'exécution des applications. Pour les clusters d'autoscaling, le délai d'inactivité de la mise hors service concertée peut être défini dans une AutoscalingPolicy associée au cluster EFM.

Améliorations apportées à EFM pour une mise hors service concertée

  1. Les données intermédiaires étant stockées dans un système de fichiers distribué, les nœuds peuvent être supprimés d'un cluster EFM dès la fin de l'exécution de tous les conteneurs sur ces nœuds. En comparaison, les nœuds ne sont pas supprimés sur les clusters Dataproc standards tant que l'exécution de l'application n'est pas terminée.

  2. Pour supprimer des nœuds, il n'est pas nécessaire d'attendre la fin de l'exécution des applications maîtres sur un nœud. Lorsque le conteneur de l'application maître est arrêté, il est reprogrammé sur un autre nœud qui n'est pas mis hors service. La progression de la tâche n'est pas perdue : la nouvelle application maître récupère rapidement l'état de l'application maître précédente en lisant l'historique des tâches.