Blog

Verwendung von Vulcan Codecs mit Kafka Java APIs

Aktualisiert Oktober 15, 2025
10 Minuten

Als ich letztes Jahr an Schulungsmaterial für Circe arbeitete und über Kafka sprach, wurde ich in Vulcan eingeführt.

Für diejenigen, die damit nicht vertraut sind: Vulcan ist eine funktionale Avro-Kodierungsbibliothek, die die offizielle Apache Avro-Bibliothek unter der Haube verwendet. Der Unterschied zu den offiziellen Avro Build-Plugins besteht darin, dass die Typen in einfachem Scala definiert werden. Daraus wird dann das Avro-Schema generiert, anstatt das Avro-Schema zu definieren und zur Kompilierzeit einen Code zu erzeugen, der sich an dieses Schema hält.

Der offensichtliche Vorteil für die Scala-Entwicklung ist, dass der Umgang mit Java-Typen nicht mehr erforderlich ist.

Ich bin kein großer Fan davon, mit Java-Typen im Modell zu arbeiten und zwischen Scala- und Java-Typen hin und her zu konvertieren. Deshalb war Vulcan für mich so aufregend, dass ich mich intensiv damit befasst habe.

Beispiel Vulkan

Wenn sich das für Sie interessant anhört, lassen Sie mich Ihnen anhand einiger Beispiele zeigen, was ich meine, bevor wir uns auf das Hauptthema dieses Beitrags konzentrieren. Hier ist eine schnelle und einfache Definition eines Modells mit einem Avro-Schema:

import vulcan.Codec
import vulcan.generic.*
import java.time.Instant
import java.util.UUID

case class Data(id: UUID, timestamp: Instant, value: String)

object Data:
    given Codec[Data] = Codec.derive[Data]

Sieht sauber aus, nicht wahr? Im Grunde ist das alles, was wir brauchen. Aber natürlich können wir weitere Dinge hinzufügen, um das resultierende Avro-Schema anzupassen, z. B. Metadaten:

@AvroNamespace("com.example")
@AvroDoc("Type containing some data as a String as well as id and timestamp")
case class Data(id: UUID, timestamp: Instant, value: String)

Es gibt noch andere Anmerkungen wie @AvroName und @AvroNullDefault; die Verwendung dieser Anmerkungen für die Felder ist ebenfalls möglich. Aber nehmen wir an, Sie sind kein großer Fan von Annotationen, so wie ich. In diesem Fall, insbesondere wenn die Fallklassen größer werden und es nicht so schön und einfach ist wie in den Blogpost-Beispielen, ist es auch möglich, den Codec direkt zu definieren, ohne eine Ableitung zu verwenden:

Codec.record[Data](
  namespace = "com.example",
  doc = Some("Type containing some data as a String as well as id and timestamp")
) { field =>
  (
    field("id", _.id),
    field("timestamp", _.timestamp),
    field("value", _.value),
  ).mapN(Data(_, _, _))
}

Die vollständigen Spezifikationen finden Sie in der Vulcan-Dokumentation.

Codec.encode und Codec.decode

Ok, genug der Vorrede. Lassen Sie uns jetzt über etwas sprechen, das viele von Ihnen vielleicht denken:

Cool, aber wo ist mein Avro-Schema? Wie verwende ich es eigentlich mit Kafka?

Das sind gute Fragen, und Sie werden im Laufe dieses Artikels feststellen, dass sie etwas komplizierter sind, als Sie vielleicht denken, aber haben Sie Geduld mit mir; es lohnt sich.

Bei der ersten Variante ist das Avro-Schema Teil des Codec. Es ist über eine Methode in der Klasse Codec zugänglich, die ein Either[AvroError, Schema] zurückgibt:

summon[Codec[Data]].schema

Die zweite Frage ist komplexer, also lassen Sie uns Schritt für Schritt vorgehen.

Sehen Sie sich zunächst die Definition der Klasse Codec an (vereinfacht):

