Blog

Erforschung des TCP-Gegendrucks von Akka Stream

Urs Peter

Aktualisiert Oktober 22, 2025
8 Minuten

Vor einigen Jahren, als Reactive Streams noch Utopie war, bekamen wir den Auftrag, einen Nachrichtenmakler für große Mengen an Nachrichten zu bauen. Ein beträchtlicher Teil des Codes der Lösung, die wir damals lieferten, diente dazu, zu verhindern, dass dieser Broker mit Nachrichten überflutet wird, wenn ein Endpunkt langsam wird. Wie hätten wir dieses Problem heute gelöst, da die glänzende neue Akka Reactive Stream (experimentelle) Implementierung zum Greifen nahe ist? In diesem Blog untersuchen wir Akka Streams im Allgemeinen und TCP Streams im Besonderen. Außerdem zeigen wir, wie viel einfacher wir die damalige Herausforderung mit Streams lösen können.

Ein Anwendungsfall für TCP Back Pressure

Der in der Einleitung erwähnte High-Volume Message Broker hat im Wesentlichen Folgendes getan:

  • Lesen von Nachrichten (aus syslog) von einem TCP-Socket
  • Parsen Sie die Nachricht
  • Weiterleitung der Nachricht an ein anderes System über eine TCP-Verbindung

Für einen optimalen Durchsatz standen mehrere TCP-Verbindungen zur Verfügung, über die Nachrichten parallel an das Endpunktsystem geliefert werden konnten. Der Broker sollte etwa 4000 - 6000 Nachrichten pro Sekunde verarbeiten. Im Folgenden finden Sie ein Schema der wichtigsten Komponenten und des Nachrichtenflusses:

Wasserschlauch2

Natürlich haben wir Akka als Framework für die Implementierung dieser Anwendung gewählt. Unser Ansatz bestand darin, einen Actor für jede TCP-Verbindung zum Endpunktsystem zu haben. Eine eingehende Nachricht wurde dann an einen dieser Verbindungs-Aktoren weitergeleitet. Die größte Herausforderung war der Gegendruck: Wie konnten wir verhindern, dass unsere Verbindungs-Aktoren mit Nachrichten überflutet wurden, falls das Endpunktsystem langsamer wurde oder nicht verfügbar war? Bei 6000 Nachrichten pro Sekunde wird die Mailbox eines Actors sehr schnell überflutet. Eine weitere Anforderung war, dass die Nachrichtenpufferung von der Client-Anwendung, also von Syslog, übernommen werden musste. Syslog verfügt über ausgezeichnete Funktionen für diese Aufgabe. Dauerhafte Mailboxen oder etwas Ähnliches kamen nicht in Frage. Daher mussten wir einen Weg finden, nur so viele Nachrichten in unseren Broker zu ziehen, wie er an den Endpunkt liefern konnte. Mit anderen Worten: Wir mussten unsere eigene Backpressure-Implementierung bereitstellen. Ein beträchtlicher Teil des Codes der Lösung, die wir damals lieferten, war dem Backpressure gewidmet. Während eines unserer wiederkehrenden Innovationstage versuchten wir herauszufinden, wie viel einfacher die Backpressure-Herausforderung gewesen wäre, wenn Akka Streams verfügbar gewesen wäre.

Akka Streams in einer Nussschale

Falls Sie Akka Streams noch nicht kennen, finden Sie im Folgenden einige grundlegende Informationen, die Ihnen helfen, den Rest des Blogs zu verstehen. Die Kernbestandteile eines Reactive Streams bestehen aus drei Bausteinen:

  • Eine Quelle, die einige Werte produziert
  • Eine Bewegung, die eine Transformation der von einer Quelle erzeugten Elemente durchführt
  • Eine Senke, die die transformierten Werte eines Flows verbraucht

