Passer au contenu

Comment puis-je résoudre les échecs d’étape « aucun espace restant sur l’appareil » dans une tâche Apache Spark sur Amazon EMR ?

Lecture de 8 minute(s)
0

Lorsque je soumets une application Apache Spark à un cluster Amazon EMR, l'application échoue avec un échec d’étape « aucun espace restant sur l’appareil ».

Brève description

Votre application Apache Spark peut rencontrer une erreur aucun espace restant sur l'appareil pour l'une des raisons suivantes :

  • Des données intermédiaires importantes sont générées pendant le processus de shuffle en raison de la présence de jointures de shuffle
  • Répartition inégale des partitions de données et de la charge de travail de l’exécuteur
  • Dimensionnement et nombre de partitions incorrects
  • Disponibilité insuffisante de ressources telles que le disque et la mémoire

Apache Spark utilise le stockage local sur les nœuds principaux et de tâche pour stocker les données (shuffle) intermédiaires. Lorsque les disques manquent d'espace sur l'instance, la tâche échoue avec une erreur indiquant qu’il ne reste plus d’espace sur l’appareil.

Résolution

Cet article décrit les causes les plus fréquentes des erreurs de manque d'espace sur l’appareil et les solutions. Vous devez identifier la cause principale pour implémenter le correctif approprié.

Remarque : Si des erreurs surviennent lorsque vous exécutez des commandes de l'interface de la ligne de commande AWS (AWS CLI), consultez la section Résoudre des erreurs liées à l’AWS CLI. Vérifiez également que vous utilisez bien la version la plus récente de l’AWS CLI.

Repartitionnement

En fonction du nombre de nœuds principaux et de nœuds de tâches présents dans le cluster, vous devrez peut-être augmenter le nombre de partitions Spark. Pour ajouter d'autres partitions Spark, exécutez la commande suivante :

val numPartitions = 500
val newDF = df.repartition(numPartitions)

Remarque : Remplacez 500 par le nombre de partitions correspondant à votre cas d'utilisation.

Régler les configurations de Spark

Gestion des partitions

En cas de débordement excessif du disque lors des shuffles ou en cas de répartition inégale des données entre les partitions, réglez les paramètres suivants :

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

Augmentez le parallélisme et le nombre de partitions si l'une des conditions suivantes est vraie :

  • L'écart de durée des tâches est supérieur à 3 fois la durée moyenne
  • Le débordement par tâche est supérieur à 20 %

Si la taille moyenne des partitions est inférieure à 50 Mo ou si le nombre de petits fichiers est excessif, diminuez le parallélisme et le nombre de partitions.

Pour calculer le nombre de partitions optimal, utilisez la formule suivante :

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

Réglage en fonction du volume de données

Les paramètres de configuration pour différentes tailles de jeux de données sont présentés ci-dessous :

Jeu de données de petite taille (<100 Go) :

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

Jeu de données de taille moyenne (100 Go-1 To) :

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

Jeu de données de grande taille (>1 To) :

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

Optimisation de la mémoire et du stockage

Pour optimiser la mémoire et le stockage, mettez à jour vos paramètres de configuration :

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

Pour calculer l'allocation de mémoire totale pour un conteneur d'exécuteur Spark, les quatre composants de mémoire suivants sont combinés :

  • Mémoire de l'exécuteur (spark.executor.memory)
  • Surcharge de mémoire (spark.executor.memoryOverhead)
  • Mémoire hors tas (spark.memory.offHeap.size)
  • Mémoire PySpark (spark.executor.pyspark.memory)

Mémoire totale du conteneur d'exécution = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

Le calcul suivant détermine l'allocation de mémoire interne de Spark :

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

Gestion des fichiers et des disques

En cas d'asymétrie des partitions ou si le nombre de partitions est trop élevé ou trop faible, ajustez les configurations de gestion de fichiers. Définissez maxPartitionNum à 2 fois le nombre total de cœurs et minPartitionNum à 1, sauf si vos cas d'utilisation requièrent une valeur différente.

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

Si maxPartitionNum est défini à une valeur trop faible, cela pourrait limiter le parallélisme et ne pas empêcher tous les scénarios d'asymétrie.

AQE et traitement de l’asymétrie