abstract class Codec[A] {
  /**
    * The Java type that this codec will encode to. The resulting value will in turn be
    * converted to a binary or JSON-based Avro format by the underlying Avro SDK.
    *
    * This type is of interest mainly because it determines what Avro type the data
    * will ultimately be encoded to; therefore, we express it using type aliases named
    * according to the Avro type they represent.
    */
  type AvroType

  /** The schema or an error if the schema could not be generated. */
  def schema: Either[AvroError, Schema]

  /** Attempts to encode the specified value using the provided schema. */
  def encode(a: A): Either[AvroError, AvroType]

  /** Attempts to decode the specified value using the provided schema. */
  def decode(value: Any, schema: Schema): Either[AvroError, A]
}

Wie Sie wahrscheinlich bemerkt haben, ist die Ausgabe der Methode encode vom Typ AvroType, wenn der Vorgang erfolgreich war, aber welcher Typ ist das genau?

Nun, dies wird die Darstellung des Avro-Schemas als Scala-Typen sein, ähnlich wie der Json-Typ in Bibliotheken wie Circe. In der Tat sind die gleichen Ideen nützlich, um zu verstehen, was hier vor sich geht: zuerst erstellen wir eine Zwischendarstellung (AvroType) und dann "drucken" wir sie in die Zieldarstellung (Text im Fall von Circe, Bytes in unserem Fall).

Die Liste und die Korrespondenz zwischen Scala- und Avro-Typen sind in vulcan.Avro object definiert.

Ähnlich wie bei encode erwartet die Methode decode eine Art von Zwischendarstellung, so dass wir ihr nicht einfach nur Bytes übergeben können. Hier kommen die Serden ins Spiel. Dies ist ein zweistufiger Prozess.

Lassen Sie uns also anhand eines einfachen Diagramms rekapitulieren, wo wir im Moment stehen. Es wird Ihnen helfen, die fehlenden Teile zu visualisieren.

   ┌── Codec[Data].encode ──┐   ┌────────── A? ───────────┐
   │                        │   │                         │
┌──┴───┐                ┌───▼───┴───┐                ┌────▼────┐
│ Data │                │  AvroType │                │  Bytes  │
└──▲───┘                └───┬───▲───┘                └────┬────┘
   │                        │   │                         │
   └── Codec[Data].decode ──┘   └────────── B? ───────────┘

Lassen Sie uns untersuchen, was A und B sind und wie man sie definiert.

Serialisierer und Deserialisierer

Typischerweise wird A als Serializer und B als Deserializer bezeichnet, aber es gibt auch das Konzept von Serde (SERializer + DEserialize), das nur die beiden zusammen darstellt.

Wenn Sie sich die Dokumentation von Vulcan ansehen, werden Sie feststellen, dass es keine Möglichkeit bietet, von AvroType zu Bytes zu wechseln. Stattdessen ist dies in einer anderen Bibliothek der gleichen Organisation implementiert, die Vulcan mit FS2 integriert: fs2-kafka. Bei der Arbeit mit dieser Bibliothek sind die Klassen, die diese Umwandlung bewerkstelligen, folgende AvroSerializer und AvroDeserializer und es ist gut gemacht, so dass die Menge an Boilerplate minimal ist. Wenn Sie mit einfachen Typen wie String, Int oder sogar UUID arbeiten, wird sogar ein Serialisierer/Deserialisierer bereitgestellt.

Aber wie können wir dasselbe erreichen, ohne mit fs2 (und Effekttypen) zu arbeiten, sondern direkt mit Kafka-APIs? Zum Beispiel, indem wir KafkaProducer oder Kafka Streams verwenden? Vulcan tut dies nicht für uns, also versuchen wir, eine einfache Version selbst zu implementieren.

Eine Serde implementieren