Akka Streams bietet eine umfangreiche DSL, mit der Transformationspipelines unter Verwendung der drei genannten Bausteine zusammengestellt werden können. Eine Transformationspipeline wird asynchron ausgeführt. Damit dies funktioniert, benötigt sie einen so genannten FlowMaterializer, der jeden Schritt der Pipeline ausführt. Ein FlowMaterializer verwendet Actor's für die Ausführung der Pipeline, auch wenn Ihnen das aus der Sicht der Verwendung nicht bewusst ist. Eine grundlegende Transformationspipeline sieht wie folgt aus: [scala] import akka.stream.scaladsl. import akka.stream.FlowMaterializer import akka.actor.ActorSystem implicit val actorSystem = ActorSystem() implicit val materializer = FlowMaterializer() val numberReverserFlow: Flow[Int, String] = Flow[Int].map(.toString.reverse) numberReverserFlow.runWith(Source(100 bis 200), ForeachSink(println)) [/scala] Wir erstellen zunächst einen Flow, der Ints verbraucht und sie in umgekehrte Strings umwandelt. Damit der Flow ausgeführt werden kann, rufen wir die runWith-Methode mit einer Source und einer Sink auf. Nach dem Aufruf von runWith beginnt die Pipeline mit der asynchronen Ausführung. Dieselbe Pipeline kann auf verschiedene Weise ausgedrückt werden, z.B: [scala] // Source(100 to 200).via(numberReverserFlow).to(ForeachSink(println)).run() //Direkt map auf der Source aufrufen. //Der Nachteil dieses Ansatzes ist, dass die Transformationslogik nicht wiederverwendet werden kann. Source(100 bis 200).map(_.toString.reverse).to(ForeachSink(println)).run() [/scala] Weitere Informationen über Akka Streams finden Sie in dieser Typesafe Präsentation.

Ein einfacher Reverse Proxy mit Akka Streams

Kehren wir zu unserer ursprünglichen Aufgabe zurück. Die erste Aufgabe, die wir zu bewältigen versuchten, bestand darin, einen Stream zu erstellen, der Daten von einer eingehenden TCP-Verbindung entgegennimmt, die an eine einzige ausgehende TCP-Verbindung weitergeleitet werden. In diesem Sinne sollte dieser Stream wie ein typischer Reverse-Proxy funktionieren, der den Datenverkehr einfach an eine andere Verbindung weiterleitet. Die einzige bemerkenswerte Eigenschaft im Vergleich zu einer herkömmlichen blockierenden/synchronen Lösung ist, dass unser Stream asynchron arbeitet und dabei den Gegendruck beibehält. [scala] import java.net.InetSocketAddress import akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.stream.io.StreamTcp import akka.stream.scaladsl.ForeachSink implicit val system = ActorSystem("on-to-one-proxy") implicit val materializer = FlowMaterializer() val serverBinding = StreamTcp().bind(new InetSocketAddress("localhost", 6000)) val sink = ForeachSink[StreamTcp.IncomingConnection] { connection => println(s "Client verbunden von: ${connection.remoteAddress}") connection.handleWith(StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 7000)).flow) } val materializedServer = serverBinding.connections.to(sink).run() serverBinding.localAddress(materializedServer) [/scala] Zunächst erstellen wir die obligatorischen Instanzen, die jeder Akka reactive Stream benötigt, nämlich ein ActorSystem und einen FlowMaterializer. Dann erstellen wir mit der StreamTcp-Erweiterung eine Serververbindung, die auf eingehenden Datenverkehr auf localhost:6000 wartet. Mit ForeachSink[StreamTcp.IncomingConnection] definieren wir, wie die eingehenden Daten für jede StreamTcp.IncomingConnection behandelt werden sollen, indem wir einen Flow vom Typ Flow[ByteString, ByteString] übergeben. Dieser Fluss verbraucht ByteStrings der IncomingConnection und erzeugt einen ByteString, also die Daten, die an den Client zurückgeschickt werden. In unserem Fall wird der Fluss vom Typ Flow[ByteString, ByteString] mit Hilfe des Flusses StreamTcp().outgoingConnection(endpointAddress).flow erstellt. Er leitet einen ByteString an die angegebene endpointAddress (hier localhost:7000) weiter und gibt seine Antwort ebenfalls als ByteString zurück. Dieser Fluss könnte auch dazu verwendet werden, einige Datenumwandlungen vorzunehmen, z.B. das Parsen einer Nachricht.

Paralleler Reverse Proxy mit einem Flussdiagramm

