Last week me and some of my colleagues had the pleasure of being on the receiving end of an excellent training given by Jonas Bonér. The topic was his new pet project: the Akka framework. Perhaps you’ve played around with Scala lately, and also have taken the first steps in using its Actor library. Simply stated, an Actor is a unit of execution that (usually asynchronously) processes messages and encapsulate their state. An actor does not expose its state, and messages are processed sequentially. The Actor model has been around for quite some time, but today the best-known Actor implementation is Erlang.
The Actor model has been implemented in the standard Scala library by Philipp Haller (for the interested reader, a solid reference is for instance this article explaining how actors in Scala work). In most Actor examples written in Scala, it is not uncommon to find only EchoActors, PingPongActors, and FibonacciSolvingActors. Nice examples, but perhaps you might wonder if they are of any use in enterprise Scala at all. Next to this, if you’re interested in concurrent, message passing processing models, STM’s, NoSQL data stores, and occasionally wonder what the future in enterprise computing might bring, than Akka might be just the framework you’re looking for. This blog is intended to provide a brief introduction into one feature of this framework: Akka’s supervisor Actors. It is mostly based on the knowledge extracted from Jonas during the training, and I hope to whet your appetite for it.
First of all, Akka is likely to support what Ray Racine has coined to be a ‘System of Service’. I’m quoting him here literally from the scala mailing list:
Its not too far of stretch to say commercial vendors, and most frameworks are very System Of Record oriented. Need to build a new app to maintain X.
Install RDB, app server, select O/R mapping and GUI form framework, place warm bodies in front of drag and drop IDEs and go for it.
A System Of Service must service 1,000s of requests per second in millisecs, 24/7/365 with 99.99 % reliability e.g. a pricing service. The business
logic has to run in microseconds. Your favorite O/R mapping framework hasn’t even initiated a JDBC call, heck hasn’t even allocated a connection from the pool and its already exhausted its 1 ms in allotted time.
An item may undergo a few 10s of price changes a year on the System Of Record, yet that item’s price may be served 100,000 times for each change on the System Of Service’.
In other words a System of Service is a system designed to have extreme throughput, and almost zero downtime. It may not be your everyday CRUD application, but if you need to build them, it is nice to know of some framework that supports it. In response to Ray Racine’s post Jonas Bonér posted his vision for the Akka framework. These posts provide plenty of highly interesting reading material, and can be found here – www.nabble.com/Akka-Actor-Kernel—Re:–scala–Lift-and-Goat-Rodeo-td24124685.html.
Akka is therefore meant to be a kind of framework that supports a System Of Service application. It has many features, too much to cover all at once, and therefore we will start of with having a look at Akka’s actors supervisor capabilities.
Akka has an Actor model with supervision trees, based on Erlang’s OTP design principles. Supervisors are processes that monitor a set of workers, that do the actual processing. The basic idea of a supervisor is that it should keep its child processes alive by restarting them when necessary. A worker is an Actor, that receives and processes a message. When a worker dies abnormally (by throwing an Exception), it is up to the supervisor to determine what to do. It may decide to restart the worker, and try to process the message again, with a maximum number of retries set. The approach is to expect that failures can and eventually will happen. Instead of trying to prevent this, the idea is to Let it crash (TM Jonas Bonér), reset the worker to a stable state, restart it. This idea has served Ericsson well in building highly fault tolerant, distributed message passing software. We can now profit from this idea in Scala as well via Akka.
In order to achieve this, Akka has its own version of the Actors library, that have more ‘system of service’ features than the standard Actors library found in the Scala distribution. To demonstrate a simple example of a supervisor class, here’s some sample code to demonstrate the idea. First, we start of with a simple Actor:
[scala]
import se.scalablesolutions.akka.config.ScalaConfig.
import se.scalablesolutions.akka.actor.{OneForOneStrategy, Actor}
sealed trait Message
case class Supervise(worker: Worker) extends Message
case class DoWork(work: Work) extends Message
case object Die extends Message
class Worker(workerName:String) extends Actor {
lifeCycleConfig = Some(LifeCycle(Permanent, 100))
def receive: PartialFunction[Any, Unit] = {
case DoWork(work:String) =>
log.debug("start working… at: " + work)
case Reset =>
log.info("%s has been reset", toString)
case Die =>
log.debug("Dying…")
throw new RuntimeException("I’m dead: " + this.toString)
case other =>
log.error("Unknown event: %s", other)
}
override def preRestart(reason: AnyRef, config: Option[AnyRef]) {
log.debug("pre-restarting " + this)
}
override def postRestart(reason: AnyRef, config: Option[AnyRef]) {
log.debug("post-restarting " + this)
}
override def toString = "[" + workerName + "]"
[/scala]
As regards to sending message passing, an Akka actor follows the same API as the ‘normal’ Scala ones: you can send a message to an actor using the ! (pronounced bang) method. This is a ‘fire and forget’ message, which is then processed asynchronously by the actor. There are some more options to send messages, but we’ll skip these for now. In Akka, Actors have to be started explicitly by calling actor.start for it to be able to process any messages. It can be stopped by calling actor.stop.
Compared to the standard Scala actors, the API for processing messages is more limited: there is exactly one method that you must override. This is the receive method, which must return a PartialFunction. The receive method uses a pattern match to distinguish between various messages it can process. A pattern match is compiled down to a PartialFunction, which is the return value of the pattern match.
To send messages to actors, we’ve defined a Message trait, which various case classes that represent the actual messages extend. In principle, you can send any message to an Actor, however, in practice however, the only messages you should send should be immutable. In Scala, case classes and case objects are immutable by default, and thus are a natural fit for this. In this case, we’ve defined a ‘DoWork’ message (all messages extends from the same Message trait), which should set the Worker actor to do something that hopefully will be be of some use. When the worker receives a Die message, it will throw an exception. This is a somewhat silly example of course, but hopefully it demonstrates the idea.
So far, nothing special, apart perhaps the pre- and postRestart methods, and the field lifeCycleConfig = Some(LifeCycle(Permanent, 100)). This is configuration for Akka’s actors supervisor hierarchy capabilities. To see further how this works, let’s code up a supervisor:
[scala]
import se.scalablesolutions.akka.actor.OneForOneStrategy
object WorkerSupervisor extends Actor {
trapExit = true
faultHandler = Some(OneForOneStrategy(3, 100))
def receive: PartialFunction[Any, Unit] = {
case DoSupervise(worker:Worker) =>
log.info("Supervising worker: " + worker)
startLink(worker)
case unknown =>
log.error("Unknown event: %s", unknown)
}
}
[/scala]
Our supervisor is also an Akka actor (in this case, it is an object, i.e. a singleton in Scala), that processes ‘DoSupervise(worker)’ messages. It handles this message by starting to supervise the worker by the invoking the ‘startLink(worker)’ method, which is defined in the Actor class. This will add our worker to the list of its linked actors, and also handles starts the linked Actor at the same time. Of cours the nice thing is that the Actor class will handle thread safety for us, so we don’t need to worry about any multithreading issues here.
The linking achieves exactly what it promises: if a linked Actor (the worker in our case) throws an exception, the supervising actor (our WorkerSupervisor) will be notified of this. For this, it is necessary to override the variable trapExit, and set it to true. It may then decide what to do based on the fault handler strategy Strategy that is defined by defining the faultHandler variable. There are currently two strategies: OneForOne, meaning that only the actor that has crashed will be restarted, and AllForOne, meaning all linked actors (including the crashed one) will be restarted. This is also where the lifeCycleConfig defined on our worker Actor comes in. LifeCycle(Permanent, 100) means our actor has a permanent LifeCycle and will always be restarted in case of an exception. The preRestart and postRestart are callback methods that may be overridden by the supervised actor to do some initialization work when it is restarted. Note that our supervised actors will probably be some kind of application services, instead of our useless Worker actor, and therefore may really need to do some usefull initialization work like setting up resources, connections, etc, in order to operate properly.
Instead of manually coding a Supervisor like we’ve done, there’s an alternative in setting up a set of linked services in a more declarative way by extending Akka’s SupervisorFactory class, as follows:
[scala]
import se.scalablesolutions.akka.actor.SupervisorFactory
import se.scalablesolutions.akka.config.ScalaConfig.{LifeCycle, SupervisorConfig, RestartStrategy, Supervise, OneForOne, Permanent}
class MySupervisorFactory extends SupervisorFactory {
val worker = new Worker("worker-1")
override protected def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 10),
Supervise(
worker,
LifeCycle(Permanent, 1000)) :: Nil)
}
}
object factory extends MySupervisorFactory
[/scala]
This defines a list of supervised actors (in our case, a one element list containing our worker) and then use the following code:
[scala]
val supervisor = factory.newSupervisor
supervisor.startSupervisor
[/scala]
which will link all workers to one supervisor (defined within the framework) and (re)start them based on the configured strategy. You can therefore pick your preferred way of configuring.
All that remains is to see whether it actually works. For this we need a small simulation class, defined as follows:
[scala]
class Simulation {
var worker:Worker =
@Before
def setUp = {
worker = new Worker("worker-1")
WorkerSupervisor ! DoSupervise(worker)
}
@After
def tearDown = {
WorkerSupervisor.stop
}
@Test
def testSuperviseWorker = {
Thread.sleep(500)
println("\n===> start working")
worker ! DoWork("Some work")
Thread.sleep(500)
worker ! Die
Thread.sleep(1000)
worker ! DoWork("Some more work")
println("\n===> finished")
}
}
[/scala]
Running this shows output like the following:
INF [20091020-19:28:25.242] akka: Supervising worker: [worker-1] DEB [20091020-19:28:25.242] akka: Linking actor [[worker-1]] to actor [Actor[1256143591312:class akka.supervision.WorkerSupervisor$]] ===> start working DEB [20091020-19:28:25.750] akka: start working... at: Some work DEB [20091020-19:28:26.249] akka: Dying... java.lang.RuntimeException: I'm dead: [worker-1] at akka.supervision.Worker$$anonfun$receive$1.apply(Worker.scala:48) at akka.supervision.Worker$$anonfun$receive$1.apply(Worker.scala:38) at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:38) at se.scalablesolutions.akka.actor.Actor$class.transactionalDispatch(Actor.scala:496) at se.scalablesolutions.akka.actor.Actor$class.invoke(Actor.scala:461) at akka.supervision.Worker.invoke(Worker.scala:33) at se.scalablesolutions.akka.actor.ActorMessageInvoker.invoke(Actor.scala:39) at se.scalablesolutions.akka.reactor.EventBasedThreadPoolDispatcher$$anon$1$$anon$2. run(EventBasedThreadPoolDispatcher.scala:99) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) DEB [20091020-19:28:26.260] labs: pre-restarting [worker-1] INF [20091020-19:28:26.262] labs: Restarting actor [class akka.supervision.Worker] configured as PERMANENT. DEB [20091020-19:28:26.265] labs: post-restarting [worker-1] ===> finished DEB [20091020-19:28:27.251] labs: start working... at: Some more work Process finished with exit code 0
Which indeed shows that our rudimentary supervisors work.
This concludes the introduction into Akka’s supervising actors. There is much more that Akka’s actors are capable of than shown here (remoting, transactions, STM, cassandra and mongo backend, to name just a few), but those are materials for future blogs. It may be perhaps a bit much to take all at once, as it requires to learn an API that is likely to be unknown to most of us. I won’t deny that the Akka framework certainly has some learning curve, but the reward is great once you get the hang of it.
Akka is relatively young and still evolving. If you’re interested in its future directions, check out the roadmap for the 0.6 release.