Conseils pour régler des tâches Spark

Les sections suivantes vous fournissent des conseils pour vous aider à affiner vos applications Dataproc Spark.

Utiliser des clusters éphémères

Lorsque vous utilisez le modèle de cluster "éphémère" Dataproc, vous créez un cluster dédié pour chaque tâche, et lorsque la tâche est terminée, vous supprimez le cluster. Avec le modèle éphémère, vous pouvez traiter le stockage et le calcul séparément, en enregistrant les données d'entrée et de sortie de la tâche dans Cloud Storage ou BigQuery, et en utilisant le cluster uniquement pour le calcul et le stockage de données temporaires.

Écueils des clusters persistants

L'utilisation de clusters éphémères à une seule tâche évite les écueils et les problèmes potentiels suivants associés à l'utilisation de clusters "persistants" partagés et de longue durée:

  • Points de défaillance uniques: un état d'erreur de cluster partagé peut entraîner l'échec de toutes les tâches, bloquant ainsi l'ensemble d'un pipeline de données. L'examen et la récupération après une erreur peuvent prendre des heures. Étant donné que les clusters éphémères ne conservent que des états temporaires au sein du cluster, ils peuvent être rapidement supprimés et recréés en cas d'erreur.
  • Difficulté à gérer et à migrer les états de cluster dans HDFS, MySQL ou des systèmes de fichiers locaux
  • Conflits de ressources entre les jobs qui affectent négativement les SLO
  • Démons de service non réactifs dus à une pression de mémoire
  • Accumulation de journaux et de fichiers temporaires pouvant dépasser la capacité de stockage du disque
  • Échec de l'ajustement à la hausse en raison d'indisponibilité de la zone de cluster
  • L'absence de prise en charge des versions obsolètes des images de cluster

Avantages des clusters éphémères

À l'inverse, les clusters éphémères vous permettent d'effectuer les opérations suivantes:

  • Configurez différentes autorisations IAM pour différentes tâches avec différents comptes de service de VM Dataproc.
  • Optimisez les configurations matérielles et logicielles d'un cluster pour chaque tâche, en modifiant les configurations du cluster si nécessaire.
  • Mettez à niveau les versions d'image dans les nouveaux clusters pour obtenir les derniers correctifs de sécurité, correctifs de bugs et optimisations.
  • Résolvez les problèmes plus rapidement sur un cluster à tâche unique isolé.
  • Réduisez vos coûts en ne payant que le temps d'exécution du cluster éphémère, et non le temps d'inactivité entre les jobs sur un cluster partagé.

Utiliser Spark SQL

L'API DataFrame Spark SQL est une optimisation importante de l'API RDD. Si vous interagissez avec du code qui utilise des RDD, envisagez de lire les données sous forme de DataFrame avant de transmettre un RDD dans le code. En code Java ou Scala, envisagez d'utiliser l'API Dataset Spark SQL en tant que sur-ensemble de RDD et de DataFrames.

Utiliser Apache Spark 3

Dataproc 2.0 installe Spark 3, qui inclut les fonctionnalités et les améliorations de performances suivantes:

  • Compatibilité avec les GPU
  • Lire des fichiers binaires
  • Amélioration des performances
  • Éliminer des partitions dynamiques
  • L'exécution des requêtes adaptable, qui optimise les tâches Spark en temps réel.

Utiliser l'allocation dynamique

Apache Spark inclut une fonctionnalité d'allocation dynamique qui ajuste le nombre d'exécuteurs Spark sur les nœuds de calcul d'un cluster. Cette fonctionnalité permet à une tâche d'utiliser le cluster Dataproc complet, même lorsqu'il évolue à la hausse. Cette fonctionnalité est activée par défaut sur Dataproc (spark.dynamicAllocation.enabled est défini sur true). Pour plus d'informations, consultez la section Allocation dynamique Spark.

Utilisez l'autoscaling Dataproc.

L'autoscaling Dataproc ajoute et supprime dynamiquement des nœuds de calcul Dataproc dans un cluster pour vous assurer que les tâches Spark disposent des ressources nécessaires pour se terminer rapidement.

Il est recommandé de configurer la règle d'autoscaling pour ne faire évoluer que les nœuds de calcul secondaires.

Utiliser le mode de flexibilité améliorée de Dataproc

Les clusters avec des VM préemptives ou une règle d'autoscaling peuvent recevoir des exceptions FetchFailed lorsque les nœuds de calcul sont préemptés ou supprimés avant de terminer la diffusion des données de brassage auprès des réducteurs. Cette exception peut entraîner des tentatives d'exécution de tâches et des délais d'exécution plus longs.