Dazu implementieren wir org.apache.kafka.common.serialization.Serializer und org.apache.kafka.common.serialization.Deserializer und delegieren die Registrierung des Schemas bei der Schemaregistrierung und andere Logik an io.confluent.kafka.serializers.KafkaAvroSerializer.

Zunächst müssen wir einen Serializer mit einem Codec abrufen. Eine Methode, die dies tut, könnte etwa so aussehen:

def serializer[A](using codec: Codec[A]): Serializer[A] = new Serializer[A] {
  override def serialize(topic: String, data: A): Array[Byte] = ???
}

Jetzt,

Wie kommen wir von A zu Array[Byte]?

Nun, verwenden Sie einfach KafkaAvroSerializer von confluent, wie wir gesagt haben, richtig? Nicht ganz, denn es erwartet einen IndexedRecord oder primitive Typen (es unterstützt auch benutzerdefinierte Klassen, aber es verwendet Reflection, um einen Serializer für die gegebene Klasse zu finden, also gehen wir nicht darauf ein), nicht irgendeine Klasse wie Data (denken Sie daran, dass es sich nur um eine einfache Case-Klasse handelt). Damit es funktioniert, müssen wir zunächst A in einen Datensatz umwandeln, und Codec.encode erledigt das für uns. Bis jetzt haben wir:

def serializer[A](schemaRegistryUrl: String, isKey: Boolean)(using
    codec: Codec[A]
): Serializer[A] = new Serializer[A] {

  private val serializer = new KafkaAvroSerializer
  serializer.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

  override def serialize(topic: String, data: A): Array[Byte] =
    serializer.serialize(topic, codec.encode(data) match {
      case Left(err)     => throw new RuntimeException(err.message)
      case Right(record) => record
    })
}

Schauen wir uns die Änderungen an:

  • KafkaAvroSerializer benötigt eine gewisse Konfiguration, um zu wissen, wie man Schema Registry erreicht und ob der Serialisierer für einen Schlüssel oder einen Wert ist. Diese beiden Dinge wurden als Parameter zu unserer Funktion hinzugefügt.
  • Codec[A].encode gibt ein Either[AvroError, AvroType] zurück. Der Mustervergleich wird verwendet, um zwischen den möglichen Fällen zu unterscheiden.
  • Wenn es nicht gelingt, die Daten zu kodieren, kann nur eine Ausnahme ausgelöst werden, da die Signatur der Methode nicht geändert werden kann, um ein Entweder-Oder oder eine andere Form des Umgangs mit Fehlern in den Typen zu ermöglichen.

Da wir nun einen Serializer haben, können wir dasselbe mit dem Deserializer tun.

def avroDeserializer[T](schemaRegistryUrl: String, isKey: Boolean)(using
    codec: Codec[T]
): Deserializer[T] =
  new Deserializer[T] {
    private val deserializer = new KafkaAvroDeserializer
    deserializer.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

    override def deserialize(topic: String, data: Array[Byte]): T =
      (for {
        readerSchema <- codec.schema.left.map(_.throwable)
        avro         <- Try(deserializer.deserialize(topic, data, readerSchema)).toEither
        decoded      <- codec.decode(avro, readerSchema).left.map(_.throwable)
      } yield decoded).fold(err => throw new RuntimeException(err.getMessage), identity)
  }

Diese Variante ist etwas komplizierter, da mehr Dinge wie das Leserschema gehandhabt werden müssen. Außerdem gibt es mehr Fehlerquellen. Schauen wir uns das mal an:

  • Zuerst machen wir dasselbe wie zuvor, aber diesmal mit KafkaAvroDeserializer.
  • Bei der Implementierung von deserialize muss zunächst das Schema von Codec.schema abgerufen werden, das möglicherweise nicht existiert. Deshalb wird ein zurückgegeben. Um die linke Seite von Either mit dem nächsten Schritt zusammenzusetzen, wird AvroError auf ein Throwable abgebildet.
  • Als nächstes deserialisieren Sie die Bytes mit deserializer.deserialize. Dieser Aufruf kann fehlschlagen, daher ist er in Try verpackt, um eine mögliche Ausnahme abzufangen und das Ergebnis in ein Either
  • Der letzte Schritt ist die Dekodierung mit codec.decode und dem Schema des Lesers. Auch dies kann fehlschlagen und gibt ein Either[AvroError, A] zurück. Wie zuvor wird der Fehler auf ein Throwable abgebildet.

