Die Datenverarbeitung in Echtzeit ist für Unternehmen von entscheidender Bedeutung geworden, und Apache Flink mit seinen leistungsstarken Stream-Processing-Funktionen steht dabei an vorderster Front. Eine Herausforderung ist jedoch nach wie vor die Schieflage von Daten, insbesondere bei der Verarbeitung großer Datenmengen. In diesem Blog werde ich effektive Strategien zur Abschwächung von Datenverzerrungen in Flink SQL vorstellen, um eine effiziente und ausgewogene Verarbeitung zu gewährleisten.
Flink (plus) SQL-Übersicht
Flink SQL ist das Framework, das die schnelle und relativ einfache Entwicklung von Batch- und Streaming-Transformationen ermöglicht. Meiner Meinung nach sollte es als "Flink + SQL" bezeichnet werden (mit einem Pluszeichen dazwischen), denn SQL-Kenntnisse sind nicht ausreichend. Auch wenn SQL-Kenntnisse das Erlernen von Programmiersprachen wie Java, Scala oder Python überflüssig machen, sind solide Kenntnisse der Grundlagen von Flink für die Entwicklung effizienter Streaming-Pipelines nach wie vor unerlässlich.
Zunächst ist es von Vorteil, das Konzept der dynamischen Tabellen und den Unterschied zwischen Changelog und Append-Only-Streams zu verstehen. Für den Datenabgleich kann es erforderlich sein, Wasserzeichen zu definieren und Aggregationen auf der Grundlage von Fensterfunktionen zu erstellen.
Tabellen-Joins in Flink sind ähnlich wie die in relationalen Datenbanken. Je nach den geschäftlichen Anforderungen können Sie reguläre, temporale oder Lookup-Joins benötigen, die jeweils unterschiedlich funktionieren und unterschiedliche Ergebnisse liefern können.
Strategien verbinden
Obwohl Flink mehrere Join-Typen bietet, gibt es im Streaming-Modus nur zwei Join-Strategien. Die erste ist der Lookup-Join, eine einzigartige Strategie für Lookup-Tabellen oder Ansichten. Sie wird an Ort und Stelle ausgeführt (ohne Datenmischung), erfordert jedoch externen Speicher für das Abrufen von Daten. Sie erweist sich als vorteilhaft für die Verknüpfung mit sich langsam ändernden Dimensionstabellen, wie z.B. Wörterbüchern, oder in Szenarien, in denen die Speicherung von Daten im Flink-Status unerwünscht ist (z.B. bei einer riesigen Tabelle) und die Datenausrichtung kein Problem darstellt.
Die primäre Strategie ähnelt dem Shuffle-Hash-Join. Bei dieser Methode werden die Zeilen aus den verbundenen Tabellen auf der Grundlage von Hashes, die aus den Join-Schlüsselwerten berechnet werden, auf die Teilaufgaben verteilt. Die Leistung dieser Verknüpfung kann jedoch durch einige sehr häufig vorkommende Werte, wie z.B. NULLs, beeinträchtigt werden.
Fallstudie: Umgang mit Datenschieflage in Flink SQL
Stellen Sie sich ein Szenario vor, in dem wir eine linke Verknüpfung zwischen der großen Tabelle TABLE_A (mit 140 Millionen Zeilen und steigender Tendenz) und einem kleinen Wörterbuch TABLE_B durchführen müssen. Beide sind dynamisch und basieren auf Kafka-Themen. Die Verteilung der Join-Schlüsselwerte sehen Sie in der folgenden Abbildung. 
Eine Verzerrung der Daten ist in diesem Szenario fast sicher, da die Top-5-Werte über 50 % der Daten ausmachen. Außerdem ist der häufigste NULL-Wert in 32,6 % der Daten vorhanden. JOIN_KEY_VALUE ist ein nicht-negativer ganzzahliger Wert.
In Spark gibt es bei der Batch-Verarbeitung Techniken wie das Salzen von Join-Schlüsseln und optimierte Join-Strategien wie Broadcast Hash Join oder Storage Partitioned Join (eingeführt in Spark 3.3), die Probleme mit Datenschiefständen lösen können. Leider unterstützt Flink diese Techniken im Streaming-Modus nicht, und die bekannten Techniken von Spark können nicht direkt übernommen werden.
Nullverteilung
Flink verteilt Zeilen basierend auf dem Wert des Verknüpfungsschlüssels. Null stellt sicher, dass die Zeilen nicht zusammengefügt werden und kann durch den Verknüpfungsoperator weitergegeben werden. Allerdings leitet Flink SQL alle NULL-Werte an dieselbe Unteraufgabe weiter, so dass ein Eingriff erforderlich ist, um eine gleichmäßige Verteilung zu gewährleisten.
Die Lösung besteht in der Verwendung von Datenbeschränkungen und dem Ersetzen von NULL-Werten in der Join-Anweisung durch einen beliebigen Wert außerhalb des Bereichs. Es könnte zwar ein zufälliger Wert sein, aber ich empfehle, einen deterministischen Wert zu verwenden (deterministischer Wert = deterministische Pipeline). Da Flink keinen entsprechenden Wert auf der rechten Seite findet, wird es die linke Tabellenzeile unverbunden übergeben.
Um dies zu erreichen, habe ich beschlossen, eine negative Ganzzahl mit der MD5-Hash-Funktion auf der Grundlage des Primärschlüssels von TABLE_A zu berechnen. Der daraus resultierende Wert wird unabhängig vom PK-Typ oder Wertebereich immer im Bereich von -1999 bis -1000 liegen.
FROM
TABLE_A a
LEFT JOIN
TABLE_B a
ON
COALESCE(a.JOIN_KEY_VALUE, CAST(SUBSTRING(HEX(MD5(CAST(a.ID AS STRING))) FROM 0 FOR 3) AS INT) - 1999) = b.ID
Diese Abhilfe ist nicht auf reguläre Joins beschränkt. Sie gilt auch für temporale Joins. Bei temporalen Joins müssen Sie den COALESCE-Ausdruck in die berechnete Spalte in TABLE_A verschieben und ihn dann in der Join-Anweisung verwenden. Auch wenn dieser Ansatz als Workaround funktioniert, ist es wichtig zu wissen, dass Flink fehlschlägt, wenn Sie berechnete Ausdrücke direkt in einer temporalen Join-Anweisung verwenden.
Ergebnisse
Die Verteilung der Zeilen mit einem NULL-Join-Schlüssel hängt vom Primärschlüssel von TABLE_A ab, was zu einer ausgewogenen Verteilung führt. Dadurch wird das Problem der Datenschieflage effektiv gelöst.
Vorteile:
- Gut verteilte Nullwerte.
- Einfach zu implementieren.
- Benötigt keinen zusätzlichen Speicher.
- Geringe CPU-Kosten (Erzeugung von Pseudo-Zufallswerten im Falle von Null).
Nachteile:
- Erfordert eine freie Auswahl an Werten.
- Fügt zusätzliche Komplexität hinzu (die möglicherweise im "Kern" von Flink von vornherein gehandhabt werden könnte).
Pseudo-Broadcast-Verbindung
Dieses Konzept kombiniert die Salting-Key-Technik mit einem Broadcast-Join. Das Salt, das in diesem Artikel als BROADCAST_ID bezeichnet wird, verbessert die Datenverteilung, während der Pseudo-Broadcast die Verarbeitung von Zeilen durch mehrere Flink-Unteraufgaben ermöglicht.
Das Muster einer Pseudo-Broadcast-Verbindung sieht folgendermaßen aus:
CREATE TABLE_B (
...,
BROADCAST_ARRAY AS ARRAY[0,1,2,3,4,5,6,7] --step 1
) WITH (
...
);
CREATE VIEW TABLE_B_FLATTEN AS
SELECT
t.*,
b.BROADCAST_ID
FROM
TABLE_B t
CROSS JOIN --step 2
UNNEST(t.BROADCAST_ARRAY) AS b (BROADCAST_ID)
;
CREATE TABLE TABLE_A (
...,
BROADCAST_ID AS CAST(SUBSTRING(HEX(MD5(CAST(ID AS STRING))) FROM 0 FOR 4) AS INT) % 8 --step 3
) WITH (
...
);
SELECT
...
FROM
TABLE_A a
JOIN
TABLE_B_FLATTEN b
ON
a.JOIN_KEY_VALUE = b.ID
AND a.BROADCAST_ID = b.BROADCAST_ID --step 4
- Definieren Sie ein Array mit Werten zwischen 0 und n. Die Größe des Arrays ist gleich der Anzahl der daraus gebildeten Duplikate.
- Verwenden Sie einen unnest cross join, um BROADCAST_ARRAY zu reduzieren, indem Sie Zeilen mit einer anderen BROADCAST_ID duplizieren.
- Berechnen Sie BROADCAST_ID anhand des Primärschlüssels der linken Tabelle und wenden Sie die Modulo-Operation mit n an. Stellen Sie sicher, dass der resultierende Wert mit der BROADCAST_ID aus der rechten Tabelle übereinstimmt.
- Führen Sie die Verknüpfung zwischen Tabellen mit JOIN_KEY_VALUE und BROADCAST_ID durch.
Wie funktioniert das? Die rechte Tabelle wird mit einem Cross-Join-Unnest-Operator geglättet (da reines Cross-Join in Flink Streaming nicht unterstützt wird). Dieser Vorgang erzeugt Duplikate mit unterschiedlichen BROADCAST_IDs. Die duplizierten Zeilen werden auf mehrere Teilaufgaben verteilt, aber wir können nicht kontrollieren, wie Flink unsere Daten verteilt. 
In der vorliegenden Abbildung ist die Zeile mit JOIN_KEY_VALUE (B) nicht optimal verteilt. Zum Beispiel behält Subtask 0 die Instanzen (B, 1) und (B, 2), während Subtask 2 keine Zeile (B) enthält. Außerdem kann in unserem Szenario die Verknüpfung mit dem Wert (A) von jeder beliebigen Unteraufgabe verwaltet werden, während (B) von zwei der drei Unteraufgaben verarbeitet wird.
Der letzte Schritt ist die Berechnung der BROADCAST_ID in der linken Tabelle. Es ist wichtig, diesen Wert zu "drehen" und dabei so viele Teilaufgaben wie möglich zu verwenden. Ich habe mich entschieden, die MD5-Hash-Funktion des Primärschlüssels der linken Tabelle zu verwenden (ähnlich wie bei der NULL-Verteilung). Da meine linke Tabelle aus CDC-Ereignissen aufgebaut ist, sollten PK-Werte nicht häufig wiederholt werden.
BROADCAST-ARRAY
Das BROADCAST_ARRAY wird verwendet, um Zeilen zu replizieren, wobei seine Größe empirisch bestimmt wird. In einem idealen Szenario sollte seine Größe der Parallelität des Auftrags entsprechen. Da Sie jedoch keine Kontrolle über die Datenverteilung haben, ist es sinnvoll, einen höheren Wert zu verwenden, um die Verarbeitung in einer der Unteraufgaben zu erleichtern. Andererseits können übermäßige Duplikate Speicher verbrauchen und die Größe des Operator-Status sowie das anschließende Checkpointing beeinträchtigen. Daher habe ich die Größe des BROADCAST_ARRAY empirisch erhöht, um die Datenschieflage zu verringern. Die optimale Größe hängt eng mit Ihren Daten und der Parallelität des Auftrags zusammen.
Ergebnisse und Bewertung
Das Problem der Datenschieflage ist behoben. Flink hat mehrere Unteraufgaben für die Verarbeitung von Left Joins mit demselben Join-Schlüsselwert erzwungen. Dies garantiert zwar keinen perfekt ausgeglichenen Datenverkehr zwischen den Teilaufgaben, reduziert aber die Auswirkungen von Datenschiefständen innerhalb des Left-Join-Operators erheblich. Die entscheidenden Parameter sind hier der Bereich der BROADCAST_ID in der rechten Tabelle und die Rotation der BROADCAST_ID in der linken Tabelle.
Es ist jedoch wichtig zu wissen, dass das Duplizieren der richtigen Tabelle zusätzlichen Speicherplatz beansprucht. Jede Zeile wird im Status gehalten.
Diese Technik dient als Workaround für die Einschränkungen von Flink. Außerdem muss Flink immer noch Zeilen aus der linken Tabelle serialisieren und deserialisieren (es handelt sich nicht um einen Broadcast-Join, der an Ort und Stelle verarbeitet werden kann), und die Skalierbarkeit ist nicht automatisch. Um Ihren Auftrag zu skalieren, müssen Sie möglicherweise den BROADCAST_ARRAY-Bereich vergrößern. Nach dieser Änderung müssen Sie einen neuen Auftrag starten. Eine Aktualisierung des bestehenden Status ist nicht möglich.
Vorteile:
- Abhilfe für Probleme mit Datenschiefständen.
- Verbesserte Nutzung von Ressourcen und Teilaufgaben.
- Anwendbar auf reguläre und temporäre Verknüpfungen.
- Bietet ein allgemeines Muster.
Nachteile:
- Mangelnde Kontrolle über die Verteilung von Zeilen aus der richtigen Tabelle, was zu einer suboptimalen Verteilung führen kann.
- Erhöhte Speichernutzung, geeignet vor allem für kleine rechte Tabellen.
- Die Skalierbarkeit hängt stark von der Größe des BROADCAST_ARRAY ab.
- Um die Pseudo-Broadcast-Join-Parallelität zu erhöhen, muss ein neuer Job gestartet werden.
Fazit
Der Umgang mit Datenschiefständen in Flink SQL ist für eine effiziente und ausgewogene Datenverarbeitung von entscheidender Bedeutung. Durch die Implementierung von Strategien wie der deterministischen Behandlung von NULL-Werten und der Verwendung von Pseudo-Broadcast-Joins können Dateningenieure die Schiefe deutlich verringern und die Leistung verbessern. Mit der Weiterentwicklung von Flink bieten diese Techniken eine solide Grundlage für die Bewältigung einer der größten Herausforderungen bei der Stream-Verarbeitung.
Zusammenfassung
Um Datenverzerrungen in Flink SQL zu verringern, müssen Sie die zugrunde liegenden Konzepte verstehen, effektive Join-Strategien implementieren und die Ergebnisse auswerten. Durch die Anwendung von Best Practices können Unternehmen eine effiziente und ausgewogene Verarbeitung sicherstellen und den Weg für robuste Echtzeit-Datenanalysen ebnen. Möchten Sie in Ihren Flink SQL-Projekten Probleme mit Datenschiefständen angehen? Füllen Sie das untenstehende Formular aus.
Verfasst von
Maciej Maciejko
Unsere Ideen
Weitere Blogs
Contact




