Wie behebe ich den Phasenfehler „no space left on device“ bei einem Apache-Spark-Auftrag auf Amazon EMR?

Lesedauer: 6 Minute
0

Wenn ich eine Apache-Spark-Anwendung an einen Amazon-EMR-Cluster übermittle, bricht die Anwendung ab mit dem Phasenfehler „no space left on device“.

Kurzbeschreibung

Bei deiner Apache-Spark-Anwendung tritt möglicherweise aus einem der folgenden Gründe der Fehler no space left on device auf:

  • Während des Shuffle-Vorgangs werden aufgrund des Vorhandenseins von Shuffle-Joins umfangreiche Zwischendaten generiert
  • Ungleichmäßige Verteilung der Datenpartitionen und ungleichmäßige Executor-Workload-Verteilung
  • Falsche Partitionsgröße und -anzahl
  • Unzureichende Verfügbarkeit von Ressourcen wie Datenträger und Arbeitsspeicher

Apache Spark verwendet lokalen Speicher auf den Kern- und Aufgabenknoten, um Zwischendaten (Shuffle) zu speichern. Ist auf den Datenträgern der Instance kein Speicherplatz mehr verfügbar, wird der Auftrag abgebrochen mit der Fehlermeldung no space left on device.

Lösung

Dieser Artikel befasst sich mit den häufigsten Ursachen und Lösungen für Fehler des Typs „no space left on device“. Du musst die Ursache ermitteln, um die entsprechende Lösung implementieren zu können.

Hinweis: Wenn du beim Ausführen von AWS Command Line Interface (AWS CLI)-Befehlen Fehlermeldungen erhältst, findest du weitere Informationen dazu unter Problembehandlung bei der AWS CLI. Stelle außerdem sicher, dass du die neueste Version von AWS CLI verwendest.

Neupartitionierung

Je nachdem, wie viele Kern- und Aufgabenknoten der Cluster umfasst, musst du möglicherweise die Anzahl der Spark-Partitionen erhöhen. Führe den folgenden Befehl aus, um weitere Spark-Partitionen hinzuzufügen:

val numPartitions = 500
val newDF = df.repartition(numPartitions)

Hinweis: Ersetze 500 durch die Anzahl der Partitionen, die deinem Anwendungsfall entspricht.

Spark-Konfigurationen optimieren

Partitionsverwaltung

Wenn während Shuffles zu viel Festplattenüberlauf oder eine ungleichmäßige Datenverteilung zwischen den Partitionen auftritt, optimiere die folgenden Parameter:

spark.default.parallelism=${NUM_CORES * 2} #no. of partitions in RDDs
spark.sql.shuffle.partitions=${NUM_CORES * 2} #no. of shuffle partitions
spark.sql.files.maxPartitionBytes=256MB #max. no. of bytes in a partition when reading files
spark.sql.files.maxRecordsPerFile=10000000

Erhöhe die Parallelität und die Anzahl der Partitionen, wenn eine der folgenden Bedingungen zutrifft:

  • Die Varianz der Aufgabendauer ist größer als das Dreifache der mittleren Dauer
  • Der Überlauf pro Aufgabe ist größer als 20 %

Wenn die durchschnittliche Partitionsgröße weniger als 50 MB beträgt oder zu viele kleine Dateien vorhanden sind, verringere die Parallelität und die Anzahl der Partitionen.

Verwende die folgende Formel, um die optimale Partitionsanzahl zu berechnen:

Initial Partitions = Number of Cores * 2
Optimal Partitions = max(
    Total Input Size / Target Partition Size,
    Initial Partitions
)

Optimierung auf Basis des Datenvolumens

Im Folgenden findest du Konfigurationsparameter für unterschiedliche Datensatzgrößen:

Kleiner Datensatz (<100 GB):

spark.sql.files.maxPartitionBytes=128MB
spark.sql.shuffle.partitions=NUM_CORES * 2
spark.sql.files.maxRecordsPerFile=5000000

Mittlerer Datensatz (100 GB–1 TB):

spark.sql.files.maxPartitionBytes=256MB
spark.sql.shuffle.partitions=NUM_CORES * 3
spark.sql.files.maxRecordsPerFile=10000000