Am Ende muss der Rückgabetyp vom Typ A sein, so dass Either gefaltet wird, um den Inhalt der Verzweigung Right zurückzugeben oder mit dem Fehler aus der Verzweigung Left zu werfen.

Und schließlich, um eine Serde[A] zu erhalten:

def avroSerde[T](schemaRegistryUrl: String, isKey: Boolean)(using Codec[T]): Serde[T] =
  Serdes.serdeFrom(
    avroSerializer(schemaRegsitryUrl, isKey),
    avroDeserializer(schemaRegistryUrl, isKey)
  )

Vorbehalte

Diese Implementierung hat eine von KafkaAvroSerializer übernommene Einschränkung, nämlich dass es nicht möglich ist, versiegelte Traits (Union-Typen) als Top-Level-Subjekte bei der Veröffentlichung in Kafka zu verwenden. Das liegt daran, dass der zugrundeliegende Serialisierungsmechanismus keine richtige Unterstützung für Union-Typen bietet, da es sich um eine Java-Bibliothek handelt.

Diese Einschränkung wurde in fs2-kafka festgestellt, und es gibt einen Workaround, der auch hier angewendet werden kann:

Der für uns relevante Teil ist die Überschreibung der Methode serialize von KafkaAvroSerializer. Anstatt sie also einfach ohne Parameter zu erstellen, sollte sie wie folgt erstellt werden:

private val avroSerializer = codec.schema match {
  case Left(_) => new KafkaAvroSerializer
  case Right(schema) =>
    new KafkaAvroSerializer {
      // Overrides the default auto-registration behaviour, which attempts to guess the
      // writer schema based on the encoded representation used by the Java Avro SDK.
      // This works for types such as Records, which contain a reference to the exact schema
      // that was used to write them, but doesn't work so well for unions (where
      // the default behaviour is to register just the schema for the alternative
      // being produced) or logical types such as timestamp-millis (where the logical
      // type is lost).
      val parsedSchema = new AvroSchema(schema.toString)
      override def serialize(topic: String, record: AnyRef): Array[Byte] = {
        if (record == null) {
          return null
        }
        serializeImpl(
          getSubjectName(topic, isKey, record, parsedSchema),
          record,
          parsedSchema
        )
      }
    }
}

Dieser Code wurde aus dem oben verlinkten PR in fs2-kafka übernommen.

Schlussfolgerungen

Abschließend haben wir die von Vulcan bereitgestellten Funktionen untersucht und eine Möglichkeit zur Integration mit Kafka-APIs implementiert, die Serializer und Deserializer erwarten, da Vulcan dies nicht von Haus aus unterstützt.

Durch die Arbeit an diesem Artikel habe ich viel über die Interna von Vulcan und die Kafka-Serialisierung gelernt, aber ich bin bei weitem kein Experte. Diese Beispiele könnten also einige Ecken und Kanten haben, die noch etwas Feinschliff benötigen, aber ich hoffe, dass sie zu einem besseren Verständnis dieser coolen Bibliothek beigetragen haben.

Wenn Sie andere Ideen oder Verbesserungen haben, zögern Sie nicht, uns zu kontaktieren und uns diese vorzuschlagen.

Den für diesen Artikel verwendeten Code finden Sie hier - github.com/47degrees/vulcan-kafka-serdes als Referenz. Wenn Sie tiefer in die Materie einsteigen möchten, finden Sie hier eine Liste der von mir verwendeten Ressourcen und verwandten Projekte:

Contact

Let’s discuss how we can support your journey.