Blog
Integrationstests von Spark-Anwendungen

Sie haben gerade die Apache Spark-basierte Anwendung fertiggestellt.
Sie haben spark-submit so oft ausgeführt, dass Sie wissen, dass die Anwendung genau wie erwartet funktioniert: Sie lädt die Eingabedateien, verarbeitet die Daten entsprechend der Spezifikation und speichert die Ergebnisse schließlich in einem permanenten Speicher wie HDFS oder AWS S3. Die Anwendung wird an den Scheduler übergeben und funktioniert!
Am nächsten Tag erhalten Sie einen Fehlerbericht - einer der Fälle funktioniert nicht wie erwartet. "Ah, ich weiß, lassen Sie mich das schnell beheben", sagen Sie und wenden eine Korrektur am Code an. Sind Sie sicher, dass die Änderung nicht die bestehende Logik beschädigt hat?
Im nächsten Monat beschließt das Betriebsteam, das brandneue Spark 3.0 zu installieren. Man fragt Sie, ob die Anwendung mit der aktualisierten Version kompatibel ist. "Das hoffe ich doch", sagen Sie. Dann führen Sie die Anwendung ein paar Mal aus und sie sieht gut aus. Sind Sie sicher, dass Sie alle Fälle abgedeckt haben?
Nach ein paar Wochen teilt Ihnen das Data Operations Team mit, dass sich das Schema der Eingabedaten leicht ändern wird. Man fragt Sie, ob die Anwendung damit umgehen kann. "Ja, ich denke schon, aber ich muss das überprüfen!", sagen Sie und führen erneut einige manuelle Tests mit gerade erstellten Beispieldaten durch. Die Anwendung scheint die Änderung gut zu verarbeiten. Sind Sie sicher, dass es in der Produktion funktioniert?
Unit-Tests sind nicht genug?
Das Schreiben einer Apache Spark-Anwendung unterscheidet sich nicht von der Erstellung jeder anderen Anwendung. Ein verantwortungsbewusster Entwickler sollte nicht nur den funktionierenden Code bereitstellen, sondern auch eine Reihe von Unit-Tests, die auf automatisierte Weise beweisen, dass die Implementierung richtig war. Unit-Tests sollten die kleinstmöglichen Codeeinheiten abdecken, wie UDFs oder DataFrames/DataSets API-Operationen auf Eingabedaten. Es ist einfach, Unit-Tests für beides zu erstellen, denn Sie können den Eingabe-Dataframe mocken, die Funktion ausführen und schließlich die Ausgabe mit dem lokalen Spark-Kontext überprüfen.
Was beim Testen mit Unit-Tests ziemlich schwierig ist, ist die Erstellung von Testfällen, die Fragen wie diese beantworten:
- Wird mein Code funktionieren, wenn die Eingabedaten fehlerhaft sind?
- Erkennt mein
spark.read-Aufruf die Partitionen korrekt? - Lädt die App die Daten aus Pfaden, in denen die Daten erwartet werden?
- Was passiert, wenn die Eingabedaten noch nicht vorhanden sind?
- de-dupliziert die App die Daten mit der Ausgabe des vorherigen Laufs auf die richtige Weise?
- Sind die erstellten Hive-Partitionen nach Beendigung der Anwendung lesbar?
- leitet der Code das Schema von CSV/JSON-Dateien korrekt ab?
- Ist die Anwendung idempotent? (erzeugen wiederholte Durchläufe die gleiche Ausgabe wie ein einmaliger Durchlauf?)
Eine Lösung für die oben genannten Herausforderungen besteht darin, sich nicht auf die Unit-Tests zu konzentrieren, sondern auf die "externen" Tests der Anwendung selbst. Dabei wird die Anwendung in der simulierten Umgebung ausgeführt und getestet, ob die Ergebnisse den Erwartungen des jeweiligen Testfalls entsprechen:
- richtigen Exit-Code oder Fehlermeldung,
- Erstellung der Daten im erwarteten Format mit dem erwarteten Inhalt,
- die Aktualisierung der Metadaten (Tabellen und Partitionen).
In den nächsten Abschnitten dieses Artikels werde ich diese Tests als Integrationstests bezeichnen, ähnlich wie wir früher die Tests der Webdienste bezeichnet haben, die den Eingabeaufruf durch den Client simulieren und überprüfen, wie sich der Zustand innerhalb des Dienstes ändert und welches Ergebnis an den Benutzer zurückgegeben wird.
Unser erster Spark-Integrationstest
Lassen Sie uns mit einem einfachen Beispiel beginnen. Stellen Sie sich vor, Sie müssen eine einfache ETL schreiben:
- seine Aufgabe ist es, die eingehenden Daten durch einfache Verknüpfung in täglichen Läufen anzureichern
- das Hauptformat der Datenquelle in Parquet, es ist nach Tagen unterteilt und enthält werbebezogene Ereignisse wie Ad-Impression oder Ad-Click eines Nutzers
- das zweite Datensatzformat ist JSON und enthält Details zu den Anzeigen
- es ist erforderlich, Ereignisse, die nicht zusammengefügt werden können, separat zu speichern (wenn die Metadaten der Anzeige nicht verfügbar sind)
Der Apache Spark-Code, der diese Logik implementieren würde, sieht wie folgt aus:
from pyspark.sql import SparkSession
from argparse import ArgumentParser
# parse arguments
parser = ArgumentParser()
parser.add_argument('--input-events', help='Events, parquet format')
parser.add_argument('--input-ads', help='Ads, JSON format')
parser.add_argument('--output-joined', help='Output location of enriched data')
parser.add_argument('--output-invalid', help='Invalid data')
parser.add_argument('--dt', help='Date partition indicator')
args = parser.parse_args()
# load the data
spark = SparkSession.builder.getOrCreate()
all_events = spark.read.parquet(args.input_events)
events = all_events.where(all_events.dt == args.dt)
ads = spark.read.json(args.input_ads)
# save the reults
events.join(ads, events.ad_id == ads.id) \
.write.parquet(f'{args.output_joined}/dt={args.dt}')
events.join(ads, events.ad_id == ads.id, 'leftanti') \
.write.parquet(f'{args.output_invalid}/dt={args.dt}')
Da der Auftrag in der Produktionsumgebung ausgeführt werden soll, wird er normalerweise mit Oozie oder Airflow geplant, so dass der Parameter dt dynamisch ist. Außerdem befinden sich die Pfade (2 Eingänge und 2 Ausgänge) auf HDFS, S3 oder anderen Speichersystemen. Bei Integrationstests wollen wir nicht, dass der Testprozess von einem bestimmten Zeitplan oder externen Speicherorten abhängt. Bei den üblichen Integrationstests für Webdienste müssten wir eine Art verteilten Speicher simulieren, um sicherzustellen, dass alle abhängigen Schnittstellen verfügbar sind. Glücklicherweise verfügt die Dateisystem-API von Hadoop über eine einfache Implementierung , die das lokale Dateisystem für Lese-/Schreiboperationen verwendet, so dass wir hier keinen zusätzlichen Aufwand betreiben müssen. Außerdem werden wir uns im Testfall auf ein Beispieldatum beschränken.
Das übliche Testszenario besteht aus den 3 Abschnitten:
- gegeben - einen Abschnitt, in dem Sie Mocks/Stubs/Samples vorbereiten, um eine simulierte, kontrollierte Testumgebung zu schaffen
- wenn - einen Abschnitt, in dem Sie Ihre Funktion/Anwendung mit den gegebenen Daten tatsächlich aufrufen
- dann - letzter Abschnitt, der vergleicht, ob die Ergebnisse von when den Erwartungen entsprechen.
In den nächsten Kapiteln werden wir alle diese 3 Abschnitte für ein einfaches Testszenario implementieren.
"Given" - Spotting von Eingabedateien
Jedes Mal, wenn die Anwendung startet, erwartet sie zwei Eingabedatensätze:
- Ereignisse - im Parkettformat, täglich unterteilt
- Anzeigen - im JSON-Format, ohne Partitionierung
Es gibt 2 Möglichkeiten, sie dem Testfall zur Verfügung zu stellen - durch die Erstellung von Beispielen (und deren Speicherung im Repository) oder durch deren Generierung zur Laufzeit. Die erste Methode spart zwar etwas Zeit, aber es ist nicht die beste Praxis, binäre Parkettdateien im Repository zu speichern. Außerdem ist diese Methode nicht sehr flexibel, wenn es um die Weiterentwicklung des Schemas geht (ein Entwickler muss einen neuen Satz von Testdateien erstellen). Stattdessen erstellen wir sie im Testfalllauf selbst.
import unittest
import shutil
import os
import json
from datetime import datetime
from pyspark.sql import SparkSession
class TestIntegration(unittest.TestCase):
INPUT_EVENTS = "/tmp/input_events"
INPUT_ADS = "/tmp/input_ads"
OUTPUT_JOINED = "/tmp/output_joined"
OUTPUT_INVALID = "/tmp/output_invalid"
def test_enrichment(self):
# given
self.add_event(
ts=datetime(2020, 3, 31, 13, 15),
user_id='USER1',
ad_id='AD1')
self.add_ad(
id='AD1',
name='Sample ad'
)
### TODO
def add_event(self, ts, user_id, ad_id):
self.spark.createDataFrame(
[(ts, user_id, ad_id)],
['ts', 'user_id', 'ad_id']) \
.write.parquet(f'{self.INPUT_EVENTS}/dt={ts.date()}', mode='append')
def add_ad(self, id, name):
with open(f'{self.INPUT_ADS}/sample.json', 'a+') as f:
json.dump({'id': id, 'name': name}, f)
f.write('\n')
def setUp(self):
for path in [self.INPUT_EVENTS, self.INPUT_ADS,
self.OUTPUT_JOINED, self.OUTPUT_INVALID]:
shutil.rmtree(path, True)
os.makedirs(path)
@classmethod
def setUpClass(cls):
cls.spark = SparkSession.builder.enableHiveSupport().getOrCreate()
@classmethod
def tearDownClass(cls):
cls.spark.stop()
Wie Sie sehen können, startet unser Testfall eine globale SparkSession und stoppt sie, wenn die Tests abgeschlossen sind. Die Funktion
"Wann" - Ausführen der Spark-Anwendung
Wenn wir die Mock-Daten erstellt haben, ist es an der Zeit, die ETL zu starten. Bei Unit-Tests würden wir einfach eine Funktion einer anderen Klasse aufrufen, aber hier simulieren wir den spark-submitLauf:
import subprocess
class TestIntegration(unittest.TestCase):
def test_enrichment(self):
# given
self.add_event(
ts=datetime(2020, 3, 31, 13, 15),
user_id='USER1',
ad_id='AD1')
self.add_ad(
id='AD1',
name='Sample ad'
)
# when
exit_code = self.run_app("2020-03-31")
### TODO
def run_app(self, date):
return subprocess.run(
[
"spark-submit",
"--conf","spark.sql.shuffle.partitions=1",
"job.py",
"--dt", date,
"--input-events", self.INPUT_EVENTS,
"--input-ads", self.INPUT_ADS,
"--output-joined", self.OUTPUT_JOINED,
"--output-invalid", self.OUTPUT_INVALID
],
stderr=subprocess.DEVNULL,
).returncode
Wie Sie sehen können, wird die Komplexität der Ausführung von spark-submit durch die Hilfsfunktion abgedeckt. Die Pfade sind global statisch und der einzige dynamische Parameter ist das Eingabedatum (das auch innerhalb eines Testfalls statisch sein kann, da es von einer bestimmten Funktionabhängt). Der Aufruf von spark-submit verwendet die Standardparameter mit Ausnahme der Anzahl der Shuffle-Partitionen, die auf 1 begrenzt ist, da wir nicht vorhaben, große Mengen zu verbinden und dies die Ausführung der Tests erheblich beschleunigt.
"Dann" - Validierung der Ausgabedaten
Wenn die resultierenden Daten schließlich in das lokale Dateisystem geschrieben werden, können wir unseren Spark-Kontext wiederverwenden, um zu überprüfen, ob die Geschäftslogik der ETL wie erwartet funktioniert.
class TestIntegration(unittest.TestCase):
def test_enrichment(self):
# given
self.add_event(
ts=datetime(2020, 3, 31, 13, 15),
user_id='USER1',
ad_id='AD1')
self.add_ad(
id='AD1',
name='Sample ad'
)
# when
exit_code = self.run_app("2020-03-31")
# then -> ETL succeeded
self.assertEqual(exit_code, 0)
# then -> verify one joined file
joined = self.spark.read.parquet(self.OUTPUT_JOINED)
self.assertEqual(joined.where(joined.dt == "2020-03-31").count(), 1)
record = joined.where(joined.dt == "2020-03-31").first()
self.assertEquals(record.ts, datetime(2020, 3, 31, 13, 15))
self.assertEquals(record.user_id, 'USER1')
self.assertEquals(record.name, 'Sample ad')
# then -> verify no invalid rows
invalid = self.spark.read.parquet(self.OUTPUT_INVALID)
self.assertEqual(invalid.where(invalid.dt == "2020-03-31").count(), 0)
Es ist sinnvoll, mit einer einfachen Behauptung zu beginnen, dass der Exit-Code der Anwendung 0 ist, was bedeutet, dass Spark die gesamte Anwendung erfolgreich ausgeführt hat. Später validieren wir die Ausgabedatensätze, einen nach dem anderen: Wir überprüfen die Struktur eines generierten Datensatzes im verbundenen Datensatz und kontrollieren, dass keine ungültigen Datensätze gespeichert wurden.
Produktionstaugliche Integrationstests
Die obigen Anleitungen beschreiben ein Testszenario, aber die Struktur erlaubt es uns, so viele Testfälle wie nötig zu schreiben. Um sicherzustellen, dass sie in jeder Umgebung, z.B. auf den Laptops der Entwickler oder in Continuous Integration-Systemen, auf die gleiche Weise funktionieren, empfiehlt es sich, Docker-Images zu verwenden. Für meine Testfälle verwende ich in der Regel bde2020/spark-base-Images - sie sind einfach, klein und benötigen keine zusätzliche Konfiguration zum Starten.
Zusammenfassung
Wie Sie sehen, ist das Schreiben von produktionsgerechten Integrationstests für Spark-Anwendungen keine Zauberei. Es ist eine einfache, 3-stufige Arbeit:
- Erstellen Sie Eingabedaten,
- Führen Sie die Anwendung aus,
- Überprüfen Sie die Ausgaben.
Es wäre möglich, Test Driven Development zu verwenden, aber meiner Erfahrung nach ist das nicht der einfachste Weg, Spark ETLs zu entwickeln. Oft besteht die Arbeit eines Dateningenieurs hauptsächlich in der Datenexploration, und es wäre ziemlich schwierig, sich vorzustellen, wie die Eingabedaten aussehen und wie man sie lädt. Wenn Sie jedoch mindestens einen Testfall haben, können Sie Fehler auf TDD-Art beheben - erstellen Sie Daten, die eine fehlerhafte Ausführung verursachen (in einem bestimmten Teil), legen Sie fest, wie sich die Anwendung verhalten soll (in einem anderen Teil), und sorgen Sie dafür, dass der Test erfolgreich ist, indem Sie den Anwendungscode korrigieren. Schließlich haben Sie einen fehlerfreien Code, ohne dass Sie ihn mit echten Daten ausführen müssen. Und es besteht kein Regressionsrisiko, da der Testfall immer vor den nächsten Versionen ausgeführt wird.
Integrationstests sind auch ein guter Ort für Beispiele, wie die Skripte ausgeführt werden und was die Eingabeparameter bedeuten. Außerdem ermöglichen sie es, die Eingabedaten zu ändern (Abschnitt "given") und zu prüfen, wie sich die Anwendung verhält, wenn das neue Eingabeschema angewendet wird oder ein neues Datenformat erscheint.
Einer der größten Nachteile von Integrationstests ist die Ausführungszeit. Je nach Komplexität kann die Ausführung eines Testfalls sogar mehr als 10 Sekunden dauern. Daher ist es optimal, mehr als ein Szenario in den Testfall einzubauen. In unserem Beispiel könnte es ein Eingabedatensatz mit einem gültigen und einem ungültigen Ereignis sein, um beide Ausgaben zu testen.
Die richtigen Tests hängen nicht von den Details der Implementierung ab. Das bedeutet, dass Sie, wenn Sie sich entscheiden, Ihre Anwendung von Python auf Scala oder sogar von Spark auf Flink umzuschreiben, immer noch dieselben Integrationstests verwenden können, um nachzuweisen, dass die Änderungen die Anforderungen nicht verletzen.
Verfasst von
Mariusz Strzelecki
Unsere Ideen
Weitere Blogs
Contact