Großer Datensatz (>1 TB):

spark.sql.files.maxPartitionBytes=512MB
spark.sql.shuffle.partitions=NUM_CORES * 4
spark.sql.files.maxRecordsPerFile=20000000

Optimierung von Arbeits- und Festplattenspeicher

Um Arbeits- und Festplattenspeicher zu optimieren, passe deine Konfigurationsparameter an:

spark.memory.fraction=0.8 # Higher for compute-intensive jobs
spark.memory.storageFraction=0.3 # Lower for shuffle-heavy workloads
spark.executor.memoryOverhead=0.2 # 20% of executor memory
spark.memory.offHeap.enabled=true
spark.memory.offHeap.size=${EXECUTOR_MEMORY * 0.2}

Um die gesamte Arbeitsspeicherzuweisung für einen Spark-Executor-Container zu berechnen, werden die folgenden vier Arbeitsspeicherkomponenten addiert:

  • Executor-Speicher (spark.executor.memory)
  • Arbeitsspeicher-Overhead (spark.executor.memoryOverhead)
  • Off-Heap-Arbeitsspeicher (spark.memory.offHeap.size)
  • PySpark-Speicher (spark.executor.pyspark.memory)

Gesamter Executor-Container-Arbeitsspeicher = spark.executor.memory + spark.executor.memoryOverhead + spark.memory.offHeap.size + spark.executor.pyspark.memory

Die folgende Berechnung bestimmt die interne Speicherzuweisung von Spark:

Storage Memory = Executor Memory * memory.fraction * memory.storageFraction
Execution Memory = Executor Memory * memory.fraction * (1 - memory.storageFraction)
Off-Heap Memory = Executor Memory * 0.2

Dateien und Festplattenverwaltung

Wenn eine ungleichmäßige Partitionsverteilung vorliegt oder die Anzahl der Partitionen zu hoch oder zu niedrig ist, müssen die Dateiverwaltungskonfigurationen angepasst werden. Setze maxPartitionNum auf das Doppelte der Gesamtzahl der Kerne und minPartitionNum auf 1, es sei denn, deine Anwendungsfälle erfordern einen anderen Wert.

# File Management
spark.sql.files.minPartitionNum=${NUM_CORES}
spark.sql.files.maxPartitionNum=${NUM_CORES * 4}
spark.shuffle.file.buffer=64k

Wenn maxPartitionNum zu niedrig eingestellt ist, kann dies die Parallelität einschränken und es werden möglicherweise nicht alle Fälle von ungleichmäßiger Partitionierung verhindert.

AQE und Umgang mit Verzerrungen

AQE (Adaptive Query Execution) in Spark ist eine Laufzeitoptimierung, die Abfragepläne auf Grundlage von Echtzeitstatistiken anpasst.

AQE ist in Amazon-EMR-Version 5.30.0 und höher standardmäßig aktiviert. AQE in Spark kann Join-Strategien und Shuffling automatisch optimieren. Dieser Mechanismus kann auch effektiv mit Datenverzerrungen durch dynamische Partitionsaufteilung umgehen. Dies verbessert das Load Balancing und die Abfrageleistung.

# Skew Management
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=10
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

Wenn du eine frühere Version von Spark verwendest, die AQE nicht unterstützt, verwende eine der folgenden Methoden zur Behandlung von Datenverzerrungen:

  • Optimiere den Schwellenwert spark.sql.autoBroadcastJoinThreshold für Broadcast-Joins. Dies ist sinnvoll, wenn ein Datensatz bei Joins deutlich kleiner ist als der andere.
  • Verwende repartition() oder coalesce() in deinem Code, um die Datenverteilung zu verbessern.
  • Wende SKEW-Hinweise auf die größere oder verzerrte Tabelle an und übertrage die kleinere Tabelle. SKEW-Hinweise informieren den Spark-Optimierer darüber, dass eine Tabelle verzerrte Daten enthält, und helfen bei der Optimierung von Join-Strategien.

Es folgt ein Beispiel für SKEW-Hinweise in Spark-SQL-Abfragen:

-- Using SKEW hint in Spark SQL
SELECT /*+ SKEW('t1') */
    t1.key, t2.value
