Anyone who was written code that has to coordinate complex asynchronous workflows knows it can be a real pain, especially when you limit yourself to using only callbacks directly. Various tools have arisen to tackle these issues, like Reactive Extensions and Javascript promises.
Clojure‘s answer comes in the form of core.async: An implementation of CSP for both Clojure and Clojurescript. In this post I want to demonstrate how powerful core.async is under a variety of circumstances. The context will be writing a Vert.x event-handler.
Vert.x is a young, light-weight, polyglot, high-performance, event-driven application platform on top of the JVM. It has an actor-like concurrency model, where the coarse-grained actors (called verticles) can communicate over a distributed event bus. Although Vert.x is still quite young, it’s sure to grow as a big player in the future of the reactive web.
Scenarios
The scenario is as follows. Our verticle registers a handler on some address and depends on 3 other verticles.
1. Composition
Imagine the new Mars rover got stuck against some Mars rock and we need to send it instructions to destroy the rock with its inbuilt laser. Also imagine that the controlling software is written with Vert.x. There is a single verticle responsible for handling the necessary steps:
- Use the sensor to locate the position of the rock
- Use the position to scan hardness of the rock
- Use the hardness to calibrate and fire the laser. Report back status
- Report success or failure to the main caller
As you can see, in each step we need the result of the previous step, meaning composition.
A straightforward callback-based approach would look something like this:
[clojure]
(ns example.verticle
(:require [vertx.eventbus :as eb]))
(eb/on-message
"console.laser"
(fn [instructions]
(let [reply-msg eb/*current-message*]
(eb/send "rover.scope" (scope-msg instructions)
(fn [coords]
(eb/send "rover.sensor" (sensor-msg coords)
(fn [data]
(let [power (calibrate-laser data)]
(eb/send "rover.laser" (laser-msg power)
(fn [status]
(eb/reply* reply-msg (parse-status status))))))))))))
[/clojure]
A code structure quite typical of composed async functions. Now let’s bring in core.async:
[clojure]
(ns example.verticle
(:refer-clojure :exclude [send])
(:require [ vertx.eventbus :as eb]
[ clojure.core.async :refer [go chan put! <!]]))
(defn send [addr msg]
(let [ch (chan 1)]
(eb/send addr msg #(put! ch %))
ch))
(eb/on-message
"console.laser"
(fn [instructions]
(go (let [coords (<! (send "rover.scope" (scope-msg instructions)))
data (<! (send "rover.sensor" (sensor-msg coords)))
power (calibrate-laser data)
status (<! (send "rover.laser" (laser-msg power)))]
(eb/reply (parse-status status))))))
[/clojure]
We created our own reusable send function which returns a channel on which the result of eb/send will be put. Apart from the <!'s inside of the 'go' expression, this reads as sequential code.
2. Concurrent requests
Another thing we might want to do is query different handlers concurrently. Although we can use composition, this is not very performant as we do not need to wait for reply from service-A in order to call service-B.
As a concrete example, imagine we need to collect atmospheric data about some geographical area in order to make a weather forecast. The data will include the temperature, humidity and wind speed which are requested from three different independent services. Once all three asynchronous requests return, we can create a forecast and reply to the main caller. But how do we know when the last callback is fired? We need to keep some memory (mutable state) which is updated when each of the callback fires and process the data when the last one returns.
core.async easily accommodates this scenario without adding extra mutable state for coordinations inside your handlers. The state is contained in the channel.
[clojure]
(eb/on-message
"forecast.report"
(fn [coords]
(let [ch (chan 3)]
(eb/send "temperature.service" coords #(put! ch {:temperature %}))
(eb/send "humidity.service" coords #(put! ch {:humidity %}))
(eb/send "wind-speed.service" coords #(put! ch {:wind-speed %}))
(go (let [data (merge (<! ch) (<! ch) (<! ch))
forecast (create-forecast data)]
(eb/reply forecast))))))
[/clojure]
3. Fastest response
Sometimes there are multiple services at your disposal providing similar functionality and you just want the fastest one. With just a small adjustment, we can make the previous code work for this scenario as well.
[clojure]
(eb/on-message
"server.request"
(fn [msg]
(let [ch (chan 3)]
(eb/send "service-A" msg #(put! ch %))
(eb/send "service-B" msg #(put! ch %))
(eb/send "service-C" msg #(put! ch %))
(go (eb/reply (<! ch))))))
[/clojure]
We just take the first result on the channel and ignore the other results. After the go block has replied, there are no more takers on the channel. The results from the services that were too late are still put on the channel, but after the request finished, there are no more references to it and the channel with the results can be garbage-collected.
4. Handling timeouts and choice with alts!
We can create timeout channels that close themselves after a specified amount of time. Closed channels can not be written to anymore, but any messages in the buffer can still be read. After that, every read will return nil.
One thing core.async provides that most other tools don’t is choice. From the examples:
One killer feature for channels over queues is the ability to wait on many channels at the same time (like a socket select). This is done with `alts!!` (ordinary threads) or `alts!` in go blocks.
This, combined with timeout channels gives the ability to wait on a channel up a maximum amount of time before giving up. By adjusting example 2 a bit:
[clojure]
(eb/on-message
"forecast.report"
(fn [coords]
(let [ch (chan)
t-ch (timeout 3000)]
(eb/send "temperature.service" coords #(put! ch {:temperature %}))
(eb/send "humidity.service" coords #(put! ch {:humidity %}))
(eb/send "wind-speed.service" coords #(put! ch {:wind-speed %}))
(go-loop [n 3 data {}]
(if (pos? n)
(if-some [result (alts! [ch t-ch])]
(recur (dec n) (merge data result))
(eb/fail 408 "Request timed out"))
(eb/reply (create-forecast data)))))))
[/clojure]
This will do the same thing as before, but we will wait a total of 3s for the requests to finish, otherwise we reply with a timeout failure. Notice that we did not put the timeout parameter in the vert.x API call of eb/send. Having a first-class timeout channel allows us to coordinate these timeouts more more easily than adding timeout parameters and failure-callbacks.
Wrapping up
The above scenarios are clearly simplified to focus on the different workflows, but they should give you an idea on how to start using it in Vert.x.
Some questions that have arisen for me is whether core.async can play nicely with Vert.x, which was the original motivation for this blog post. Verticles are single-threaded by design, while core.async introduces background threads to dispatch go-blocks or state machine callbacks. Since the dispatched go-blocks carry the correct message context the functions eb/send, eb/reply, etc.. can be called from these go blocks and all goes well.
There is of course a lot more to core.async than is shown here. But that is a story for another blog.