Recommandation: Utilisez le mode de flexibilité améliorée de Dataproc, qui ne stocke pas les données de brassage intermédiaires sur les nœuds de calcul secondaires, afin que ces derniers puissent être préemptés ou réduits en toute sécurité.

Configurer le partitionnement et le brassage

Spark stocke les données dans des partitions temporaires sur le cluster. Si votre application regroupe ou joint des DataFrames, elle brasse les données dans de nouvelles partitions en fonction du regroupement et de la configuration de bas niveau.

Le partitionnement des données a un impact important sur les performances de l'application: trop peu de partitions limitent le parallélisme des tâches et l'utilisation des ressources du cluster. Un trop grand nombre de partitions ralentit la tâche en raison d'un traitement et d'un brassage supplémentaires des partitions.

Configurer des partitions

Les propriétés suivantes régissent le nombre et la taille de vos partitions:

  • spark.sql.files.maxPartitionBytes: taille maximale des partitions lorsque vous lisez des données dans Cloud Storage. La valeur par défaut est de 128 Mo, ce qui est suffisamment volumineux pour la plupart des applications qui traitent moins de 100 To.

  • spark.sql.shuffle.partitions: nombre de partitions après avoir effectué un brassage. La valeur par défaut est de 200, ce qui convient pour les clusters comportant moins de 100 processeurs virtuels au total. Recommandation: Définissez ce paramètre sur trois fois le nombre de processeurs virtuels de votre cluster.

  • spark.default.parallelism: nombre de partitions renvoyées après avoir effectué des transformations RDD nécessitant des brassages, tels que join, reduceByKey et parallelize. La valeur par défaut correspond au nombre total de processeurs virtuels dans votre cluster. Lorsque vous utilisez des RDD dans des tâches Spark, vous pouvez définir ce nombre sur trois fois vos processeurs virtuels.

Limiter le nombre de fichiers

Une perte de performances survient lorsque Spark lit un grand nombre de petits fichiers. Stockez les données dans des tailles de fichier plus importantes, par exemple entre 256 Mo et 512 Mo. De même, limitez le nombre de fichiers de sortie (pour forcer le brassage, consultez la section Éviter les brassages inutiles).

Configurer l'exécution de requêtes adaptable (Spark 3)

L'exécution de requêtes adaptable (activée par défaut dans la version 2.0 de l'image Dataproc) offre des améliorations de performances des tâches Spark, y compris:

Bien que les paramètres de configuration par défaut soient acceptables dans la plupart des cas, il peut être utile de définir spark.sql.adaptive.advisoryPartitionSizeInBytes sur spark.sqlfiles.maxPartitionBytes (128 Mo par défaut).

Éviter les brassages inutiles

Spark permet aux utilisateurs de déclencher manuellement un brassage afin de rééquilibrer leurs données avec la fonction repartition. Les brassages étant coûteux, les brassages de données doivent être utilisés avec précaution. La définition des configurations de partition devrait être suffisante pour permettre à Spark de partitionner automatiquement vos données.

Exception:Lors de l'écriture de données partitionnées par colonne dans Cloud Storage, le repartitionnement sur une colonne spécifique évite d'écrire de nombreux petits fichiers pour obtenir des temps d'écriture plus rapides.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Stocker des données dans Parquet ou Avro

Spark SQL utilise par défaut la lecture et l'écriture de données dans des fichiers Parquet compressés avec Snappy. Parquet est un format de fichiers en colonnes efficace qui permet à Spark de ne lire que les données nécessaires à l'exécution d'une application. C'est un avantage important lorsque vous travaillez avec de grands ensembles de données. D'autres formats en colonnes, tels qu'Apache ORC, fonctionnent également bien.

Apache Avro fournit un format de fichier de ligne binaire efficace pour les données non en colonnes. Bien que généralement plus lent que Parquet, les performances d'Avro sont meilleures que celles de formats texte, tels que CSV ou JSON.

Optimiser la taille du disque

Le débit des disques persistants évolue avec la taille du disque, ce qui peut affecter les performances des tâches Spark, car elles écrivent des métadonnées et brassent les données sur le disque. Lorsque vous utilisez des disques persistants standards, la taille du disque doit être d'au moins 1 téraoctet par nœud de calcul (consultez la section Performances par taille de disque persistant).