L'AQE (Adaptive Query Execution) de Spark est une optimisation de l'exécution qui ajuste les plans de requêtes en fonction des statistiques en temps réel.

L'AQE est activée par défaut dans les versions 5.30.0 et ultérieures d'Amazon EMR. L'AQE dans Spark peut optimiser les stratégies de jointure et le shuffling automatique. Il peut également gérer efficacement l'asymétrie des données grâce à la division dynamique des partitions. Cela améliore l'équilibrage de charge et les performances des requêtes.

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Si vous utilisez une version antérieure de Spark qui ne prend pas en charge l'AQE, appliquez l'une des méthodes suivantes pour gérer l’asymétrie des données :

  • Réglez le seuil de spark.sql.autoBroadcastJoinThreshold pour les jointures de diffusion. Cela est utile lorsqu'un jeu de données est significativement plus petit que l'autre en termes de jointure.
  • Utilisez repartition() ou coalesce() dans votre code pour améliorer la distribution des données.
  • Appliquez des indicateurs SKEW au tableau le plus grand ou asymétrique et diffusez le tableau de plus petite taille. Les indicateurs SKEW notifient l'optimiseur Spark qu'une table contient des données asymétriques et permet d'optimiser les stratégies de jointure.

Voici un exemple d'indicateurs SKEW dans les requêtes Spark SQL :

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

Action Bootstrap pour augmenter le stockage de façon dynamique

Vous pouvez utiliser l'action Bootstrap via la surveillance Amazon CloudWatch et l'automatisation Lambda pour mettre à l’échelle automatiquement le stockage pour les clusters Amazon EMR. Lorsque l'espace disque disponible tombe en dessous d'un seuil défini, Amazon CloudWatch lance une fonction AWS Lambda. Cette fonction associe de nouveaux volumes Amazon Elastic Block Store (Amazon EBS) aux nœuds du cluster. La fonction exécute ensuite des scripts pour formater, monter et intégrer les volumes dans le système de fichiers distribué Hadoop (HDFS).

Cette approche automatisée permet d'éviter les pannes de cluster causées par des contraintes de stockage et garantit la rentabilité en ajoutant des capacités uniquement lorsque cela est nécessaire. La mise en œuvre requiert des rôles de gestion des identités et des accès (IAM) appropriés, des alarmes Amazon CloudWatch, une configuration AWS Lambda et des scripts personnalisés pour la gestion des volumes. Pour plus d'informations, consultez la section Augmenter le stockage de façon dynamique sur les clusters Amazon EMR.

Augmenter la capacité d’Amazon EBS

Pour les nouveaux clusters, utilisez des volumes EBS de plus grande taille

Lancez un cluster Amazon EMR et choisissez un type d'instance Amazon Elastic Compute Cloud (Amazon EC2) avec des volumes EBS de plus grande taille. Pour plus d'informations, consultez la section Stockage Amazon EBS par défaut pour les instances.

Pour l’exécution des clusters, ajouter d'autres volumes EBS

Procédez comme suit :

  1. Si un volume EBS de plus grande taille ne résout pas le problème, attachez d'autres volumes EBS au nœud principal et au nœud de tâches.

  2. Formatez et montez les volumes attachés. Veillez à utiliser le numéro de disque correct, par exemple /mnt1 ou /mnt2 au lieu de /data.

  3. Utilisez un client SSH pour vous connecter au nœud.

  4. Créez un répertoire /mnt2/yarn, puis définissez la propriété du répertoire sur l'utilisateur YARN :

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. Ajoutez le répertoire /mnt2/yarn à la propriété yarn.nodemanager.local-dirs de /etc/hadoop/conf/yarn-site.xml.
    Exemple :

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. Redémarrez le service NodeManager :
    Versions finales d'Amazon EMR 4.x-5.29.0

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Amazon EMR 5.30.0 et versions finales ultérieures

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

Informations connexes

Comment résoudre les échecs d’étape dans les tâches Spark sur Amazon EMR ?

Qu'est-ce qu'Amazon EMR sur EKS ?

Qu'est-ce qu'Amazon EMR sans serveur ?

Bonnes pratiques sur le site Web d’AWS Open Data Analytics

AWS OFFICIELA mis à jour il y a un an