Blog

Datenanreicherung in Flink SQL mit dem HTTP Connector für Flink - Teil Eins

Krzysztof Chmielewski

Aktualisiert Oktober 17, 2025
7 Minuten

HTTP-Konnektor für Flink SQL

In unseren Projekten bei GetInData arbeiten wir viel daran, die Data-Engineering-Fähigkeiten unserer Kunden zu erweitern, indem wir mehr Menschen in die Lage versetzen, datengesteuerte Anwendungen zu entwickeln, nicht nur Software-Ingenieure. Unser Weg, Nicht-Ingenieure zu befähigen, besteht darin, ihnen eine weit verbreitete und höherstufige Definitionssprache für die Datenverarbeitung, wie SQL, an die Hand zu geben.

Eine der Spezialitäten unseres Unternehmens sind Projekte, bei denen wir intensive Datenströme verarbeiten müssen. Bei der Verarbeitung selbst handelt es sich nicht um einfache Transformationen, sondern auch um die Anreicherung mit Daten aus externen Datenquellen, Stream Joins und komplexe Aggregationen. Diese sind zwar immer noch auf dem neuesten Stand der Technik, werden aber derzeit immer häufiger mit Frameworks wie Apache Flink, Spark oder GCP Dataflow gelöst. Das ist es, was wir in unseren Projekten verwenden.

Aber wie können wir nun Nicht-Ingenieure in die Lage versetzen, diese komplexe Verarbeitung in SQL zu definieren? Daran arbeiten wir intern bei GetInData sehr intensiv.

In diesem Blogbeitrag werde ich mich auf den Anwendungsfall konzentrieren, auf den wir gestoßen sind: die Verwendung von Flink SQL zur Anreicherung von Datenströmen mit Daten, auf die über eine HTTP-API zugegriffen wird. Wir werden uns auf die geschäftliche Seite konzentrieren, während Teil zwei einige technische Details dieser Implementierung beleuchten wird.
flink connector http getindata

Apache Flink - SQL

Die Apache Flink-Plattform ist ein Open-Source-Projekt, das die Stream-Verarbeitung mit niedriger Latenz in großem Maßstab unterstützt. Apache Flink ist ein Cluster von Knoten, in dem zustandsabhängige Datenverarbeitungsaufträge auf die Arbeitsknoten verteilt werden.

Flink bietet eine ANSI-Standard-konforme SQL-API. Sie wird durch Flink-SQL implementiert, mit dem Sie Datenverarbeitungspipelines definieren und Datenquellen, Senken und Datentransformationsfunktionen, einschließlich Mustererkennung, ausdrücken können.

Anwendungsfall

Der Anwendungsfall, an dem wir gearbeitet haben, war ziemlich einfach:

"Als Datenanalyst möchte ich eingehende Daten mit einem Machine Learning-Modell zur weiteren Verarbeitung anreichern.

Die Daten des Machine Learning-Modells werden über HTTP/GET API durch ein externes Webservice-System bereitgestellt.

Die Logik sollte mit SQL ausgedrückt werden."

Sie können sich auch einen anderen Fall vorstellen:

"Als Datenanalyst möchte ich eingehende Datenströme über Kredite mit detaillierten Metadaten der Benutzer anreichern.

Die Benutzerdaten werden in einem externen System gespeichert und über HTTP/GET API durch ein externes Webservice-System bereitgestellt.

Die Logik sollte mit SQL ausgedrückt werden."

Jede beliebige Geschichte, sei es ein Kartenzahlungsstrom, ein Aktientransaktionsstrom oder ein Klickstrom aus einem Online-Kreditantragsformular, kann hier verwendet werden, da eine Anreicherung fast immer erforderlich ist, um einen größeren Kontext für die nachgelagerte Verarbeitung zu schaffen. Dieser Anreicherungsschritt beinhaltet in der Regel das Abrufen von Daten aus einem externen System. In vielen Fällen ist der Zugriff auf diese Daten nur über eine REST-API möglich.

Das Problem

Die Anreicherung mit externen Daten wird in der Regel über eine benutzerdefinierte Funktion (User Defined Function - UDF) realisiert. In diesem Fall könnte das Anwendungsbeispiel etwa so aussehen:

SELECT dedicatedOrdersEnrichmen(orderId) FROM orders

Im Falle der Verwendung eines ML-basierten Dienstes könnte die generische Funktion wie folgt aussehen:

