Vor einiger Zeit habe ich ein kleines Python-Programm geschrieben, das auf die von meinem Energieversorger bereitgestellte API zugreift. Das Programm sammelt Daten über meinen Stromverbrauch, die es mir ermöglichen, ein Diagramm zu erstellen. Das funktioniert, aber dann dachte ich, ich könnte dieses Problem (Daten aus einer API lesen, in einer Datenbank speichern und Abfragen ausführen) nutzen, um etwas Scala zu üben. Ich werde einem Team beitreten, das Scala als Hauptsprache für Dienste verwendet. Nachdem ich Scala seit etwa einem Jahrzehnt nicht mehr verwendet habe, dachte ich mir, dass eine Auffrischung angebracht wäre.
Ich habe ein Starterprojekt von Typesafe verwendet, um loszulegen. Der Vorteil, das Problem in einer anderen Sprache gelöst zu haben, ist, dass Sie nicht über Algorithmen und Lösungen nachdenken müssen. Alles, was Sie tun müssen, ist Python in Scala zu übersetzen. Ganz einfach.
Was ich am Ende hatte, war mein Python-Programm mit Scala-Syntax, obwohl ich natürlich viele Änderungen vornehmen musste.
Eines der Dinge, die mich gestört haben, war, dass die API meines Energieversorgers wirklich langsam ist, genau wie die App, die sie anbieten(Rant, um den gestrigen Stromverbrauch herauszufinden, muss man zwei Adds schließen, dieselben, die ich seit über zwei Jahren geschlossen habe, dann auf ein Diagramm klicken, dann erhalte ich ein Diagramm nach Jahr, was ich nie will, also kann ich ein Diagramm nach Tag auswählen, dann wählen Sie gestern. Seufz. EndOfRant). Mein Python-Programm verwendet die gleiche API wie die Website, ist also im Grunde auch sehr langsam. Wenn Sie alle Daten seit 2016 laden möchten, müssen Sie sich auf eine lange Wartezeit einstellen. "Warum sollten Sie das mehr als einmal tun?", werden Sie sich fragen. Nun, weil Sie das Docker-Volume, auf dem die Daten gespeichert sind, in einem fehlgeleiteten Versuch, Ihren Laptop zu entrümpeln, entfernt haben (seufz, schon wieder).
Um die Dinge zu beschleunigen, brauchen wir Parallelität. Das könnte man in Python machen, aber als ich mich das letzte Mal damit befasst habe, hat es Betriebssystem-Threads verwendet, so dass Sie nicht schneller sein können als die Anzahl der Threads auf Ihrem Prozessor. Das wäre für meinen Anwendungsfall genau richtig, aber ich war inzwischen über die praktische Seite hinaus. Außerdem ging es mir darum, neue Dinge zu lernen, so dass die Praktikabilität ohnehin nur eine untergeordnete Rolle spielen würde.
Ich habe mit einer wörtlichen Übersetzung meines Python-Codes begonnen. Der Algorithmus sieht in etwa wie folgt aus:
- eine Reihe von Variablen initialisieren
- auf die Api zugreifen, um neue Daten zu erhalten
- die Daten umwandeln und speichern, eine Datensatzzählung aktualisieren
- Prüfen Sie, ob es noch mehr gibt, wenn ja, gehen Sie zu Schritt 2
Hier ist Version 1:
def updateImperativeVersion(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
logger.info("imperative update")
val token = pesReader.login()
var count = 0
var startOfInterval = startDate
val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
while (shouldIContinue(startOfInterval, endOfPeriod)) {
val newRecordCount = updateDataForInterval(startOfInterval, endOfPeriod, token)
count = count + newRecordCount
startOfInterval = startOfInterval.plusDays(14)
}
logger.info(s"updated $count records")
val updateResult = UpdateResult("200", count)
Future {
updateResult
}
}
Beachten Sie die beiden vars:
var count = 0
var startOfInterval = startDate
Natürlich immer ein Geruch. Selbst wenn wir es wollten, könnte diese Lösung wegen der Variablen nicht parallelisiert werden.
Lassen Sie uns weitermachen. Das Problem kann zumindest etwas funktioneller gelöst werden, etwa so:
def update(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
logger.info("sequential update")
val token = pesReader.login()
val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
val days = Days.daysBetween(startDate, endDate).getDays
val numberOfDaysUpdated = (0 to days by 14)
.to(LazyList)
.map(i => updateDataForInterval(startDate.plusDays(i), endOfPeriod, token))
.sum
logger.info(s"updated $numberOfDaysUpdated records")
val updateResult = UpdateResult("200", numberOfDaysUpdated)
Future {
updateResult
}
}
Keine Vars mehr. Und ein schönes und modernes
reading data from 2018-01-01T00:00:00.000+01:00 found 360 records
reading data from 2018-01-15T00:00:00.000+01:00 found 360 records
reading data from 2018-01-29T00:00:00.000+01:00 found 96 records
Es ist wirklich einfach, diese Lösung zu parallelisieren. Sie müssen nur ein einfaches .par an der richtigen Stelle hinzufügen:
import scala.collection.parallel.CollectionConverters.ImmutableIterableIsParallelizable
// ...
def updatePar(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
logger.info("par update")
val token = pesReader.login()
val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
val days = Days.daysBetween(startDate, endDate).getDays
val numberOfDaysUpdated =
(0 to days by 14)
.to(LazyList)
.par // <------- Magic
.map(i => updateDataForInterval(startDate.plusDays(i), endOfPeriod, token))
.sum
logger.info(s"updated $numberOfDaysUpdated records")
val updateResult = UpdateResult("200", numberOfDaysUpdated)
Future {
updateResult
}
}
Beachten Sie, dass der Unterschied zwischen dieser Lösung und der vorherigen nur eine Zeile mit .par und einer Import-Anweisung ist. Raffiniert.
Dieser Code startet Threads und die Anzahl sollte mit ForkJoinPool konfigurierbar sein, aber das übersteigt meine Fähigkeiten und ich hatte keine Lust, viel Zeit darauf zu verwenden, herauszufinden, wie das funktioniert. Denn es gibt eine noch raffiniertere Lösung.
Da ich mich für den Typesafe-Stack entschieden hatte, verwendete ich bereits Akka, also dachte ich, ich könnte auch Actors verwenden. Der folgende Code wurde durch das Streams-Kochbuch in den Akka-Dokumenten inspiriert.
def updateActors(startDate: DateTime, endDate: DateTime): Future[UpdateResult] = {
type Result = Int
logger.info("actor update")
val token = pesReader.login()
val endOfPeriod = minDate(endDate, DateTime.now().minusDays(1))
val days: Int = Days.daysBetween(startDate, endDate).getDays
val data = Source(0 to days by 14)
val worker = Flow[Int].map(i => {
updateDataForInterval(startDate.plusDays(i), endOfPeriod, token)
})
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
val merge = b.add(Merge[Out](workerCount))
for (_ <- 1 to workerCount) {
balancer ~> worker.async ~> merge
}
FlowShape(balancer.in, merge.out)
})
}
val processedJobs = data.via(balancer(worker, 3))
val updateCounts = Await.result(processedJobs.runWith(Sink.seq), scala.concurrent.duration.Duration(50, TimeUnit.SECONDS))
val updateResult = UpdateResult("200", updateCounts.sum)
Future {
updateResult
}
}
Es gibt noch eine ganze Menge Code, der größtenteils die Einrichtung von balancer beinhaltet, das die Worker konfiguriert. Es könnte sinnvoll sein, diesen Code an eine andere Stelle zu verschieben, damit er wiederverwendet werden kann und die Logik nicht zu sehr durcheinanderbringt.
Die wichtigsten Teile sind:
val data = Source(0 to days by 14)
val worker = Flow[Int].map(i => {
updateDataForInterval(startDate.plusDays(i), endOfPeriod, token)
})
Wir haben also eine Source von Arbeit, in diesem Fall ganze Zahlen, die über den Balancer an workerweitergegeben werden:
// v-- distribute work
val processedJobs = data.via(balancer(worker, 3)) // <-- with 3 workers
// ^-- source ^-- do the job
Der Code Awaitliefert das Ergebnis, das eine Seq der Aktualisierungszählungen ist. Da jeder Worker eine Zählung zurückgibt, erhalten wir eine Liste von Zählungen. Um die Gesamtzahl der aktualisierten Datensätze zu ermitteln, benötigen wir also
Die Ausgabe dieser neuesten Version sieht dann so aus:
[PESActorSystem-akka.actor.default-dispatcher-10] actor update
[PESActorSystem-akka.actor.default-dispatcher-16] reading data from 2019-01-01T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-13] reading data from 2019-01-15T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-17] reading data from 2019-01-29T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-17] found 360 records
[PESActorSystem-akka.actor.default-dispatcher-13] found 360 records
[PESActorSystem-akka.actor.default-dispatcher-16] found 360 records
[PESActorSystem-akka.actor.default-dispatcher-13] reading data from 2019-02-26T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-16] reading data from 2019-02-12T00:00:00.000+01:00
[PESActorSystem-akka.actor.default-dispatcher-13] found 96 records
[PESActorSystem-akka.actor.default-dispatcher-16] found 360 records
Beachten Sie, dass es drei Disponenten gibt, die Chargen bearbeiten: 13, 16 und 17 in diesem Fall. Das ist zu erwarten, da ich den Balancer so konfiguriert habe:
val processedJobs: Source[Result, NotUsed] = data.via(balancer(worker, 3))
also 3 Arbeiter.
Ich bin mit der endgültigen Lösung zufrieden, aber für den täglichen Gebrauch ist die Python-Version viel einfacher auf der Hardware, auf der sie laufen kann. Python für die reale Welt und Scala , warum nicht.
Referenzen
Hervorragende Einführung zum Thema Streams von Aniefiok Akpan
Scala Kochbuch-Code, der einen Pool von Arbeitern erklärt
Quellcode, siehe scala/src/main/scala/nl/vermeir/scala/controller/PESController.scala
Verfasst von

Jan Vermeir
Developing software and infrastructure in teams, doing whatever it takes to get stable, safe and efficient systems in production.
Unsere Ideen
Weitere Blogs
Contact




