Blog

Überraschungen für Uneingeweihte

Giovanni Lanzani

Aktualisiert Oktober 21, 2025
6 Minuten

Vor kurzem habe ich einen Spark-Kurs gehalten. In einer der Übungen wurden die Studenten aufgefordert, einen Spark DataFrame in zwei sich nicht überschneidende Teile zu teilen.

Einer der Studenten hat sich dafür eine kreative Lösung einfallen lassen.

geteilt

Er begann damit, dem DataFrame eine monoton ansteigende ID-Spalte hinzuzufügen. Spark hat dafür eine eingebaute Funktion, monotonically_increasing_id - wie man sie verwendet, finden Sie in den Dokumenten unter https://spark.apache.org/docs/latest/api/python/pyspark.sql.html.

Seine Idee war ziemlich einfach: Nachdem er eine neue Spalte mit dieser steigenden ID erstellt hatte, wählte er eine Teilmenge des ursprünglichen DataFrame aus und führte dann einen Anti-Join mit der ursprünglichen Spalte durch, um das Komplement zu finden1.

Dies funktionierte jedoch nicht. Sein ursprünglicher DataFrame hatte 10000 Zeilen. Die erste Unterauswahl hatte 3000 Zeilen. Das Ergebnis des Anti-Join hatte jedoch keine 7000 Einträge.

War sein Code falsch? Im Grunde genommen war der Code sehr einfach. Er würde zunächst einen DataFrame lesen

df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big" for illustrative purposes
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)

gefolgt von einer Unterauswahl

df_small = df.filter(sf.col('weight') < 100)

und schließlich das Anti-Join

df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)

(Haftungsausschluss: Dieser Code ist vereinfacht; df_large_on_increasing_id kann natürlich auch durch die Auswahl von allem mit einem Gewicht größer oder gleich 100 erhalten werden. Danke, dass Sie es bemerkt haben!)

Wenn wir nun alles aus df_large_on_increasing_id auswählen, dessen Gewicht kleiner als 100 ist, sollten wir einen leeren DataFrame erhalten. Tun wir das?

df_large_on_increasing_id.filter(sf.col('weight') < 100).show(n=6)
increasing_ididGewichtZeitKükenDiät
229687.08.0272
518242.00.0171
1140376.06.0363
858993459237749.02.0343
858993459634262.04.0313
858993460115281.018.0131

Offensichtlich nicht! Aber was ist dann los? Bei der Erstellung von df_small sollte ich nur Datensätze mit einer Gewichtung kleiner als 100 belassen. Zum Beispiel

df_small.show(n=3)
increasing_ididGewichtZeitKükenDiät
858993459237749.02.0343
858993459634262.04.0313
858993460115281.018.0131

Wenn Sie einen Anti-Join mit dem ursprünglichen DataFrame durchführen, sollten diese Datensätze auf jeden Fall entfernt werden: die increasing_ids 8589934592, 8589934596 und 8589934601 sind in df_small vorhanden, also sollten sie nicht in df_large_on_increasing_id = df - df_small vorhanden sein!

Das Problem ist subtil, aber wenn Sie mit der Funktionsweise von Spark vertraut sind, sollten Sie es bereits bemerkt haben!

Spark ist standardmäßig faul: Das bedeutet, dass Spark beim Aufruf von monotonically_increasing_id nichts weiter tut, als zu verfolgen, wann wir die Spalte increasing_id tatsächlich benötigen, um sie mit der Funktion monotonically_increasing_id zu berechnen.

Aber wann brauchen wir diese Spalte? Nun, es stellt sich heraus, dass wir sie erst ziemlich spät im Prozess brauchen: erst wenn wir show aufrufen, beginnt die Berechnung!

Na und, werden Sie vielleicht sagen! Wenn die Berechnung ausgelöst wird, warum sollte das Ergebnis dann anders sein?

Nun, der Hinweis steht in der Dokumentation der Funktion monotonically_increasing_id (Hervorhebung von mir):

Die Funktion ist nicht-deterministisch, da ihr Ergebnis von den Partitions-IDs abhängt.

