Using Vulcan Codecs with Kafka Java APIs

13 Apr, 2023

Working on training material for Circe last year and talking about Kafka, I was introduced to Vulcan.

For those who aren’t familiar, Vulcan is a functional Avro encoding library that uses the official Apache Avro library under the hood. The difference from the official Avro build-plugin approach is that the types are defined in plain Scala and the Avro schema is generated from them, instead of defining the Avro schema first and having code that adheres to it generated at compile time.

The obvious benefit for Scala development is that dealing with Java types is no longer needed.

I’m not a big fan of working with Java types in the model and having to convert back and forth between Scala and Java types, so Vulcan was exciting to me, and I took a deep dive into it.

Vulcan example

If this sounds interesting to you so far, let me show you what I mean with some examples before we focus on the main topic of this post. Here is a quick and simple definition of a model with an Avro schema:

import vulcan.Codec
import vulcan.generic.*
import java.time.Instant
import java.util.UUID

case class Data(id: UUID, timestamp: Instant, value: String)

object Data:
    given Codec[Data] = Codec.derive[Data]

Looks clean, doesn’t it? At a basic level, this is all we need. But, of course, we can add more things to it to customize the resulting Avro schema, like metadata:

@AvroNamespace("com.example")
@AvroDoc("Type containing some data as a String as well as id and timestamp")
case class Data(id: UUID, timestamp: Instant, value: String)

There are other annotations, like @AvroName and @AvroNullDefault, and they can be used on fields as well. But suppose you aren’t a big fan of annotations, like me. In that case, especially when case classes get bigger and things aren’t as nice and straightforward as in blog post examples, it’s also possible to define the Codec directly without using derivation:

import cats.syntax.all.* // provides mapN

given Codec[Data] = Codec.record[Data](
  name = "Data",
  namespace = "com.example",
  doc = Some("Type containing some data as a String as well as id and timestamp")
) { field =>
  (
    field("id", _.id),
    field("timestamp", _.timestamp),
    field("value", _.value)
  ).mapN(Data(_, _, _))
}

For the full spec, check out the Vulcan documentation.

Codec.encode and Codec.decode

Ok, enough introductions. Now let’s talk about something that a lot of you might be thinking:

Cool, but where is my Avro schema? How do I actually use it with Kafka?

These are good questions, and you’ll find throughout this article that they’re a bit more involved than you might think, but bear with me; it’s worth it.

For the first one, the Avro schema is part of the Codec. It’s accessible through a method in the Codec class that returns an Either[AvroError, Schema]:

summon[Codec[Data]].schema
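
As a quick check, here is a minimal sketch of inspecting the result (assuming the Data codec derived earlier); Schema.toString(true) pretty-prints the Avro JSON:

summon[Codec[Data]].schema match {
  case Right(schema) => println(schema.toString(true)) // pretty-printed Avro JSON schema
  case Left(error)   => println(error.message)         // the schema could not be generated
}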

The second question is more complex, so let’s take it step by step.

First, look at the Codec class definition (simplified):

abstract class Codec[A] {
  /**
    * The Java type that this codec will encode to. The resulting value will in turn be
    * converted to a binary or JSON-based Avro format by the underlying Avro SDK.
    *
    * This type is of interest mainly because it determines what Avro type the data
    * will ultimately be encoded to; therefore, we express it using type aliases named
    * according to the Avro type they represent.
    */
  type AvroType

  /** The schema or an error if the schema could not be generated. */
  def schema: Either[AvroError, Schema]

  /** Attempts to encode the specified value. */
  def encode(a: A): Either[AvroError, AvroType]

  /** Attempts to decode the specified value using the provided schema. */
  def decode(value: Any, schema: Schema): Either[AvroError, A]
}

As you probably noticed, the encode method output is of type AvroType if the operation is successful, but what type is that exactly?

Well, this is the in-memory representation of the Avro data as Java/Scala types, similar to the Json type in libraries like Circe. In fact, the same ideas are useful to get an understanding of what’s going on here: first, we create an intermediate representation (AvroType), and then we "print it" to the target representation (text in the case of Circe, bytes in our case).

The full list of these types and their correspondence with Avro types is defined in the vulcan.Avro object.

Similar to encode, the decode method expects this kind of intermediate representation, so we cannot simply pass plain bytes to it. This is where serdes come into play, turning the conversion into a two-step process.
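
To make this concrete, here is a rough sketch of the full round trip in plain Scala (assuming the Data codec from earlier); note that the encoded value is the intermediate AvroType, not bytes:

val codec = summon[Codec[Data]]
val data  = Data(UUID.randomUUID(), Instant.now(), "hello")

// Either[AvroError, Data]: encode to the intermediate AvroType, then decode it back
val roundTrip =
  for {
    schema  <- codec.schema                  // org.apache.avro.Schema
    encoded <- codec.encode(data)            // intermediate representation, not bytes yet
    decoded <- codec.decode(encoded, schema)
  } yield decoded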

So let’s recap where we are at the moment with a simple diagram. It will help to visualize the missing pieces.

   ┌── Codec[Data].encode ──┐   ┌────────── A? ───────────┐
   │                        │   │                         │
┌──┴───┐                ┌───▼───┴───┐                ┌────▼────┐
│ Data │                │  AvroType │                │  Bytes  │
└──▲───┘                └───┬───▲───┘                └────┬────┘
   │                        │   │                         │
   └── Codec[Data].decode ──┘   └────────── B? ───────────┘

Let’s examine what A and B are and how to define them.

Serializers and Deserializers

Typically, A would be called a Serializer and B a Deserializer, but there is also the concept of a Serde (SERializer + DEserializer), which is just the two together.

Looking into Vulcan’s documentation, you might have found that it doesn’t provide a way to go from AvroType to Bytes by itself. Instead, this is implemented in another library from the same organization that integrates Vulcan with FS2: fs2-kafka. In that library, the classes that accomplish this transformation are AvroSerializer and AvroDeserializer, and it’s nicely done, so the amount of boilerplate is minimal. In fact, when working with simple types like String, Int, or even UUID, a serializer/deserializer will be provided.

But how do we accomplish the same thing without fs2 (and effect types), using the Kafka APIs directly, for example with KafkaProducer or Kafka Streams? Vulcan doesn’t do this for us, so let’s try implementing a simple version ourselves.

Implementing a Serde

The way we’re going to go about this is by implementing org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer, and delegating to io.confluent.kafka.serializers.KafkaAvroSerializer and KafkaAvroDeserializer to handle the registration of the schema with Schema Registry and the rest of the logic.

First, we need to get a Serializer given a Codec, so a method that does that could look something like this:

def avroSerializer[A](using codec: Codec[A]): Serializer[A] = new Serializer[A] {
  override def serialize(topic: String, data: A): Array[Byte] = ???
}

Now,

how do we go from A to Array[Byte]?

Well, just use KafkaAvroSerializer from Confluent as we said, right? Not exactly, because it expects an IndexedRecord or primitive types (it also supports user-defined classes, but it uses reflection to find a serializer for the given class, so we won’t go into that), not an arbitrary class like Data (remember, it’s just a plain case class). To make it work, we must first transform A into a record, and Codec.encode does that for us. So far, we have:

import scala.jdk.CollectionConverters.*
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.kafka.common.serialization.Serializer

def avroSerializer[A](schemaRegistryUrl: String, isKey: Boolean)(using
    codec: Codec[A]
): Serializer[A] = new Serializer[A] {

  // Delegate to Confluent's serializer, which talks to Schema Registry
  private val serializer = new KafkaAvroSerializer
  serializer.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

  override def serialize(topic: String, data: A): Array[Byte] =
    serializer.serialize(topic, codec.encode(data) match {
      case Left(err)     => throw new RuntimeException(err.message)
      case Right(record) => record
    })
}

Let’s go over the changes:

  • KafkaAvroSerializer needs some configuration to know how to reach Schema Registry and whether the serializer is for a key or a value. Those two things were added as parameters to our function.
  • Codec[A].encode returns an Either[AvroError, AvroType]. Pattern matching is used to distinguish between the possible cases.
  • If encoding the data fails, the only thing we can do is throw an exception, since the method’s signature cannot be changed to accommodate an Either or some other way of expressing failure in the types.
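
With the serializer in place, here is a minimal sketch of how it could be used with a plain KafkaProducer (the broker address, Schema Registry URL, and topic name are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")

// The key is a plain String; the value goes through our Vulcan-backed serializer
val producer = new KafkaProducer[String, Data](
  props,
  new StringSerializer,
  avroSerializer[Data]("http://localhost:8081", isKey = false)
)

val data = Data(UUID.randomUUID(), Instant.now(), "hello")
producer.send(new ProducerRecord("data-topic", data.id.toString, data))
producer.close()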

Now that we have a serializer, let’s try to do the same with Deserializer.

import scala.util.Try
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.kafka.common.serialization.Deserializer

def avroDeserializer[T](schemaRegistryUrl: String, isKey: Boolean)(using
    codec: Codec[T]
): Deserializer[T] =
  new Deserializer[T] {
    // Delegate to Confluent's deserializer for the Schema Registry wire format
    private val deserializer = new KafkaAvroDeserializer
    deserializer.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

    override def deserialize(topic: String, data: Array[Byte]): T =
      (for {
        readerSchema <- codec.schema.left.map(_.throwable)
        avro         <- Try(deserializer.deserialize(topic, data, readerSchema)).toEither
        decoded      <- codec.decode(avro, readerSchema).left.map(_.throwable)
      } yield decoded).fold(err => throw new RuntimeException(err.getMessage), identity)
  }

This one is a bit more complicated because there are more things to handle, like the reader schema, and more points of failure. Let’s go over it:

  • First, we do the same as before, but this time with KafkaAvroDeserializer.
  • Then, for the implementation of deserialize, the first step is to get the schema from Codec.schema, which may not exist. That’s why it returns an Either[AvroError, Schema]. To compose the left side of Either with the next step, AvroError is mapped to a Throwable.
  • Next, deserialize the bytes with deserializer.deserialize. This call may fail, so it is wrapped in Try to catch a possible exception and convert the result to an Either.
  • The last step is to decode using codec.decode and the reader schema. This can also fail and returns an Either[AvroError, T]; as before, the error is mapped to a Throwable.

At the end, the return type must be T, so the Either is folded to either return the contents of the Right branch or throw the error from the Left branch.
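
Mirroring the producer sketch from before, here is a hypothetical way to plug this deserializer into a plain KafkaConsumer (again, the broker address, Schema Registry URL, group id, and topic name are placeholders):

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import scala.jdk.CollectionConverters.*

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "data-consumer")

// The key is a plain String; the value is decoded through the Vulcan Codec
val consumer = new KafkaConsumer[String, Data](
  props,
  new StringDeserializer,
  avroDeserializer[Data]("http://localhost:8081", isKey = false)
)

consumer.subscribe(List("data-topic").asJava)
consumer.poll(Duration.ofSeconds(1)).asScala.foreach(record => println(record.value()))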

Lastly, to get a Serde[A]:

def avroSerde[T](schemaRegistryUrl: String, isKey: Boolean)(using Codec[T]): Serde[T] =
  Serdes.serdeFrom(
    avroSerializer[T](schemaRegistryUrl, isKey),
    avroDeserializer[T](schemaRegistryUrl, isKey)
  )
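
With that, the Serde can be plugged straight into the Kafka Streams API. Here is a small sketch (the Schema Registry URL and topic names are made up):

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, Produced}

val dataSerde: Serde[Data] = avroSerde[Data]("http://localhost:8081", isKey = false)

val builder = new StreamsBuilder()
builder
  .stream("data-topic", Consumed.`with`(Serdes.String(), dataSerde))
  .to("data-topic-copy", Produced.`with`(Serdes.String(), dataSerde))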

Caveats

This implementation has a limitation inherited from KafkaAvroSerializer: it isn’t possible to use sealed traits (union types) as top-level subjects when publishing to Kafka. This is because the underlying Java serialization machinery doesn’t properly support top-level union types.
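
For illustration, a hypothetical sealed trait like the one below would be derived by Vulcan as an Avro union of the Created and Removed record schemas, and that is exactly the kind of top-level schema this setup struggles with:

sealed trait Event
case class Created(id: UUID) extends Event
case class Removed(id: UUID, at: Instant) extends Event

object Event:
  given Codec[Event] = Codec.derive[Event] // schema is a union of Created and Removed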

This limitation was encountered in fs2-kafka, and it has a workaround that can be applied here as well.

The relevant part for us is the override of the serialize method of KafkaAvroSerializer, so instead of just creating it without parameters, it should be created like this:

private val serializer = codec.schema match {
  case Left(_) => new KafkaAvroSerializer
  case Right(schema) =>
    new KafkaAvroSerializer {
      // Overrides the default auto-registration behaviour, which attempts to guess the
      // writer schema based on the encoded representation used by the Java Avro SDK.
      // This works for types such as Records, which contain a reference to the exact schema
      // that was used to write them, but doesn't work so well for unions (where
      // the default behaviour is to register just the schema for the alternative
      // being produced) or logical types such as timestamp-millis (where the logical
      // type is lost).
      val parsedSchema = new AvroSchema(schema.toString)
      override def serialize(topic: String, record: AnyRef): Array[Byte] = {
        if (record == null) {
          return null
        }
        serializeImpl(
          getSubjectName(topic, isKey, record, parsedSchema),
          record,
          parsedSchema
        )
      }
    }
}

This code has been taken from the PR in fs2-kafka linked above.

Conclusions

To wrap up, we have explored the functionality provided by Vulcan and implemented a way to integrate it with Kafka APIs that expect a Serializer and Deserializer, since Vulcan does not support this out of the box.

By working on this article, I’ve learned a lot about the internals of Vulcan and Kafka serialization, but I’m not an expert by any means. So these examples might have some rough edges that need some polish, but I hope they helped to provide a better understanding of this cool library.

If you have different ideas or improvements, don’t hesitate to contact us and propose them.

The code used for this article can be found here for reference, and if you want to dig deeper into this, here is a list of resources that I used and related projects:
