Blog

How to use Akka Streams or Reactor with Vert.X

13 Nov, 2019
Xebia Background Header Wave

With a few lines of code you can process Vert.X streams using any Reactive Streams implementation, while preserving backpressure. The Reactive Streams helpers can convert Vert.X streams to the Publishers and Subscribers of Reactive Streams. The API is a bit confusing at first, this article will clear it up.
streaming river

Why not use RxJava?

There is builtin support for RxJava which can extends the Vert.X core API. The RxJava helpers convert Vert.X streams to RxJava observable. You might want to use Reactor because it’s built by Pivotal or prefer the Akka streams API. Check out this comparison between Reactor and Rxjava or this comparison between Reactor and Akka Streams.
But even if you are going to stick to RxJava it’s good to know Vert.X is compatible with any Reactive Streams implementation.

How to use the Reactive Streams helpers

Vert.X provides a ReactiveWriteStream and a ReactiveReadStream. They can be seen as pipes to convert between the Reactive Streams and Vert.X streams world. You can write to a ReactiveWriteStream using a Vert.X ReadStreams and read from it using Reactive Streams because it implements the Publisher interface. The ReactiveReadStream is Vert.X ReadStream that implements the Subscriber interface. The naming of these helpers makes sense from the Vert.X side, however from the Reactive Streams side you are reading from a WriteStream and writing to a ReadStream. The diagram below might make it more clear.

Diagram showing Reactor and Vert.X

Events from a Vert.X Readstream are piped through Reactor to a Vert.X Writestream

Using this knowledge you can create your own Reactive Streams helpers. The example below allows processing of Vert.X streams by passing them through Reactor. The processing itself takes place in the processor function. A Flux of the input is passed as parameter and a Flux of the output it returned.
[gist id=091849b8bb8d935d7343416f52d72a3e]

Next steps

If you want to take this a step further and want to extend the Vert.X Core APIs with a different Reactive Streams implementation you could have a look a how the Vert.X implementation for RxJava works. Most of the extension methods are automatically generated. It’s good that the RxJava extension methods are not baked into the Vert.X core. That makes me confident you could make it work with RxJava alternatives.

Wrap up

The interoperability that comes from the Reactive Streams specification ensures you can mix and match implementations. Even if you are not switching yet, it’s good to know that it is possible in the future.

Questions?

Get in touch with us to learn more about the subject and related solutions

Explore related posts