Interessant: die IDs hängen von der Partitions-ID ab! Das bedeutet, dass die Funktion die Partitions-IDs von df_small verwendet, wenn sie die Berechnung für df_small durchführt, während sie die Partitions-IDs von df verwendet, wenn sie die Berechnung für df durchführt!

Wie können wir überprüfen, ob dies wahr ist? (Oder True, wenn Sie pythonisch veranlagt sind)

Wir können die Spalte einfach zwischenspeichern und dann materialisieren , bevor wir sie verwenden!

Dazu führen wir zunächst cache den DataFrame aus - cache sagt Spark: Sobald dieser DataFrame berechnet wurde, halten Sie ihn so lange wie möglich im Speicher - und führen dann ein count aus - count sagt Spark: Jetzt möchte ich die Ergebnisse materialisieren (und mir dann sagen, wie viele Datensätze sie enthalten!)

df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big"
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)
# once an action will be triggered, count in this case,
# Spark will hold df as it is now in memory
df.cache()
# this is the action that materializes df
df.count()

Wenn wir nun den Code von vorhin erneut ausführen, erhalten wir ein ganz anderes Ergebnis!

df_small = df.filter(sf.col('weight') < 100)
df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)
df_large_on_increasing_id.filter(sf.col('weight') < 100).show()
increasing_ididGewichtZeitKükenDiät

Der DataFrame ist leer, wie wir erwartet haben!!

Das war eine gute Lösung, aber funktioniert sie auch in der Produktion?

Die Antwort liegt in dem, was ich zuvor geschrieben habe: Spark behält den DataFrame nur so lange im Speicher, wie es kann! Wenn der Speicher, aus welchem Grund auch immer, knapp wird, wird der DataFrame entfernt und neu berechnet, so dass Sie die falschen Ergebnisse erhalten!

Wenn Sie Andrew Snare als Kollegen haben, haben Sie es wahrscheinlich schon gehört: Die Verwendung von cache zur Sicherstellung der Korrektheit in Spark ist ein gefährliches Anti-Muster.2.

Die Studenten fragten sofort, ob es eine Möglichkeit gibt, dieses Verhängnis zu umgehen. Die Antwort ist einfach: Schreiben Sie den DataFrame zurück auf die Festplatte, nachdem Sie die Spalte hinzugefügt haben. So verschwenderisch das auch erscheinen mag3 es ist die einzige Möglichkeit, die Korrektheit zu gewährleisten, ohne die Logik zu ändern (d.h. den Anti-Join mit der monoton steigenden ID durchzuführen).4

Sind Sie an einer Schulung zu Data Science mit Spark interessiert?

Der Kurs, über den ich am Anfang dieses Beitrags geschrieben habe, ist einer unserer besten. Lesen Sie mehr auf der entsprechenden Seite.


Wenn Sie mehr über den Monat erfahren möchten, folgen Sie mir auf Twitter: Ich bin dort gglanzani!


  1. Die Details sind ein wenig komplizierter. Ein Anti-Join ist im Grunde genommen eine Subtraktion. Wenn Sie df1.anti-join(df2) schreiben, machen Sie effektiv df1 - df2, d.h. geben Sie mir alle Datensätze in df1, die nicht in df2 vorkommen.  

  2. Es kann immer noch nützlich für die Leistung sein, aber verlassen Sie sich nicht darauf, wenn es um die Korrektheit geht.

  3. Erzählerstimme: Das ist es.

  4. Dies ist der richtige Ort, um alle daran zu erinnern, dass es auch andere Möglichkeiten gibt, einen DataFrame aufzuteilen, und dass die Übung nicht die Verwendung einer monoton ansteigenden ID erforderte, um durchgeführt zu werden. Dennoch war dies einer der Höhepunkte des Tages für die Studenten: all das theoretische Wissen, das sie bis dahin aufgesaugt hatten, zu sammeln, um etwas zu beheben, das scheinbar ein Fehler in Spark, tatsächlich aber ein Fehler in ihrem Code war.  

Verfasst von

Giovanni Lanzani

Contact

Let’s discuss how we can support your journey.