Blog

Scala-Akteure für das Unternehmen: Einführung in das Akka-Framework

Arjan Blokzijl

Aktualisiert Oktober 23, 2025
10 Minuten

Letzte Woche hatten ich und einige meiner Kollegen das Vergnügen, an einer hervorragenden Schulung von Jonas Bonér teilzunehmen. Das Thema war sein neues Lieblingsprojekt: das Akka-Framework. Vielleicht haben Sie in letzter Zeit mit Scala herumgespielt und auch die ersten Schritte zur Verwendung der Actor-Bibliothek unternommen. Einfach ausgedrückt, ist ein Actor eine Ausführungseinheit, die (in der Regel asynchron) Nachrichten verarbeitet und deren Zustand kapselt. Ein Actor legt seinen Zustand nicht offen und die Nachrichten werden sequentiell verarbeitet. Das Actor-Modell gibt es schon seit geraumer Zeit, aber die bekannteste Actor-Implementierung ist heute Erlang. Das Actor-Modell wurde von Philipp Haller in die Standardbibliothek von Scala implementiert (eine solide Referenz für den interessierten Leser ist z.B. dieser Artikel, in dem erklärt wird, wie Actors in Scala funktionieren). In den meisten in Scala geschriebenen Actor-Beispielen findet man nicht selten nur EchoActors, PingPongActors und FibonacciSolvingActors. Nette Beispiele, aber Sie werden sich vielleicht fragen, ob sie in Enterprise Scala überhaupt von Nutzen sind. Wenn Sie sich darüber hinaus für nebenläufige, nachrichtenübermittelnde Verarbeitungsmodelle, STMs, NoSQL-Datenspeicher interessieren und sich gelegentlich fragen, was die Zukunft des Enterprise Computing bringen könnte, dann ist Akka vielleicht genau das richtige Framework für Sie. Dieser Blog soll eine kurze Einführung in eine Funktion dieses Frameworks geben: Akka's Supervisor Actors. Er basiert größtenteils auf dem Wissen, das Jonas während der Schulung vermittelt hat, und ich hoffe, dass ich Ihnen Appetit darauf machen kann.

Zunächst einmal wird Akka wahrscheinlich das unterstützen, was Ray Racine als 'System of Service' bezeichnet hat. Ich zitiere ihn hier wörtlich aus der Scala Mailingliste:

Es ist nicht zu weit hergeholt zu sagen, dass kommerzielle Anbieter und die meisten Frameworks sehr System-of-Record-orientiert sind. Sie müssen eine neue Anwendung erstellen, um X zu verwalten. Installieren Sie eine RDB, einen Anwendungsserver, wählen Sie ein O/R-Mapping- und GUI-Form-Framework aus, setzen Sie Warmduscher vor Drag-and-Drop-IDEs und legen Sie los. Ein System Of Service muss 1.000e von Anfragen pro Sekunde in Millisekunden bedienen, 24/7/365 mit 99,99 % Zuverlässigkeit, z.B. ein Preisservice. Die Geschäftslogik muss in Mikrosekunden ablaufen. Ihr bevorzugtes O/R-Mapping-Framework hat noch nicht einmal einen JDBC-Aufruf initiiert, es hat noch nicht einmal eine Verbindung aus dem Pool zugewiesen und hat bereits die ihm zugewiesene Zeit von 1 ms ausgeschöpft. Ein Artikel kann im System Of Record einige 10s Preisänderungen pro Jahr erfahren, aber der Preis dieses Artikels kann im System Of Service 100.000 Mal pro Änderung bedient werden.