Die Weiterleitung einer Nachricht von einer Verbindung zu einer anderen erfüllt nicht unsere selbst definierten Anforderungen. Wir müssen in der Lage sein, Nachrichten von einer einzigen eingehenden Verbindung an eine konfigurierbare Anzahl von ausgehenden Verbindungen weiterzuleiten. Die Abdeckung dieses Anwendungsfalls ist etwas komplexer. Damit er funktioniert, verwenden wir die Flow Graph DSL. [scala] import akka.util.ByteString import akka.stream.scaladsl. import akka.stream.scaladsl.FlowGraphImplicits. private def parallelFlow(numberOfConnections:Int): Flow[ByteString, ByteString] = { PartialFlowGraph { implicit builder => val balance = Balance[ByteString] val merge = Merge[ByteString] UndefinedSource("in") ~> balance 1 to numberOfConnections map { _ => balance ~> StreamTcp().outgoingConnection(new InetSocketAddress("localhost", 7000)).flow ~> merge } merge ~> UndefinedSink("out") } toFlow (UndefinedSource("in"), UndefinedSink("out")) } [/scala] Wir konstruieren einen Flussgraphen, der die Verbindungspunkte Balance und Merge verwendet, mit denen wir den Stream auf mehrere andere Streams auffächern können. Für die Anzahl der parallelen Verbindungen, die wir unterstützen wollen, erstellen wir einen Fan-Out-Flow, der mit einem Balance-Vertex beginnt, gefolgt von einem OutgoingConnection-Flow, der dann mit einem Merge-Vertex zusammengeführt wird. Aus API-Sicht standen wir vor der Herausforderung, wie wir diesen Flow mit unserer IncomingConnection verbinden können. Fast alle Flow-Graph-Beispiele gehen von einer konkreten Source- und Sink-Implementierung aus, während die IncomingConnection weder eine Source noch eine Sink aufweist. Sie akzeptiert nur einen vollständigen Datenfluss als Eingabe. Folglich brauchten wir eine Möglichkeit, Source und Sink zu abstrahieren, da unser Fan-Out-Flow sie benötigt. Die Flow Graph API bietet dafür die Klasse PartialFlowGraph, mit der Sie mit abstrakten Sources und Sinks (UndefinedSource und UndefinedSink) arbeiten können. Wir haben einige Zeit gebraucht, um herauszufinden, wie sie funktionieren: Einfach eine UndefinedSource/Sink ohne Namen zu deklarieren, funktioniert nicht. Sie müssen der UndefinedSource/Sink unbedingt einen Namen geben, der mit dem Namen der UndefinedSource/Sink identisch ist, die in der toFlow-Methode übergeben wird. Ein wenig mehr Dokumentation zu diesem Thema wäre hilfreich. Sobald der Fan-Out-Flow erstellt ist, kann er an die handleWith-Methode der IncomingConnection übergeben werden: [scala] ... val sink = ForeachSink[StreamTcp.IncomingConnection] { connection => println(s "Client verbunden von: ${connection.remoteAddress}") val parallelConnections = 20 connection.handleWith(parallelFlow(parallelConnections)) [/scala] } ... Im Ergebnis liefert diese Implementierung alle eingehenden Nachrichten parallel an das Endpunktsystem und bewahrt gleichzeitig den Gegendruck. Mission erfüllt!

Testen der Anwendung

Um unsere Lösung zu testen, haben wir zwei Hilfsprogramme geschrieben:

  • Ein blockierender Client, der so viele Nachrichten wie möglich in eine Socket-Verbindung zum parallelen Reverse-Proxy pumpt
  • Ein Server, der Antworten mit einer konfigurierbaren Latenzzeit verzögert, um einen langsamen Endpunkt zu imitieren. Der parallele Reverse-Proxy leitet Nachrichten über eine seiner Verbindungen zu diesem Endpunkt weiter.

Das folgende Diagramm veranschaulicht den Anstieg des Durchsatzes mit der Zunahme der Anzahl der Verbindungen. Aufgrund des nicht-deterministischen gleichzeitigen Verhaltens gibt es einige Ausschläge in den Ergebnissen, aber der Trend zeigt eine klare Korrelation zwischen Durchsatz und Anzahl der Verbindungen:

Leistung_Karte

End-to-End-Lösung

Die End-to-End-Lösung finden Sie hier. Wenn Sie die Variable numberOfConnections ändern, können Sie die Auswirkungen auf die Leistung selbst sehen. Probieren Sie es aus! ...und lassen Sie sich treiben ;-)

Informationen über TCP-Gegendruck mit Akka Streams

Zum Zeitpunkt der Erstellung dieses Artikels gab es noch nicht viele Informationen über Akka Streams, da es sich um eines der neuesten Spielzeuge der Typesafe Factory handelt. Im Folgenden finden Sie einige wertvolle Ressourcen, die uns den Einstieg erleichtert haben:

  • Akka Stream Scaladoc

Verfasst von

Urs Peter

Contact

Let’s discuss how we can support your journey.