Blog
Wie man eine Spark-Anwendung in Scala einrichtet und strukturiert

In diesem Artikel werden folgende Themen besprochen:
- SBT-Einrichtung
- Implementierung eines Spark-Traits, der Befehlszeilenargumente richtig liest und eine Spark-Sitzung erstellt
- Testen von Spark-Aufträgen und -Funktionen
- CI/CD
Und warum?
Ich stelle immer wieder fest, dass Unternehmen und Mitarbeiter Schwierigkeiten haben, eine gute Spark-Anwendungsstruktur zu finden. Der Code ist voller input -> transform -> output erledigt.
In diesem Beitrag werden eine Struktur und einige bewährte Praktiken vorgeschlagen, die versuchen, diese Probleme zu lösen.
Erste Schritte
Um loszulegen und einen Blick auf die Projektstruktur zu werfen, klonen Sie das Repository auf github
SBT-Einrichtung
Die ersten Zeilen in build.sbt sind die wichtigsten Zeilen für das Bootstrapping Ihrer Anwendung. Sie enthalten den Projektnamen und die Spark-Abhängigkeiten.
Diese Zeilen definieren die name, version und organization Ihres Projekts und werden benötigt, um einen erfolgreichen Build in einen Binärspeicher hochzuladen, dazu später mehr. Wir verwenden scalaVersion 2.11.11 da dies die Version ist spark kompiliert wird, und zum Zeitpunkt der Erstellung dieses Artikels ist die letzte verfügbare Spark-Version 2.2.0.
Die Abhängigkeiten sorgen dafür, dass spark ist auf dem Klassenpfad für die Kompilierung verfügbar, aber der Geltungsbereich ist Provided da wir davon ausgehen, dass dort, wo wir unsere Anwendung einsetzen, bereits ein Spark-Cluster läuft.
Scopt ist eine sehr nützliche Bibliothek, die beim Lesen von Argumenten aus der Kommandozeile hilft. Wir werden dieses Paket in den Standardumfang Compile aufnehmen. Zuletzt, aber sehr wichtig, wird scalatest hinzugezogen, um das Schreiben geeigneter Unit-Tests und Integrationstests zu unterstützen. Da diese Bibliothek nur für Tests benötigt wird, ist der Anwendungsbereich auf it erkannt wird. Dies geschieht durch die 2 Zeilen:
Standardeinstellungen.itSettings faule val Wurzel = Projekt.in(Datei(".")).Konfigs(IntegrationTest)
Dies ermöglicht den Befehl
$ sbt it:test
um alle Integrationstests im Ordner src/it/
Die Linien
Test / testOptions += Tests.Argument("-oD") IntegrationTest / testOptions += Tests.Argument("-oD")
sind hilfreiche Hilfslinien, die die Zeitmessung für jeden Test ermöglichen, so dass sbt diese im Testbericht ausgeben kann.
Montage
Da wir die Anwendung auf einem Cluster bereitstellen, müssen alle Abhängigkeiten, die nicht auf dem Klassenpfad des Clusters verfügbar sind, in das Jar gepackt werden. Hierfür wird das sbt-assembly Plugin verwendet. Die Datei project/assembly.sbt besteht aus einer einzigen Zeile:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")
was den Bash-Befehl aktiviert.
$ sbt Montage
Dieser Befehl führt alle Ihre Tests aus und packt alle Compile scope-Bibliotheken in das jar, wodurch ein FAT jar erstellt wird.
Weitere Informationen finden Sie auf der github-Seite für sbt-assembly
Spark-Jobs
Der Hauptfunken trait ist src/main/scala/thw/vancann/SparkJob.scala. Er erfüllt im Wesentlichen 2 Aufgaben:
- Liest alle optionalen und erforderlichen Kommandozeilenargumente ein und analysiert sie in eine
case class - Starten Sie ein
SparkSession, initialisieren Sie einStorageObjekt und rufen Sie die Funktionrunauf.
Das Einzige, was aktuelle Jobs tun müssen, ist die Implementierung der Funktionen appName und run.
Scopt
src/main/scala/thw/vancann/UsageConfig.scala ist die Datei, die die Klasse UsageConfig case und den Parser enthält, der die Befehlszeilenargumente analysiert. Weitere Informationen über scopt finden Sie auf der Github-Seite.
Lagerung
Die src/main/scala/thw/vancann/storage/Storage.scala sollte alle E/A Ihrer Anwendung definieren. Es sollte keine versteckten Lese- und Schreibvorgänge geben, alles sollte über eine Stelle laufen, so dass eine Konsolidierung erreicht werden kann. Beachten Sie, dass wir bei dieser Struktur immer von der Verwendung von Datasetausgehen, die als richtige Fallklassen typisiert sind, wie es auch sein sollte.
Storage als Trait definiert ist, ist es trivial, eine andere Speicherimplementierung für verschiedene Anwendungsfälle oder beim Wechsel des Cloud-Anbieters einzuhängen, wodurch der Code etwas modularer wird.
Beispiel Job
Ein aktuelles Beispiel für einen Sparkjob finden Sie unter src/main/scala/thw/vancann/WordCount.scala. Beachten Sie, dass dieser Job die E/A von der eigentlichen Logik trennt.
Die Funktion run liest alle Quellen ein, die für diesen Auftrag benötigt werden. Diese Funktion sollte außer I/O KEINE LOGIK haben. Die Magie geschieht in der Funktion transform, die von Spark Transformers inspiriert ist. Sie nimmt ein oder mehrere Datasets sowie Hilfsargumente entgegen und sollte die Daten nach Bedarf transformieren. Die Rückgabe der resultierenden Dataset.
Alles, was gleich und niedriger als die Transformationsfunktion ist, sollte KEINE E/A durchführen und generell keine Seiteneffekte haben. Dies macht es trivial, alle Funktionen zu testen, ohne sich auf externe Datenquellen oder Stubbing-Datenbanken verlassen zu müssen. Ja, das erfordert Disziplin, aber es wird sich auszahlen!
Jeder Spark-Auftrag sollte nur an ein Ziel schreiben und in der Regel nur eine Sache tun, d.h. Daten umwandeln und/oder kombinieren, Daten von einer API abrufen, ein Modell erstellen (mit SparkML oder H2o), ein Modell anwenden oder Daten einlesen. Es ist in Ordnung - und in der Regel auch notwendig -, aus mehreren Quellen zu lesen.
Es ist besser, mehrere Aufträge zu haben, die kleinere Dinge erledigen, als einen großen Auftrag, der viele Dinge erledigt! Kleinere Aufträge sind einfacher zu testen, einfacher zu ändern und sicherlich einfacher zu debuggen.
Testen Sie
Zum Testen muss normalerweise für jeden Test eine Spark-Sitzung zur Verfügung stehen. Um dieselbe Sitzung wiederzuverwenden und nicht für jede Testklasse eine neue Sitzung zu starten, wird ein SharedSparkSession in src/test/scala/thw/vancann/SharedSparkSession.scala bereitgestellt. Wenn Sie dieses Singleton in jeden Test importieren, wird dieselbe Sitzung in allen Tests und Suiten wiederverwendet.
Ein Beispiel für einen solchen Test finden Sie unter src/test/scala/thw/vancann/WordCountTest.scala unter Verwendung von FlatSpec von scalatest als Teststyle bzw. Testbibliothek.
Alle E/A-Tests befinden sich im Allgemeinen im Ordner src/it, um die Unit-Tests von den Integrationstests zu trennen. Ein Integrationstest, der eine Datei liest, wird in src/it/scala/thw/vancann/WordCountTest.scala bereitgestellt. Beachten Sie noch einmal die Nützlichkeit der Storage Eigenschaft, da wir einfach eine LocalStorage implementieren können, um aus dem Ordner resources zu lesen, so dass kein Mocking / Stubbing erforderlich ist.
Kontinuierliche Integration (CI) und kontinuierliche Bereitstellung (CD)
Um sicherzustellen, dass jeder Push zu einem Zweig keinen Code oder Styleguides bricht, ist Continuous Ingegration (CI) eine gute Möglichkeit, jeden Übeltäter zu erwischen und die von Ihrem Team getroffenen Vereinbarungen einzuhalten.
Es gibt eine Möglichkeit, CI in Gitlab auszuführen, die es Ihnen erlaubt, CI und CD kostenlos auszuführen, solange Sie Ihren eigenen Runner konfigurieren, was ziemlich einfach und sehr gut dokumentiert ist unter https://docs.gitlab.com/ee/ci/runners/README.html.
Die Datei .gitlab-ci.yml beschreibt, wie Sie drei Phasen in einem unberührten Docker-Container ausführen.
(1) Tests und Codeabdeckung
Führen Sie sowohl die Unit-Tests als auch die Integrationstests aus und messen Sie die Codeabdeckung mit scoverage, das als Plugin in project/plugins.sbt hinzugefügt wurde.
coverageExcludedPackages := ";.*Storage.*" DeckungMinimum := 70 coverageFailOnMinimum := true
in build.sbt definieren einige Einstellungen für die Überdeckung. Diese speziellen Einstellungen legen einen Mindestabdeckungsgrad 70 fest und lassen den Build fehlschlagen, wenn coveragereport feststellt, dass die Abdeckung unter diesen Wert fällt.
(2) Skalastil
Prüfen Sie auf vordefinierte Stilverletzungen, die in scalastyle-config.xml definiert sind. Weitere Informationen finden Sie unter scalastyle.
Die Zeilen
scalastyleFailOnWarning := false scalastyleFailOnError := true
in build.sbt definieren, ob und wann scalastyle die Erstellung fehlschlagen soll.
(3) Erzeugen Sie scaladocs und veröffentlichen Sie das Projekt
Für diesen letzten Schritt müssen Sie einige zusätzliche Parameter in build.sbt auskommentieren. Schauen Sie in der sbt-Dokumentation nach, was diese Zeilen bewirken.
Beachten Sie, dass das Starten jedes einzelnen Docker-Containers für jede Stufe ziemlich langsam ist, da alle sbt Abhängigkeiten herangezogen werden müssen. Eine etwas fortschrittlichere und deutlich schnellere Lösung ist das Vorpacken eines Docker-Images, das alle in build.sbt definierten Abhängigkeiten im Repository-Ordner des Docker-Images .ivy2 enthält.
Letzte Notizen
Da haben Sie es. Ein Projekt zum Booten Ihrer Spark-Anwendung! Sie können es nach Belieben ändern, hinzufügen oder entfernen, aber beachten Sie die vorgeschlagenen Best Practices:
- Gesonderte Bedenken
- Keine unerwünschten Nebenwirkungen
- Ein Spark-Auftrag sollte in der Regel nur eine Ausgabe haben, um den Auftrag auf eine einzige Aufgabe zu beschränken.
- Denken Sie modular!
Möchten Sie Ihr Wissen über Apache Spark erweitern?
Nehmen Sie an dem dreitägigen Schulungskurs Data Science with Spark teil. In diesem Kurs lernen Sie alles, was Sie wissen müssen, um mit Apache Spark mühelos Data Science in großem Maßstab durchzuführen!
Unsere Ideen
Weitere Blogs
Contact