Mit anderen Worten: Ein System of Service ist ein System, das für einen extremen Durchsatz und fast keine Ausfallzeiten ausgelegt ist. Das ist vielleicht nicht Ihre alltägliche CRUD-Anwendung, aber wenn Sie sie bauen müssen, ist es gut zu wissen, dass es ein Framework gibt, das sie unterstützt. Als Antwort auf den Beitrag von Ray Racine hat Jonas Bonér seine Vision für das Akka-Framework veröffentlicht. Diese Beiträge bieten eine Menge hochinteressanten Lesestoff und sind hier zu finden - www.nabble.com/Akka-Actor-Kernel---Re:--scala--Lift-and-Goat-Rodeo-td24124685.html. Akka ist also als eine Art Framework gedacht, das eine System-of-Service-Anwendung unterstützt. Es hat viele Funktionen, zu viele, um sie alle auf einmal zu behandeln, und deshalb werden wir uns zunächst die Supervisor-Fähigkeiten von Akka ansehen. Akka verfügt über ein Actor-Modell mit Supervisor-Bäumen, das auf den OTP-Designprinzipien von Erlang basiert. Supervisors sind Prozesse, die eine Reihe von Workern überwachen, die die eigentliche Verarbeitung vornehmen. Die Grundidee eines Supervisors ist, dass er seine Kindprozesse am Leben erhält, indem er sie bei Bedarf neu startet. Ein Worker ist ein Actor, der eine Nachricht empfängt und verarbeitet. Wenn ein Worker auf abnormale Weise stirbt (indem er eine Exception auslöst), muss der Supervisor entscheiden, was zu tun ist. Er kann beschließen, den Worker neu zu starten und zu versuchen, die Nachricht erneut zu verarbeiten, wobei eine maximale Anzahl von Wiederholungsversuchen festgelegt wird. Der Ansatz besteht darin, damit zu rechnen, dass es zu Fehlern kommen kann und auch kommen wird. Anstatt zu versuchen, dies zu verhindern, besteht die Idee darin, es abstürzen zu lassen (TM Jonas Bonér), den Worker in einen stabilen Zustand zu versetzen und ihn neu zu starten. Diese Idee hat Ericsson bei der Entwicklung hochgradig fehlertoleranter, verteilter Message-Passing-Software gute Dienste geleistet. Mit Akka können wir nun auch in Scala von dieser Idee profitieren. Um dies zu erreichen, verfügt Akka über eine eigene Version der Actors-Bibliothek, die über mehr "System of Service"-Funktionen verfügt als die Standard-Actors-Bibliothek in der Scala-Distribution. Zur Veranschaulichung eines einfachen Beispiels einer Supervisor-Klasse finden Sie hier einige Codebeispiele, die die Idee veranschaulichen. Zunächst beginnen wir mit einem einfachen Actor: [scala] import se.scalablesolutions.akka.config.ScalaConfig. import se.scalablesolutions.akka.actor.{OneForOneStrategy, Actor} sealed trait Message case class Supervise(worker: Worker) extends Message case class DoWork(work: Work) extends Message case object Die extends Message class Worker(workerName:String) extends Actor { lifeCycleConfig = Some(LifeCycle(Permanent, 100)) def receive: PartialFunction[Any, Unit] = { case DoWork(work:String) => log.debug("Arbeit beginnen... bei: " + Arbeit) case Reset => log.info("%s wurde zurückgesetzt", toString) case Die => log.debug("Ich sterbe...") throw new RuntimeException("Ich bin tot: " + this.toString) case other => log.error("Unbekanntes Ereignis: %s", other) } override def preRestart(reason: AnyRef, config: Option[AnyRef]) { log.debug("pre-restarting " + this) } override def postRestart(reason: AnyRef, config: Option[AnyRef]) { log.debug("post-restarting " + this) } override def toString = "[" + workerName + "]" [/scala] Was die Übermittlung von Nachrichten betrifft, so folgt ein Akka-Akteur der gleichen API wie die 'normalen' Scala-Akteure: Sie können eine Nachricht an einen Akteur senden, indem Sie die Methode ! (sprich: bang) verwenden. Dies ist eine 'fire and forget'-Nachricht, die dann asynchron vom Akteur verarbeitet wird. Es gibt noch einige weitere Möglichkeiten, Nachrichten zu senden, aber die lassen wir jetzt mal außen vor. In Akka müssen Actors explizit durch den Aufruf von actor.start gestartet werden, damit sie Nachrichten verarbeiten können. Sie können durch den Aufruf von actor.stop gestoppt werden. Im Vergleich zu den Standard-Scala-Actors ist die API für die Verarbeitung von Nachrichten eingeschränkter: Es gibt genau eine Methode, die Sie überschreiben müssen. Dies ist die Methode receive, die eine PartialFunction zurückgeben muss. Die Empfangsmethode verwendet eine Musterübereinstimmung, um zwischen den verschiedenen Nachrichten zu unterscheiden, die sie verarbeiten kann. Eine Musterübereinstimmung wird zu einer PartialFunction kompiliert, die der Rückgabewert der Musterübereinstimmung ist. Um Nachrichten an Akteure zu senden, haben wir eine Eigenschaft Message definiert, die von verschiedenen Fallklassen, die die eigentlichen Nachrichten darstellen, erweitert wird. Im Prinzip können Sie jede beliebige Nachricht an einen Actor senden, in der Praxis sollten Sie jedoch nur unveränderliche Nachrichten senden. In Scala sind Case-Klassen und Case-Objekte standardmäßig unveränderlich und eignen sich daher sehr gut für diesen Zweck. In diesem Fall haben wir eine 'DoWork'-Nachricht definiert (alle Nachrichten entstammen der gleichen Nachrichteneigenschaft), die den Worker-Akteur veranlassen soll, etwas zu tun, das hoffentlich von Nutzen ist. Wenn der Worker eine 'Die'-Nachricht erhält, wird er eine Ausnahme auslösen. Das ist natürlich ein etwas albernes Beispiel, aber ich hoffe, es veranschaulicht die Idee. So weit nichts Besonderes, außer vielleicht die pre- und postRestart-Methoden und das Feld lifeCycleConfig = Some(LifeCycle(Permanent, 100)). Dies ist die Konfiguration für die Hierarchiefähigkeit der Supervisor-Akteure von Akka. Um zu sehen, wie das funktioniert, lassen Sie uns einen Supervisor programmieren: [scala] import se.scalablesolutions.akka.actor.OneForOneStrategy object WorkerSupervisor extends Actor { trapExit = true faultHandler = Some(OneForOneStrategy(3, 100)) def receive: PartialFunction[Any, Unit] = { case DoSupervise(worker:Worker) => log.info("Supervising worker: " + worker) startLink(worker) case unknown => log.error("Unbekanntes Ereignis: %s", unbekannt) [/scala] } } Unser Supervisor ist ebenfalls ein Akka-Actor (in diesem Fall ein Objekt, d.h. ein Singleton in Scala), der 'DoSupervise(worker)'-Meldungen verarbeitet. Er verarbeitet diese Nachricht, indem er beginnt, den Worker zu überwachen, indem er die Methode 'startLink(worker)' aufruft, die in der Klasse Actor definiert ist. Dadurch wird unser Worker zur Liste der verknüpften Actoren hinzugefügt und gleichzeitig wird der verknüpfte Actor gestartet. Das Schöne daran ist natürlich, dass die Actor-Klasse die Thread-Sicherheit für uns übernimmt, so dass wir uns hier keine Gedanken über Multithreading-Probleme machen müssen. Die Verknüpfung hält genau das, was sie verspricht: Wenn ein verknüpfter Actor (in unserem Fall der Worker) eine Ausnahme auslöst, wird der überwachende Actor (unser WorkerSupervisor) darüber informiert. Dazu ist es notwendig, die Variable trapExit zu überschreiben und auf true zu setzen. Er kann dann entscheiden, was auf der Grundlage der Fault-Handler-Strategie Strategy zu tun ist, die durch die Definition der Variable faultHandler festgelegt wird. Derzeit gibt es zwei Strategien: OneForOne, was bedeutet, dass nur der abgestürzte Akteur neu gestartet wird, und AllForOne, was bedeutet, dass alle verbundenen Akteure (einschließlich des abgestürzten) neu gestartet werden. Hier kommt auch die LifeCycleConfig ins Spiel, die auf unserem Worker-Actor definiert ist. LifeCycle(Permanent, 100) bedeutet, dass unser Actor einen permanenten LifeCycle hat und im Falle einer Ausnahme immer neu gestartet wird. Die Methoden preRestart und postRestart sind Callback-Methoden, die vom überwachten Akteur überschrieben werden können, um einige Initialisierungsarbeiten durchzuführen, wenn er neu gestartet wird. Beachten Sie, dass es sich bei unseren überwachten Akteuren wahrscheinlich um eine Art von Anwendungsdiensten handelt und nicht um unseren nutzlosen Worker-Akteur. Daher müssen sie möglicherweise einige nützliche Initialisierungsarbeiten durchführen, wie z.B. die Einrichtung von Ressourcen, Verbindungen usw., um ordnungsgemäß zu funktionieren. Anstatt einen Supervisor manuell zu programmieren, wie wir es getan haben, gibt es eine Alternative, indem wir eine Reihe von verknüpften Diensten auf eine deklarativere Weise einrichten, indem wir die SupervisorFactory-Klasse von Akka wie folgt erweitern: [scala] import se.scalablesolutions.akka.actor.SupervisorFactory import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, SupervisorConfig, RestartStrategy, Supervise, OneForOne, Permanent} class MySupervisorFactory extends SupervisorFactory { val worker = new Worker("worker-1") override protected def getSupervisorConfig: SupervisorConfig = { SupervisorConfig( RestartStrategy(OneForOne, 3, 10), Supervise( worker, LifeCycle(Permanent, 1000)) :: Nil) } } object factory extends MySupervisorFactory [/scala] Dies definiert eine Liste von überwachten Akteuren (in unserem Fall eine Liste mit einem Element, die unseren worker enthält) und verwendet dann den folgenden Code: [scala] val supervisor = factory.newSupervisor supervisor.startSupervisor [/scala] Damit werden alle Arbeiter mit einem (im Rahmen definierten) Supervisor verknüpft und auf der Grundlage der konfigurierten Strategie (neu) gestartet. Sie können also Ihre bevorzugte Art der Konfiguration wählen.Jetzt müssen wir nur noch sehen, ob es tatsächlich funktioniert. Dazu benötigen wir eine kleine Simulationsklasse, die wie folgt definiert ist: [scala] class Simulation { var worker:Worker = @Before def setUp = { worker = new Worker("worker-1") WorkerSupervisor ! DoSupervise(worker) } @After def tearDown = { WorkerSupervisor.stop } @Test def testSuperviseWorker = { Thread.sleep(500) println("n===> start working") worker ! DoWork("Some work") Thread.sleep(500) worker ! Stirb Thread.sleep(1000) worker ! DoWork("Etwas mehr Arbeit") println("n===> fertig") } } [/scala] Wenn Sie dies ausführen, erhalten Sie eine Ausgabe wie die folgende:

