Some years ago, when Reactive Streams still lived in utopia, we got the assignment to build a high-volume message broker. A considerable amount of the code we delivered back then was dedicated to preventing the broker from being flooded with messages whenever an endpoint became slow.
How would we solve this problem today, with the shiny new Akka Reactive Streams (experimental) implementation just within reach?
In this blog we explore Akka Streams in general and TCP Streams in particular. Moreover, we show how much easier we could have solved the challenge we faced back then using Streams.
A use-case for TCP Back Pressure
The high-volume message broker mentioned in the introduction basically did the following:
- Read messages (from syslog) from a TCP socket
- Parse the message
- Forward the message to another system via a TCP connection
For optimal throughput, multiple TCP connections were available, which allowed messages to be delivered to the endpoint system in parallel. The broker was supposed to handle about 4000 – 6000 messages per second. Below is a schema of the noteworthy components and the message flow:
Naturally we chose Akka as the framework to implement this application. Our approach was to have an Actor for every TCP connection to the endpoint system. An incoming message was then forwarded to one of these connection Actors.
The biggest challenge was related to back pressure: how could we prevent our connection Actors from being flooded with messages in case the endpoint system slowed down or became unavailable? At 6000 messages per second, an Actor’s mailbox fills up very quickly.
Another requirement was that message buffering had to be done by the client application, syslog, which has excellent facilities for that. Durable mailboxes or the like were out of the question. Therefore, we had to find a way to pull only as many messages into our broker as it could deliver to the endpoint. In other words: provide our own back pressure implementation.
A considerable amount of the code we delivered back then was dedicated to back pressure. During one of our recurring innovation days we tried to figure out how much easier the back pressure challenge would have been had Akka Streams been available.
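Stripped of all streaming machinery, the pull principle itself fits in a few lines of plain Scala. The sketch below is purely illustrative (PullSource is our own name, not an Akka class): the consumer explicitly requests a bounded number of elements, so the producer can never flood it.

```scala
// A minimal sketch (plain Scala, no Akka) of demand-driven pulling.
// The consumer signals demand with request(n); nothing is produced
// beyond what was asked for.
final class PullSource[A](it: Iterator[A]) {
  // request(n) is the "demand" signal: hand out at most n elements
  def request(n: Int): List[A] = it.take(n).toList
}

val source = new PullSource((1 to 1000000).iterator)
val firstBatch = source.request(10) // the consumer decides the pace
val nextBatch  = source.request(5)  // production resumes where demand left off
```

This is, in essence, what Reactive Streams formalize: downstream demand drives upstream production.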
Akka Streams in a nutshell
In case you are new to Akka Streams, here is some basic information to help you understand the rest of this blog.
A Reactive Stream consists of three core building blocks:
- A Source that produces some values
- A Flow that performs some transformation of the elements produced by a Source
- A Sink that consumes the transformed values of a Flow
Akka Streams provides a rich DSL through which transformation pipelines can be composed from these three building blocks.
A transformation pipeline executes asynchronously. For that to work it requires a so-called FlowMaterializer, which executes every step of the pipeline. A FlowMaterializer uses Actors for the pipeline’s execution, even though from a usage perspective you are unaware of that.
A basic transformation pipeline looks as follows:
[scala]
import akka.stream.scaladsl._
import akka.stream.FlowMaterializer
import akka.actor.ActorSystem
implicit val actorSystem = ActorSystem()
implicit val materializer = FlowMaterializer()
val numberReverserFlow: Flow[Int, String] = Flow[Int].map(_.toString.reverse)
numberReverserFlow.runWith(Source(100 to 200), ForeachSink(println))
[/scala]
We first create a Flow that consumes Ints and transforms them into reversed Strings. For the Flow to run, we call the runWith method with a Source and a Sink. After runWith is called, the pipeline starts executing asynchronously.
The exact same pipeline can be expressed in various ways, such as:
[scala]
// Use the via method on the Source to pass in the Flow
Source(100 to 200).via(numberReverserFlow).to(ForeachSink(println)).run()
// Directly call map on the Source.
// The disadvantage of this approach is that the transformation logic cannot be re-used.
Source(100 to 200).map(_.toString.reverse).to(ForeachSink(println)).run()
[/scala]
For more information about Akka Streams you might want to have a look at this Typesafe presentation.
A simple reverse proxy with Akka Streams
Let’s move back to our initial quest. The first task we set out to accomplish was to create a stream that accepts data from an incoming TCP connection and forwards it to a single outgoing TCP connection. In that sense this stream acts as a typical reverse proxy that simply forwards traffic to another connection. The only remarkable quality compared to a traditional blocking/synchronous solution is that our stream operates asynchronously while preserving back pressure.
[scala]
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.scaladsl.ForeachSink
implicit val system = ActorSystem("one-to-one-proxy")
implicit val materializer = FlowMaterializer()
val serverBinding = StreamTcp().bind(new InetSocketAddress("localhost", 6000))
val sink = ForeachSink[StreamTcp.IncomingConnection] { connection =>
  println(s"Client connected from: ${connection.remoteAddress}")
  connection.handleWith(StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 7000)).flow)
}
val materializedServer = serverBinding.connections.to(sink).run()
serverBinding.localAddress(materializedServer)
[/scala]
First we create the mandatory instances every Akka Reactive Stream requires: an ActorSystem and a FlowMaterializer. Then we create a server binding using the StreamTcp extension that listens for incoming traffic on localhost:6000. With the ForeachSink[StreamTcp.IncomingConnection] we define how to handle the incoming data of every StreamTcp.IncomingConnection by passing a flow of type Flow[ByteString, ByteString]. This flow consumes the ByteStrings of the IncomingConnection and produces a ByteString, which is the data sent back to the client.
In our case this Flow[ByteString, ByteString] is created by means of StreamTcp().outgoingConnection(endpointAddress).flow. It forwards each ByteString to the given endpointAddress (here localhost:7000) and returns the response as a ByteString as well. The same flow could also perform some data transformation, like parsing a message.
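Such a transformation could, for instance, cut the raw byte stream into newline-delimited syslog messages. As a plain-Scala sketch of that framing logic (independent of Akka; the LineFramer name is our own and not part of any API):

```scala
// Illustrative framer: collects incoming chunks and emits every complete
// newline-delimited message, buffering any trailing partial message.
final class LineFramer {
  private val pending = new StringBuilder
  // Push the next chunk; returns all complete lines received so far
  def push(chunk: String): Seq[String] = {
    pending.append(chunk)
    val parts = pending.toString.split("\n", -1) // -1 keeps the trailing remainder
    pending.setLength(0)
    pending.append(parts.last)                   // buffer the partial message
    parts.dropRight(1).toSeq
  }
}

val framer = new LineFramer
val first  = framer.push("<34>Oct 11 host msg1\n<34>Oct 11 host ms")
val second = framer.push("g2\n")
```

In a real pipeline this logic would live inside the flow, mapping ByteStrings to parsed messages chunk by chunk.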
Parallel reverse proxy with a Flow Graph
Forwarding messages from one connection to another does not meet our self-defined requirements. We need to be able to forward messages from a single incoming connection to a configurable number of outgoing connections.
Covering this use case is slightly more complex. To make it work, we use the flow graph DSL.
[scala]
import akka.util.ByteString
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlowGraphImplicits._
private def parallelFlow(numberOfConnections: Int): Flow[ByteString, ByteString] = {
  PartialFlowGraph { implicit builder =>
    val balance = Balance[ByteString]
    val merge = Merge[ByteString]
    UndefinedSource("in") ~> balance
    1 to numberOfConnections map { _ =>
      balance ~> StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 7000)).flow ~> merge
    }
    merge ~> UndefinedSink("out")
  } toFlow (UndefinedSource("in"), UndefinedSink("out"))
}
[/scala]
We construct a flow graph that makes use of the junction vertices Balance and Merge, which allow us to fan out a stream to several other streams. For every parallel connection we want to support, we create a fan-out branch: the Balance vertex, followed by an outgoing connection flow, which is then merged back via the Merge vertex.
From an API perspective we faced the challenge of how to connect this flow to our IncomingConnection. Almost all flow graph examples take a concrete Source and Sink implementation as their starting point, whereas the IncomingConnection exposes neither a Source nor a Sink: it only accepts a complete flow as input. Consequently, we needed a way to abstract over the Source and Sink of our fan-out flow.
The flow graph API offers the PartialFlowGraph class for that, which allows you to work with abstract Sources and Sinks (UndefinedSource and UndefinedSink). It took us quite some time to figure out how they work: simply declaring an UndefinedSource/UndefinedSink without a name won’t work. It is essential to give the UndefinedSource/UndefinedSink a name identical to the one used in the UndefinedSource/UndefinedSink passed to the toFlow method. A bit more documentation on this topic would help.
Once the fan-out flow is created, it can be passed to the handleWith method of the IncomingConnection:
[scala]
…
val sink = ForeachSink[StreamTcp.IncomingConnection] { connection =>
  println(s"Client connected from: ${connection.remoteAddress}")
  val parallelConnections = 20
  connection.handleWith(parallelFlow(parallelConnections))
}
…
[/scala]
As a result, this implementation delivers all incoming messages to the endpoint system in parallel while still preserving back-pressure. Mission completed!
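To get a feel for what the Balance vertex buys us, here is a plain-JVM sketch (deliberately without Akka; names and structure are ours): one bounded buffer per outgoing connection, fed round-robin. When every buffer is full, put() blocks the producer, and that blocking is exactly the back pressure.

```scala
import java.util.concurrent.ArrayBlockingQueue

// One bounded buffer per simulated outgoing connection
val numberOfConnections = 4
val buffers = Vector.fill(numberOfConnections)(new ArrayBlockingQueue[String](100))

// Round-robin dispatch, analogous to the Balance vertex
def dispatch(messages: Seq[String]): Unit =
  messages.zipWithIndex.foreach { case (msg, i) =>
    buffers(i % numberOfConnections).put(msg) // blocks while the buffer is full
  }

dispatch((1 to 200).map(n => s"msg-$n")) // 200 messages spread evenly over 4 buffers
```

Akka Streams gives the same effect declaratively, propagating the demand signal all the way back to the incoming TCP connection instead of blocking a thread.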
Testing the Application
To test our solution we wrote two helper applications:
- A blocking client that pumps as many messages as possible into a socket connection to the parallel reverse proxy
- A server that delays responses with a configurable latency in order to mimic a slow endpoint. The parallel reverse proxy forwards messages via one of its connections to this endpoint.
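The actual helper applications are not shown in this post, but the slow endpoint could be sketched with plain JVM sockets roughly as follows (startSlowEchoServer is a hypothetical name of our own):

```scala
import java.net.ServerSocket
import java.io.{BufferedReader, InputStreamReader, PrintWriter}

// Illustrative slow endpoint: echo every incoming line back after a
// configurable latency, mimicking a slow downstream system. Handles a
// single connection for brevity.
def startSlowEchoServer(server: ServerSocket, latencyMillis: Long): Thread = {
  val t = new Thread(() => {
    val client = server.accept()
    val in  = new BufferedReader(new InputStreamReader(client.getInputStream))
    val out = new PrintWriter(client.getOutputStream, true)
    var line = in.readLine()
    while (line != null) {
      Thread.sleep(latencyMillis) // the configurable latency
      out.println(line)
      line = in.readLine()
    }
    client.close()
  })
  t.setDaemon(true)
  t.start()
  t
}
```

Raising latencyMillis makes the endpoint slower, which is precisely the condition under which the proxy’s back pressure has to kick in.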
The following chart depicts how throughput increases with the number of connections. Due to nondeterministic concurrent behavior there are some spikes in the results, but the trend shows a clear correlation between throughput and the number of connections:
End-to-end solution
The end-to-end solution can be found here.
By changing the numberOfConnections variable you can see the impact on performance yourself.
Check it out! …and go with the flow 😉
Information about TCP back pressure with Akka Streams
At the time of writing there was not much information available about Akka Streams, as it is one of the newest toys of the Typesafe factory. Here are some valuable resources that helped us get started:
- Akka Stream Scaladoc