FROM table1 t1 JOIN table2 t2 ON t1.key = t2.key

-- Using MAPJOIN hint along with SKEW
SELECT /*+ SKEW('t1'), MAPJOIN(t2) */
    t1.key, t2.value
FROM table1 t1
JOIN table2 t2
ON t1.key = t2.key

Bootstrap-Aktion zum dynamischen Hochskalieren des Festplattenspeichers

Du kannst die Bootstrap-Aktion über Amazon-CloudWatch-Überwachung und Lambda-Automatisierung verwenden, um den Festplattenspeicher für Amazon-EMR-Cluster automatisch zu skalieren. Wenn der verfügbare Datenträger-Speicherplatz unter einen festgelegten Schwellenwert fällt, startet Amazon CloudWatch eine AWS-Lambda-Funktion. Diese Funktion hängt neue Amazon-Elastic-Block-Store(Amazon EBS)-Volumes an die Clusterknoten an. Die Funktion führt dann Skripte aus, um die Volumes zu formatieren, zu mounten und in das Hadoop Distributed File System (HDFS) zu integrieren.

Dieser automatisierte Ansatz verhindert Clusterausfälle, die durch Datenträger-Speicherengpässe verursacht werden, und gewährleistet die Kosteneffizienz, da Kapazitäten nur im Bedarfsfall hinzugefügt werden. Für die Implementierung sind geeignete Identity-and-Access-Management(IAM)-Rollen, Amazon-CloudWatch-Alarme, eine AWS-Lambda-Konfiguration und benutzerdefinierte Skripte für das Volume-Management erforderlich. Weitere Informationen findest du unter Dynamisches Hochskalieren des Festplattenspeichers auf Amazon-EMR-Clustern.

Weitere Amazon-EBS-Kapazitäten hinzufügen

Für neue Cluster: Größere EBS-Volumes verwenden

Starte einen Amazon EMR-Cluster und wähle einen Amazon Elastic Compute Cloud (Amazon EC2)-Instance-Typ mit größeren EBS-Volumes. Weitere Informationen findest du unter Standardmäßiger Amazon-EBS-Festplattenspeicher für Instances.

Für laufende Cluster: Weitere EBS-Volumes hinzufügen

Führe die folgenden Schritte aus:

  1. Wenn ein größeres EBS-Volume das Problem nicht löst, füge weitere EBS-Volumes an die Kern- und Aufgabenknoten an.

  2. Formatiere und mounte die angehängten Volumes. Achte darauf, die richtige Festplattennummer zu verwenden, z. B. /mnt1 oder /mnt2 statt /data.

  3. Verwende einen SSH-Client, um eine Verbindung zum Knoten herzustellen.

  4. Erstelle ein /mnt2/yarn-Verzeichnis und lege dann den YARN-Benutzer als Eigentümer des Verzeichnisses fest:

    sudo mkdir /mnt2/yarn
    sudo chown yarn:yarn /mnt2/yarn
  5. Füge das Verzeichnis /mnt2/yarn der Eigenschaft yarn.nodemanager.local-dirs von /etc/hadoop/conf/yarn-site.xml hinzu.
    Beispiel:

    <property>
        <name>yarn.nodemanager.local-dirs</name>
        <value>/mnt/yarn,/mnt1/yarn,/mnt2/yarn</value>
    </property>
  6. Starte den NodeManager-Service neu:
    Amazon EMR Release-Versionen 4.x-5.29.0

    sudo stop hadoop-yarn-nodemanager
    sudo start hadoop-yarn-nodemanager

    Amazon EMR 5.30.0 und neuere Release-Versionen

    sudo systemctl stop hadoop-yarn-nodemanager
    sudo systemctl start hadoop-yarn-nodemanager

Ähnliche Informationen

Wie kann ich Stufenfehler in Spark-Jobs auf Amazon EMR beheben?

Was ist Amazon EMR auf EKS?

Was ist Amazon EMR Serverless?

Bewährte Methoden auf der AWS-Open-Data-Analytics-Website

AWS OFFICIAL
AWS OFFICIALAktualisiert vor 24 Tagen