INF [20091020-19:28:25.242] akka: Überwachender Arbeiter: [worker-1]
DEB [20091020-19:28:25.242] akka: Verknüpfung von Akteur [[worker-1]] mit Akteur
[Actor[1256143591312:class akka.supervision.WorkerSupervisor$]]
===>  anfangen zu arbeiten
DEB [20091020-19:28:25.750] akka: Beginn der Arbeit... bei: Einige Arbeit
DEB [20091020-19:28:26.249] akka: Sterben...
java.lang.RuntimeException: Ich bin tot: [worker-1]
  at akka.supervision.Worker$$$anonfun$receive$1.apply(Worker.scala:48)
  at akka.supervision.Worker$$anonfun$receive$1.apply(Worker.scala:38)
  at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:38)
  at se.scalablesolutions.akka.actor.Actor$class.transactionalDispatch(Actor.scala:496)
  at se.scalablesolutions.akka.actor.Actor$class.invoke(Actor.scala:461)
  at akka.supervision.Worker.invoke(Worker.scala:33)
  at se.scalablesolutions.akka.actor.ActorMessageInvoker.invoke(Actor.scala:39)
  at se.scalablesolutions.akka.reactor.EventBasedThreadPoolDispatcher$$anon$1$$$anon$2.
  run(EventBasedThreadPoolDispatcher.scala:99)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:619)
