Blog

Wie man Code mit der Spark Dataframe API schreibt: Kompatibilität und Testen im Fokus

Giovanni Lanzani

Giovanni Lanzani

Aktualisiert Oktober 21, 2025
5 Minuten

Ich habe kürzlich darüber nachgedacht, wie wir Spark-Code mit der Dataframe-API schreiben sollten. Es stellte sich heraus, dass es viele verschiedene Möglichkeiten gibt, die Sie wählen können, und manchmal können harmlos aussehende Entscheidungen auf lange Sicht ins Auge gehen.

Die Frage vor der Frage: DataFrame API oder Spark SQL?

(TL;DR: Verwenden Sie die DataFrame API!)

Bevor Sie beginnen: Ich gehe davon aus, dass Sie den Spark-Kontext als spark erstellt haben und dass Sie einen Datenrahmen mit dem Namen df besitzen.

Der erste Punkt, den Sie beachten sollten, ist, dass Sie Code mit der Dataframe-API wie folgt schreiben können1:

df = ...  # IO here
df_my_column = df.select("my_column") 

Oder verwenden Sie Spark SQL, wie hier

df_my_column = spark.sql("SELECT my_column FROM input")

Beide Methoden haben ihre Vor- und Nachteile.

Die Spark SQL-Methode ist allen Analysten und SQL-Kennern sehr vertraut. Ein Nachteil ist, dass sie zwar einen Datenrahmen zurückgibt, aber jeder Datenrahmen als temporäre Ansicht registriert werden muss, bevor er abgefragt werden kann:

df = source_df.select('a_column')
try:
    spark.sql("select mean(a_column) from df")
except:  # a py4j exception is raised here
    print("It can't find df")

df.createOrReplaceTempView("df")
spark.sql("select mean(a_column) from df")  # now it works

Die Dataframe-API ist viel übersichtlicher:

import pyspark.sql.functions as sf

df = source_df.select('a_column')
df.select(sf.mean('a_column'))

Auf der anderen Seite kann es ziemlich kompliziert und "beängstigend" werden.

from pyspark.sql import Window
d_types = ...
c_types = ...
df.withColumn('type',
              sf.when(sf.sum(sf.col('vehicle').isin(d_types).cast('Int'))
                        .over(Window.partitionBy('id')) > 0, 'd_type')
                .when(sf.col('vehicle').isin(c_types), 'c_type')
                .otherwise('other_type')))

(Um fair zu sein, wäre es auch ziemlich entmutigend, den obigen Teil in SQL zu schreiben).

Aber für mich liegt der eigentliche Vorteil darin, dass ich Objekte auf eine abstraktere Art und Weise zusammenstellen und mit ihnen umgehen kann. Der obige Codeschnipsel sollte im Idealfall eine Funktion sein:

def my_function(df, d_types, c_types):
    return df.withColumn('type',
                         sf.when(sf.sum(sf.col('vehicle').isin(d_types).cast('Int'))
                                   .over(Window.partitionBy('id')) > 0, 'd_type')
                           .when(sf.col('vehicle').isin(c_types), 'c_type')
                           .otherwise('other_type')))

Wenn ich das in Spark SQL umschreiben würde, müsste ich Folgendes tun

def my_function(df, d_types, c_types):
    # do something with d_types and c_types to be able to pass them to SQL
    table_name = 'find_a_unique_table_name_not_to_clash_with_other'
    df.createOrReplaceTempView(table_name)
    return spark.sql("""
                        YOUR SQL HERE WITH %s AND MORE %s's TO INSERT c_types, d_types AND table_name
                     """ % (c_types, d_types, table_name)) 

Die obige Funktion vermischt IO (die createOrReplaceTempView) mit Logik (die SQL-Ausführung). Als Sahnehäubchen obendrauf führt sie eine String-Interpolation durch, was schlecht ist (wirklich sehr schlecht!).

Entwirren würde bedeuten, sie so umzuschreiben

def register_df_as_table(df):
    table_name = .... # generate some random unique name here
    df.createOrReplaceTempView(table_name)
    return table_name

def my_function(table_name, d_types, c_types): 
    # do something with d_types and c_types to be able to pass them to SQL 
    return spark.sql("""
                        YOUR SQL HERE WITH %s AND MORE %s's TO INSERT c_types, d_types AND table_name
                     """ % (c_types, d_types, table_name)) 

Im Prinzip könnten Sie einen Dekorator aus register_df_as_table erstellen und my_function dekorieren, aber Sie sehen, dass das ziemlich kompliziert wird. Mit der Datenrahmen-API können Sie Funktionen viel einfacher zusammenstellen.

Weiter weg komponieren

Nachdem das geklärt ist, lassen Sie uns sehen, wie Sie Ihre Funktionen weiter zusammenstellen und testen können.

Ich werde den Code hier nicht schreiben, aber nehmen wir an, wir haben zwei zusätzliche Funktionen, a_function und another_function, mit einem Ablauf wie diesem:

def load_data(..):
    pass

def my_function(df, other_args):
    pass

def a_function(df, other_args):
    pass

def another_function(df): 
    pass

def main():
    df_1 = load_data(..)
    df_2 = my_function(df_1, args_1)
    df_3 = a_function(df_2, args_2)
    df_4 = another_function(df_3)
    return df_4

Die Benennung dieser Variablen (df_{1..4}) ist furchtbar, aber wie Sie alle wissen, gibt es in der Informatik nur zwei schwierige Probleme: die Benennung von Dingen, die um einen Fehler abweichen, und das Überschreiben von Variablen (z.B. wenn Sie sie alle df nennen).

Eine bessere Alternative wäre es, die verschiedenen Funktionen über Pipelines zu verbinden

def pipe(data, *funcs):
    for func in funcs:
        data = func(data)
    return data

def main():
    partial_my_function = lambda df: my_function(df, args1)
    partial_a_function = lambda df: a_function(df, args2)
    return pipe(load_data(),
                partial_my_function,
                partial_a_function,
                another_function)

Das macht es in meinen Augen viel besser. Das Testen eines solchen Ablaufs würde dann wie folgt aussehen

def get_test_data():
    # do something
    return data

def test_my_function():
    data = get_test_data()
    assert my_function(data, args_1) == something  # ideally this is a bit more involved

def test_a_function():
    partial_my_function = lambda df: my_function(df, args1)
    data = pipe(get_test_data, partial_my_function)
    assert a_function(data) == something

def test_another_function():
    partial_my_function = lambda df: my_function(df, args1) 
    partial_a_function = lambda df: a_function(df, args2)
    data = pipe(get_test_data, partial_my_function, partial_a_function) 
    assert another_function(data) == something

# other tests here

Auf diese Weise werden alle nachfolgenden Tests fehlschlagen, wenn eine der Funktionen nicht funktioniert.2.

Ok, das war eine Menge (Dummy-)Code. Lassen Sie mich wie immer wissen, was Sie denken, vor allem, wenn Sie anderer Meinung sind (ich bin @gglanzani auf Twitter, wenn Sie mich erreichen wollen!).

Wir stellen ein


  1. Technisch gesehen (danke Andrew) vermischt diese Syntax die DataFrame- und die SQL-API. Die DataFrame-Syntax lautet oder oder . Ich bevorzuge immer noch df.select('my_column'), da es meine Absicht besser ausdrückt.  
  2. Sie sollten trotzdem isolierte Tests schreiben, die nicht die Pipeline verwenden, für den Fall, dass Sie zwei Regressionen in verschiedenen Teilen der Pipeline einführen, die ihre Fehler aufheben!

Verfasst von

Giovanni Lanzani

Contact

Let’s discuss how we can support your journey.