SELECT genericEnrichment(orderId, "http://3rdpartyservice.com/service/ml"") FROM orders

Zwei Dinge können wir sofort erkennen:

  1. Der Benutzer muss wissen, dass dedicatedOrdersEnrichmen existiert.
  2. Der Benutzer muss wissen, welche Art von Parametern dedicatedOrdersEnrichmen erwartet. Mit anderen Worten, der Benutzer muss wissen, wie er diese UDF verwenden kann.

Die Herausforderung besteht hier in der Wiederverwendbarkeit von dedicatedOrdersEnrichmen. Diese UDF sehr spezifisch zu gestalten, würde sie benutzerfreundlicher machen. Es wären weniger Parameter erforderlich, was die Abfrage vereinfachen würde, aber wir würden für jeden neuen Fall eine neue UDF benötigen. Neue UDFs würden einen erfahrenen Java-Programmierer erfordern, um sie zu implementieren. Da neue UDFs implementiert, bereitgestellt und verwaltet werden müssen, kann dies die Markteinführungszeit für eine solche Funktionalität erhöhen.

Andererseits könnten wir es einmal generisch und wiederverwendbar implementieren, wie im Beispiel von genericEnrichment, aber das macht es viel weniger benutzerfreundlich und anfälliger für Fehler.

Eine gute Balance zwischen beidem zu finden, wäre schwierig und für jeden Anwendungsfall spezifisch.

Was wäre, wenn...

Was wäre, wenn ich Ihnen sage, dass es einen anderen Weg gibt, einen besseren Weg?

Was wäre, wenn wir es wie einen normalen SQL JOIN anreichern könnten?

Lassen Sie uns den Anwendungsfall aufschlüsseln:

  • Wir haben eine Tabelle mit Ausgangsdaten - Bestellungen.
  • Wir haben eine Tabelle mit Machine Learning-Modelldaten - ML_Data.
  • Datensätze aus beiden Tabellen können mit einem Schlüssel verbunden werden.

Die SQL-Abfrage würde in diesem Fall lauten:

SELECT Orders.\*, ML_Data.\* FROM Orders AS o JOIN ML_Data AS ml ON o.id = ml.id

So einfach ist das.

Können wir den gleichen Ansatz für Flink-SQL verwenden? Nun, dank unseres http-flink-connector können wir das jetzt.

Mit dem flink-http-connector, den wir als Open Source zur Verfügung gestellt haben, können wir Flink SQL-Tabellen definieren, die als Datenquelle für die Anreicherung dienen. Auf eine solche Tabelle kann in der SQL JOIN-Abfrage Bezug genommen werden. Sie können sich das Repository hier ansehen.

Verwenden Sie

Mit http-flink-connector können wir eine Datentabelle wie folgt definieren:

CREATE TABLE ML_Data (

  id STRING,

  id2 STRING,

  msg STRING,

  uuid STRING,

  isActive STRING,

  balance STRING

) WITH (

  'connector' = 'rest-lookup',

  'url' = 'http://localhost:8080/client'

)

Diese neu erstellte Tabelle ist in der Tat eine neue Datenquelle, die mit reinem Flink-SQL erstellt wurde, ohne eine Programmiersprache wie Java oder Scala zu verwenden. Nur SQL und einige wenige Konfigurationsparameter reichen aus, um auszudrücken, dass diese Tabelle von einem externen Webdienst unterstützt wird.

Wir können viele solcher Quellen definieren, wobei jede von ihnen unterschiedliche Schemata haben und verschiedene externe Dienste nutzen kann. Diese Quellen fungieren dann als Standard-SQL-Tabellen und können von Analysten verwendet werden.

Der Parameter url definiert die Basis-URL für die REST-API, die zum Abrufen der Daten aus dem externen System verwendet wird. Derzeit wird nur die Methode HTTP/GET unterstützt.

Die Flink SQL-Abfrage, die unseren Anwendungsfall erfüllen würde, muss den so genannten "Lookup Join" verwenden . Ohne zu sehr ins Detail zu gehen, übergibt der Lookup Join die JOIN-Argumente an den Connector. Der Connector kann diese Argumente verwenden, um die HTTP-Anfrage zu erstellen.

Die SQL-Anweisung für den Enrichment Join mit den beiden Argumenten id und id2 mit der Tabelle, die durch einen http-Connector unterstützt wird, würde wie folgt aussehen:

SELECT o.id, o.id2, c.msg, ml.uuid, ml.isActive FROM Orders AS o

JOIN ML_Data FOR SYSTEM_TIME AS OF o.proc_time AS ml ON o.id = ml.id AND o.id2 = ml.id2

Die Abfrage sieht fast wie eine Standard-SQL-Abfrage aus, die das bekannte Konzept von JOIN verwendet, um den geschäftlichen Anwendungsfall auszudrücken.

Der Teil SYSTEM_TIME AS OF o.proc_time ist derzeit erforderlich, damit Flink dies als Lookup Join ausführen kann, und wird im zweiten Teil dieses Blog-Beitrags etwas näher erläutert.

Fazit

SQL wird bei der täglichen Arbeit in der Big Data-Welt verwendet. Die Endbenutzer werden es zu schätzen wissen, dass sie nicht zwischen den Dialekten umschalten und sich keine spezifischen Befehle oder Konfigurationsoptionen merken müssen.

In diesem Blog Post haben wir beschrieben, wie die Anreicherung mit Daten aus einem externen System über dessen REST-API mit Hilfe unseres Open Source http-flink-connector als Flink SQL-Abfrage ausgedrückt werden kann. Der Konnektor basiert auf dem Konzept der Lookup Joins von Flink.

Mit GetInData füllen wir die Lücke für einen Anwendungsfall, bei dem der Zugriff auf die Daten nicht über eine direkte Datenbankverbindung erfolgen kann, sondern, was ein häufiger Anwendungsfall zu sein scheint, über eine Rest-API eines externen Webdienstes.

Da wir glauben, dass dies ein nützliches Tool für andere sein könnte, haben wir beschlossen, diesen Connector unter der Open-Source-Lizenz zu veröffentlichen (HTTP Flink Connector).

Bitte denken Sie daran, dass sich dieser Connector noch in der Anfangsphase befindet, obwohl wir hoffen, dass er sich in Zukunft weiterentwickeln wird.

Die Details der Implementierung und mögliche Verbesserungen werden im zweiten Teil dieses Blogbeitrags beschrieben, bleiben Sie also bitte dran.

Viel Spaß beim Codieren!!!

---

Hat Ihnen dieser Blogbeitrag gefallen? Schauen Sie sich unsere anderen Blogs an und melden Sie sich für unseren Newsletter an, um auf dem Laufenden zu bleiben!

Verfasst von

Krzysztof Chmielewski

Contact

Let’s discuss how we can support your journey.