DEB [20091020-19:28:26.260] labs: pre-restarting [worker-1]
INF [20091020-19:28:26.262] labs: Neustart des Akteurs [Klasse akka.supervision.Worker]
  als PERMANENT konfiguriert.
DEB [20091020-19:28:26.265] labs: Nach dem Neustart von [worker-1]
===>  fertig
DEB [20091020-19:28:27.251] labs: anfangen zu arbeiten... bei: Etwas mehr Arbeit
Prozess mit Exit Code 0 beendet

Das zeigt in der Tat, dass unsere rudimentären Supervisor funktionieren. Damit ist die Einführung in die Supervisor-Akteure von Akka abgeschlossen. Es gibt noch viel mehr, was die Akteure von Akka können, als hier gezeigt wurde (Remoting, Transaktionen, STM, Cassandra und Mongo Backend, um nur einige zu nennen), aber das ist Stoff für zukünftige Blogs. Es ist vielleicht ein bisschen viel, alles auf einmal zu nehmen, da man eine API lernen muss, die den meisten von uns unbekannt sein dürfte. Ich will nicht leugnen, dass das Akka-Framework eine gewisse Lernkurve hat, aber die Belohnung ist groß, wenn Sie den Dreh erst einmal raus haben.Akka ist relativ jung und wird noch weiterentwickelt. Wenn Sie sich für die zukünftige Entwicklung interessieren, sehen Sie sich die Roadmap für die Version 0.6 an.

Verfasst von

Arjan Blokzijl

Contact

Let’s discuss how we can support your journey.