Blog
Wie Sie Zeit und Geld sparen, indem Sie Spark vor Ort testen

This post was originally published at 47deg.com on June 21, 2022.
Warum sollten Sie noch einmal darüber nachdenken, Spark zu testen?
Manchmal ist es gut, darauf zurückzublicken, wie die Dinge eigentlich gemacht werden sollten, was auf dem Weg dorthin schief gelaufen ist und wie man es wieder gut machen kann. Data Engineers waren durch den Druck des Augenblicks versucht, das Testen ganz aufzugeben. Hier sind die Gründe dafür:
- Der Start des Spark-Jobs vor Ort war sehr günstig.
- Sie brauchten keine eigenen Daten zu generieren, sondern konnten einfach einen Prozentsatz der Produktionsdaten nehmen.
- Außerdem waren keine Unit-Tests erforderlich, da die Anwendung auf einmal getestet wurde, indem sie im Cluster ausgeführt wurde, und die Daten mit einigen einfachen SQL-Befehlen validiert werden konnten.
Diese Vorgehensweise war zwar nicht perfekt, funktionierte aber die meiste Zeit. Dennoch begannen die Data Engineers unter sich wiederholenden Arbeiten zu leiden, da sie darauf warten mussten, dass die Daten geladen oder die Ausgaben validiert wurden. In vielen Fällen landeten diese Aufgaben auf den Schultern der Data Engineers selbst.
Die Cloud ist nicht kostenlos
In den letzten drei Jahren ist das, was einst ein Problem für einige war, zu einem allgemeinen Problem für die meisten Branchen geworden, die Daten nutzen. Der Preis für Cloud Computing ist durch die Kombination aus wachsender Nachfrage und teurerer Hardware gestiegen:
- Unterbrechung der Lieferkette: Engpässe bei Chips (GPU, CPU), Auftragsrückstände, defekte Lagerbestände.
- Covid-19: Fernarbeit wurde von vielen Unternehmen eingeführt, indem sie auf Cloud-Dienste zurückgriffen.
- Datengesteuert: Unternehmen, die ihre internen Prozesse umgestalten, nutzen häufig Cloud-Dienste.
Zusätzlich zu diesen marktbezogenen Gründen geben die meisten Unternehmen zu, dass sie Ressourcen für unnötige Prozesse oder suboptimale Verarbeitung verschwenden. Sie sehen sich außerstande, die Cloud-Kosten zu kontrollieren und zu reduzieren. Weiterführende Materialien zu diesem Thema:
- Unternehmen kämpfen mit explodierenden Cloud-Kosten
- Bericht über den Zustand der Cloud - 2021
- Bericht zum Stand der Cloud - 2019
Behandeln Sie Ihre Dateningenieure gut
Mehrere Studien haben gezeigt, dass Data Engineers auf einen perfekten Sturm des Burnouts zusteuern. Die Hauptgründe sind:
- Unzumutbare Anfragen.
- Manuelle Prozesse: operative Ausführung.
- Das Finden und Beheben von Fehlern nimmt Zeit weg von der Entwicklung von Systemen.
- Schuldzuweisung: Wenn etwas schief geht, geben Sie Ihrem Dateningenieur die Schuld!
- Übermäßig restriktive Regierungsführung.
Das lokale Testen der Spark-Anwendungen hilft den Data Engineering-Teams bei den Punkten 2, 3 und 4. So wird ein Burnout verhindert. Weitere Informationen finden Sie unter 5 Gründe, warum Dateningenieure ausgebrannt sind und 3 Wege, wie Sie ein Burnout bei Dateningenieuren verhindern können.
Was Sie bei einer Spark-Anwendung testen sollten
Einheitstest
Die einfachste Bibliothek, die beim Testen von Spark-Anwendungen hilft, ist die spark-testing-base-Bibliothek von Holden Karau. Sie stellt eine kostenlose SparkSession bereit und reduziert damit Boilerplate und JVM-Overhead. Sie enthält Assertion auf der DataFrame-/DataSet-/RDD-Ebene, so dass Sie keine Aktion zur Validierung des Ergebnisses auslösen müssen.
Integrationstest
Einige der ersten Lösungen waren die Bibliothek spark-integration-tests oder die bereits erwähnte spark-testing-base, die zur Simulation der Integration verwendet werden können. Dennoch werden die meisten Integrationstests durchgeführt, indem Ihr Spark-Job auf einem Cluster gestartet wird. Dieser Cluster war früher eine Sandbox oder eine Vorproduktionsumgebung vor Ort. Heutzutage ist der Cluster eine Cloud-Instanz: eine Sandbox, ein Notebook oder eine andere Cloud-Umgebung.
Eine weitere Möglichkeit, die Integration unserer Spark-Anwendungen zu testen, basiert auf Containern. Containerisierte Integrationstests für Spark-Jobs ermöglichen lokale Integrationstests. Im Grunde genommen:
- Backen Sie das Glas des Spark-Jobs in ein Spark-Image.
- Injizieren Sie Testdaten und Auftragskonfigurationsdateien in den Container, indem Sie ein Volume mounten.
- Verwenden Sie compose, um den Spark-Auftrag und seine Abhängigkeiten zu orchestrieren.
- Führen Sie komponieren aus.
- Führen Sie Assertions für den Inhalt des Zielspeichers aus, nachdem der Auftrag ausgeführt wurde.
Leistung
Spark-Tuning ist eine Meisterleistung für sich. Die meiste Arbeit wird von den Senior-Ingenieuren in Handarbeit geleistet, die versuchen, DAGs zu optimieren, Daten zu verzerren, Fehler zu zählen usw. Werfen Sie einen Blick auf Holden Karaus Spark Autotuning Präsentation von der Strata New York 2018.
Job-Validierung
Selbst wenn der Spark-Auftrag scheinbar einwandfrei funktioniert, kann er in der Produktion echte Probleme verursachen. Holden Karau hat einige Arbeiten zur Validierung von Spark-Jobs durchgeführt. Hier können Sie sich ihren Vortrag Validating Big Data Jobs von Big Data Spain 2018 und die Folien vom SparkAISummit SF 2019 ansehen.
Testen des Kleinen - Einheitstest
Ted Malaska hielt einen Vortrag über das Testen des Kleinen. Für ihn bedeutet das Testen des Kleinen:
"Das Endziel ist [...], die Entwicklungsgeschwindigkeit zu erhöhen, die Stabilität und die Produktionsqualität zu steigern."
Nehmen wir ein paar Daten, die umgewandelt werden müssen.
firstName,surName,department,manager,enrollmentDate
harry,johnson,Back End, ,05012021
sylvia,roth,Data Science, ,08012017
Greg,Thomas,Sales,Mr. Ceo,02282001
Pamela,Griffin,Management, ,12062010
Anforderungen
- Vor- und Nachname sollten groß geschrieben werden.
- Manager sollten den Mitarbeitern auf der Grundlage ihrer Abteilung zugewiesen werden:
- Wenn der Mitarbeiter im Back End arbeitet, muss sein Manager Pierre Graz sein.
- Wenn der Mitarbeiter in der Abteilung Data Science arbeitet, muss sein Manager Luisa Garcia sein.
- Wenn der Mitarbeiter für eine andere Abteilung arbeitet, muss sein Vorgesetzter der Herr CEO sein.
- Das Anmeldedatum muss von 'MMddyyyy' in das ISO 8601-Format 'yyyy-MM-dd' umgewandelt werden.
Dies wäre der erwartete Ausgang:
firstName,surName,department,manager,enrollmentDate
Harry,Johnson,Back End,Pierre Graz,2021-05-01
Sylvia,Roth,Data Science,Luisa Garcia,2017-08-01
Greg,Thomas,Sales,Mr. CEO,2001-02-28
Pamela,Griffin,Management,Mr. CEO,2010-12-06
Lassen Sie uns an dieser Stelle das Szenario modellieren, indem wir eine einfache Fallklasse für den Mitarbeiter erstellen.
case class Employee (
firstName: String,
surName: String,
department: String,
manager: String,
enrollmentDate: String
)
Eine Möglichkeit, die Transformationen zu implementieren, wäre, jede von ihnen als Funktion vom Typ DataFrame => DataFrame zu schreiben. Bei einigen Spalten wird davon ausgegangen, dass sie bestimmte Werte haben. Es werden also nicht alle Fälle abgedeckt. Zum Beispiel muss enrollmentDate in den Quelldaten entweder ein gültiger String oder ein leerer String sein.
object Transformations {
lazy val capitalize: String => Column = columnName => initcap(lower(col(columnName)))
lazy val assignManager: DataFrame => DataFrame = df =>
df.withColumn("manager",
when(col("department") === lit("Back End"), lit("Pierre Graz"))
.otherwise(
when(col("department") === lit("Data Science"), lit("Luisa Garcia"))
.otherwise(
lit("Mr. CEO")
)
)
)
lazy val to_ISO8601_Date: DataFrame => DataFrame = df =>
df.withColumn("enrollmentDate",
when(col("enrollmentDate") === lit(""), lit("")
).otherwise(to_date(col("enrollmentDate"), "MMddyyyy").cast(StringType)))
lazy val capitalizeNames: DataFrame => DataFrame = df =>
df.withColumn("firstName", capitalize("firstName"))
.withColumn("surName", capitalize("surName"))
}
Die frühen Phasen der Spark-Tests
Spark unterstützt direkte Tests mit einigen Umgehungen. Diese Umgehungen fügen unserem Code einige Kesselflicken hinzu. Der folgende Code (oder etwas Ähnliches) muss zu allen Testklassen hinzugefügt werden.
class TestingSparkWithScalaTest extends FunSuite with BeforeAndAfterAll {
var sparkSession: SparkSession = _
override protected def beforeAll(): Unit =
sparkSession = SparkSession.builder()
.appName("testing spark")
.master("local[1]")
.getOrCreate()
override protected def afterAll(): Unit = sparkSession.stop()
Dann werden die Transformationen getestet. Aber zuerst müssen wir einige Dummy-Daten definieren.
lazy val entryData: Seq[Employee] = Vector(
Employee("Ryan", "Wilson", "Back End", "", "01312019"),
Employee("lexy", "Smith", "Data Science", "", "02282018"),
Employee("marlow", "perez", "Sales", "", "05132020"),
Employee("Angela", "costa", "Management", "", "12062021")
)
lazy val expectedExit: Seq[Employee] = Vector(
Employee("Ryan", "Wilson", "Back End", "Pierre Graz", "2019-01-31"),
Employee("Lexy", "Smith", "Data Science", "Luisa Garcia", "2018-02-28"),
Employee("Marlow", "Perez", "Sales", "Mr. CEO", "2020-05-13"),
Employee("Angela", "Costa", "Management", "Mr. CEO", "2021-12-06")
)
Betrachten wir als Beispiel den Assign Manager Test.
test("Testing Assign Manager") {
val dataframe: DataFrame = sparkSession.createDataFrame(EmployeeSample.entryData)
val updatedManager: DataFrame = Transformations.assignManager(dataframe)
assert(updatedManager.where(col("manager") === lit("Mr. CEO")).count() === 2)
assert(updatedManager.where(col("manager") === lit("Pierre Graz")).count() === 1)
assert(updatedManager.where(col("manager") === lit("Luisa Garcia")).count() === 1)
}
Steps
- Besorgen Sie sich für jede Testklasse eine SparkSession.
- Erzeugen Sie einige Dummy-Daten: Das kann eine Sequenz, eine csv-Datei auf Ihrem Laptop oder etwas anderes sein.
- Führen Sie den Test aus und vergleichen Sie Eingabe und Ausgabe (außerhalb des DataFrame-Typs) nur für die Szenarien, die Sie sich ausgedacht haben.
Probleme
- Die Notwendigkeit, für jede Klasse eine SparkSession zu instanziieren, belastet Ihre JVM mit zusätzlicher Arbeit und Overhead.
- Die Generierung der Dummy-Daten ist eine manuelle Aufgabe, die zeitaufwändig und fehleranfällig ist.
- Die Tests werden nur für die gegebene Eingabe durchgeführt und nicht für alle möglichen Eingaben für den gegebenen Typ. In den meisten Fällen muss eine Aktion mit dem DataFrame durchgeführt werden (sammeln, nehmen, zählen ...).
Wie Sie Tests einfacher und sicherer machen
Die Verwendung vorhandener Bibliotheken, die gut unterstützt werden, ist in der Regel hilfreich. Beim Testen von Spark verwenden wir mehrere Bibliotheken zusammen. In dieser Einführung werden wir Ihnen zeigen, wie Sie Ihre Denkweise über das Testen ändern können. Für den Moment werden wir nur die spark-testing-base Bibliothek verwenden.
Kapitel 1: Spark-Testing-Basis und ScalaCheck
Vergleichen von DataFrames
Einer der größten Vorteile der Bibliothek ist die Möglichkeit, zwei gegebene DataFrames zu vergleichen. Im folgenden Test werden alle Transformationen (alle Anforderungen) auf den Eingabedatenrahmen angewendet und das Ergebnis mit den Erwartungen verglichen.
test("Testing Employee Logic") {
val entryDataset: DataFrame = spark.createDataFrame(EmployeeSample.entryData)
val transformedFDF: DataFrame = Employee.applyTransformations(entryDataset)
val exitDataset: DataFrame = spark.createDataFrame(EmployeeSample.expectedExit)
assertDataFrameDataEquals(exitDataset, transformedFDF)
}
Eine kurze Einführung in die Vorteile von ScalaCheck
Spark-testing-base enthält unter der Haube die ScalaCheck-Bibliothek. ScalaCheck ist für seine eigenschaftsbasierten Testfunktionen bekannt. Vorteile:
- Es generiert automatisch Daten für Sie auf der Grundlage des angegebenen Typs oder mit Hilfe eines benutzerdefinierten Generators.
- Führt 100 Tests für jede Eigenschaft durch.
- Sagt Ihnen, welcher Wert den Test zum Absturz gebracht hat (falls vorhanden).
Zum Beispiel:
test("Summing Integers") {
val nPlus1GreaterThanN: Prop = Prop.forAll((i: Int) => (i + 1) > i)
check(nPlus1GreaterThanN)
}
GeneratorDrivenPropertyCheckFailedException was thrown during property evaluation.
Falsified after 3 successful property evaluations.
Occurred when passed generated values ( arg0 = 2147483647 )
Dieser Wert ist genau der Int.MaxValue!
scala> Int.MaxValue
val res0: Int = 2147483647
scala> res0 + 1
val res1: Int = -2147483648
Wenn Sie das bedenken, können Sie den Schwerpunkt vom Vergleich von Eingabe und Ausgabe auf die Überprüfung der Eigenschaften unserer DataFrame => DataFrame Funktionen verlagern.
Beliebige DataFrames mit benutzerdefinierten Feldern
Um dieses Tool zu unserem Vorteil nutzen zu können, müssen benutzerdefinierte Generatoren erstellt werden. Zum Beispiel:
- Vor- und Nachname können sich denselben Generator teilen
def anyNameGen: Gen[String] = for { length <- Gen.chooseNum(5,10) charList <- Gen.listOfN(length, Gen.alphaChar) } yield charList.mkString - Abteilung
def departmentGen: Gen[String] = Gen.oneOf( Vector( "Back End", "Data Science", "Sales", "Management" ) )
Testen von Funktionseigenschaften, nicht von ganzen DataFrames
Oft sind nur einige wenige Spalten erforderlich, um die Transformation durchzuführen. Die übrigen Spalten des DataFrame spielen keine Rolle. DataFrames können sehr groß werden; bis zu Hunderten von Spalten. In solchen Fällen wird es sehr schwierig, den gesamten DataFrame auf Ihrem lokalen Rechner zu testen. Anstatt den gesamten DataFrame zu testen, ist es daher besser, sich auf die Spalten zu konzentrieren, die wirklich an der Funktion beteiligt sind, die wir überprüfen wollen.
Erinnern wir uns an die Logik, um dem Mitarbeiter auf der Grundlage der Abteilung einen Manager zuzuweisen:
- Wenn der Mitarbeiter im Back End arbeitet, muss sein Manager Pierre Graz sein.
- Wenn der Mitarbeiter in der Abteilung Data Science arbeitet, muss sein Manager Luisa Garcia sein.
- Wenn der Mitarbeiter für eine andere Abteilung arbeitet, muss sein Vorgesetzter der Herr CEO sein.
Was sind die Eigenschaften dieser Funktion? Eine ganz direkte ist, dass die Anzahl der Mitarbeiter, die unter Back End arbeiten, gleich der Anzahl der Mitarbeiter sein muss, die Pierre Graz als Manager haben. Dies gilt auch für die Anzahl der Mitarbeiter in Data Science und die Anzahl der Mitarbeiter in allen anderen Abteilungen, die nicht zu den Prioritäten gehören.
test("Testing assigning manager") {
val assigningManager: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
Transformations.assignManager(entryDataFrame).where(
col("manager").isin("Pierre Graz", "Luisa Garcia")
).count() === entryDataFrame.where(
col("department").isin("Back End", "Data Science")
).count()
}
check(assigningManager)
}
Bei der Funktion, die sowohl den Vor- als auch den Nachnamen groß schreibt, ist die Eigenschaft direkt. Lassen Sie uns prüfen, ob sie einem regulären Ausdruck folgen.
test("Testing capitalized names") {
val capitalizedNames: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
Transformations.capitalizeNames(entryDataFrame).where(
col("firstName").rlike("[A-Z]{1}[a-z]*") && col("surName").rlike("[A-Z]{1}[a-z]*")
).count() == entryDataFrame.count()
}
check(capitalizedNames)
}
Mit diesem Ansatz könnte die gesamte Pipeline der Transformation für eine bestimmte Tabelle auf einmal überprüft werden. Für den speziellen Fall von Employee ist dies eine der vielen Möglichkeiten, dies zu tun:
test("Testing Employee Logic") {
val employeeLogic: Prop = Prop.forAll(employeeDfGenerator) { entryDataFrame =>
Employee.applyTransformations(entryDataFrame).where(
col("manager").isin("Pierre Graz", "Luisa Garcia") && col("enrollmentDate").rlike("[0-9]{4}-[0-9]{2}-[0-9]{2}") && col("firstName").rlike("[A-Z]{1}[a-z]*") && col("surName").rlike("[A-Z]{1}[a-z]*")
).count() === entryDataFrame.where(
col("department").isin("Back End", "Data Science")
).count()
}
check(employeeLogic)
}
Fazit
Für Dateningenieure
Sobald Sie mit dem Testen Ihrer Spark-Anwendungen beginnen, werden Sie einige Vorteile feststellen:
- Machen Sie sich nicht so viele Gedanken darüber, wie Sie alle Randfälle finden können.
- Verbringen Sie keine Zeit mehr damit, Dummy-Daten zu generieren, Daten aus der Produktion zu sammeln oder die App in einer Instanz zu testen, um zu sehen, ob sie funktioniert.
- Reduzieren Sie den Zeitaufwand für die Fehlersuche und vermeiden Sie die Ausführung der Anwendung auf dem Cluster so weit wie möglich.
- Die Aufteilung von gigantischen Singletons in etwas, das der Geschäftsdomäne besser entspricht, wird sich von selbst ergeben.
- Furchtloses Refactoring, wenn nötig.
Für Manager
Wenn Sie Ihrem Team genügend Zeit zum Testen des Codes geben, können Sie das verhindern:
- Unerwartet steigende Cloud-Kosten.
- Burnout bei Dateningenieuren.
Bleiben Sie dran für Kapitel 2!
Code verfügbar bei 47 Degrees Open Source.
Unsere Ideen
Weitere Blogs

Welche intrinsische Motivation Ihre Kollegen antreibt mit Moving Motivators
Welche intrinsischen Motivationsfaktoren gibt es bei Ihren Mitarbeitern? Die Mitarbeiter sind der wichtigste Teil eines Unternehmens, und Manager...
Irene de Kok

Optimierung von AWS Step Functions: Einblicke vom Amsterdam Summit
Gestern nahm ich am AWS Summit 2025 in Amsterdam teil, wo ich an einer Sitzung über AWS Step Functions teilnahm, die von Adriaan de Jonge, einem...
Simon Karman
Contact