Pour surveiller le débit des disques de calcul dans Google Cloud Console:

  1. Cliquez sur le nom du cluster sur la page Clusters.
  2. Cliquez sur l'onglet Instances de VM.
  3. Cliquez sur le nom d'un nœud de calcul.
  4. Cliquez sur l'onglet "MONITORING", puis faites défiler la page jusqu'à la section "Débit du disque" pour afficher le débit des nœuds de calcul.

Considérations relatives aux disques

Les clusters Dataproc éphémères, qui ne bénéficient pas du stockage persistant peuvent utiliser des disques SSD locaux. Les disques SSD locaux sont rattachés physiquement au cluster et offrent un débit plus élevé que les disques persistants (voir le tableau des performances). Les disques SSD locaux sont disponibles dans une taille fixe de 375 gigaoctets, mais vous pouvez en ajouter plusieurs pour augmenter les performances.

Les disques SSD locaux ne conservent pas les données après l'arrêt d'un cluster. Si vous avez besoin d'un stockage persistant, vous pouvez utiliser des disques persistants SSD, qui offrent un débit plus élevé pour leur taille par rapport aux disques persistants standards. Les disques persistants SSD constituent également un bon choix si la taille de la partition est inférieure à 8 Ko. Toutefois, évitez les petites partitions.

Associer des GPU à votre cluster

Spark 3 est compatible avec les GPU. Utilisez des GPU avec l'action d'initialisation RAPIDS pour accélérer les tâches Spark à l'aide de l'accélérateur SQL RAPIDS. L'action d'initialisation du pilote de GPU pour configurer un cluster avec des GPU.

Échecs et correctifs courants des tâches

Mémoire saturée

Par exemple :

  • "Exécuteur perdu"
  • "java.lang.OutOfMemoryError: limite de surcharge GC dépassée"
  • "Conteneur supprimé par YARN pour avoir dépassé les limites de mémoire"

Corrections possibles :

Échecs de la récupération de fichiers brassés

Par exemple :

  • "FetchFailedException" (erreur Spark)
  • "Échec de la connexion à..." (Erreur Spark)
  • "Échec de la récupération" (erreur MapReduce)

généralement causée par la suppression prématurée des nœuds de calcul contenant toujours des données de brassage à diffuser.

Causes et correctifs possibles:

  • Les VM de nœuds de calcul préemptifs ont été récupérées ou les VM de nœuds de calcul non préemptifs ont été supprimés par l'autoscaler. Solution: utilisez le mode de flexibilité améliorée pour rendre les nœuds de calcul secondaires préemptifs ou évolutifs en toute sécurité.
  • L'exécuteur ou le mappeur a planté en raison d'une erreur "OutOfMemory". Solution : augmentez la mémoire de l'exécuteur ou du mappeur.
  • Le service de brassage Spark peut être surchargé. Solution : réduisez le nombre de partitions de tâches.

Les nœuds YARN ne sont PAS OPÉRATIONNELS

Exemples (à partir des journaux YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Souvent associée à un espace disque insuffisant pour les données de brassage. Diagnostiquez le problème en affichant les fichiers journaux:

  • Ouvrez la page Clusters de votre projet dans la console Google Cloud, puis cliquez sur le nom du cluster.
  • Cliquez sur View logs (Afficher les journaux).
  • Filtrez les journaux par hadoop-yarn-nodemanager.
  • Recherchez "UNHEALTHY".

Solutions possibles:

  • Le cache utilisateur est stocké dans le répertoire spécifié par la propriété yarn.nodemanager.local-dirs dans yarn-site.xml file. Ce fichier se trouve à l'emplacement /etc/hadoop/conf/yarn-site.xml. Vous pouvez vérifier l'espace libre dans le chemin /hadoop/yarn/nm-local-dir et libérer de l'espace en supprimant le dossier de cache utilisateur /hadoop/yarn/nm-local-dir/usercache.
  • Si le journal affiche l'état "UNHEALTHY", recréez votre cluster avec un espace disque plus important, ce qui augmentera le plafond de débit.

Échec de la tâche en raison d'une mémoire de pilote insuffisante

Lorsque vous exécutez des tâches en mode cluster, elles échouent si la taille de mémoire du nœud maître est nettement supérieure à celle du nœud de calcul.

Exemple tiré des journaux des pilotes:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Solutions possibles:

  • Définissez spark:spark.driver.memory comme inférieur à yarn:yarn.scheduler.maximum-allocation-mb.
  • Utilisez le même type de machine pour les nœuds maître et de calcul.

Pour en savoir plus