Blog
Schreiben von Flink-Aufträgen unter Verwendung des Spring-Frameworks für die Injektion von Abhängigkeiten

Einführung
Vor fast zwei Jahrzehnten wurde die erste Version des Spring-Frameworks veröffentlicht.
In dieser Zeit wurde Spring zum Fundament, auf dem die meisten Java-Unternehmensanwendungen aufgebaut wurden.
Man kann mit Fug und Recht behaupten, dass es mehr Programmierer gibt, die wissen, wie man eine Anwendung mit dem Spring-Framework schreibt, als Programmierer, die wissen, wie man Flink-Jobs mit der Streaming-API schreibt. Viele von denen, die Flink-Jobs für ihr Unternehmen schreiben möchten, würden gerne die gleichen Muster und Frameworks verwenden, die ihnen vertraut sind und als Industriestandard anerkannt sind. In diesem Artikel möchte ich Ihnen zeigen, wie Sie das Spring-Framework verwenden können, um Flink-Jobs zu schreiben. Dies ist etwas, das auf der Flink-Benutzerliste und in den Slack-Kanälen immer wieder angefragt wurde und das meiner Meinung nach noch Neuland ist.
Geschäftslogik
Lassen Sie uns zunächst unseren Beispielanwendungsfall einrichten. Wir haben eine Geschäftslogik, die in einer Java-Bibliothek ausgedrückt ist und derzeit als Spring Boot Microservice läuft. Das Ziel ist es, diese Logik unter Verwendung derselben Anwendungskonfiguration und desselben Basiscodes nach Flink zu migrieren.
Die Aufgabe unserer Anwendung besteht darin, einen Strom von Ereignissen zu verarbeiten, der als Entität "Bestellung" ausgedrückt wird. Zweitens, die Anonymisierung der Informationen über die Beteiligten anzuwenden und dann die bereits erstellte Sitzungsnummer zu erstellen oder zuzuweisen.
Die Geschäftslogik wird durch das Modul `BusinessLogic' ausgedrückt.
Die wichtigsten Komponenten sind:
OrderundSessionizeOrderKlassen - ein Modell darstellen.- An
OrderProcessorinterface that has implementations for:- Anonymisierung -
SideNameAnonymizationKlasse - Hinzufügen von Sitzungsinformationen -
OrderSessionizeclass
- Anonymisierung -
Das Modul BusinessLogic ist unabhängig von einem Framework. Es ist jedoch so geschrieben, dass alle "Abhängigkeiten" über Konstruktoren bereitgestellt werden sollten. Es folgt also dem Muster der Dependency Injection.
Frühling
Die Spring Boot-Anwendung, in der unsere Geschäftslogik ausgeführt wird, ist in der Datei SpringBootBusinessApp Modul implementiert. Die Spring-Boot-Konfiguration wird durch die unten dargestellte SpringConfig-Klasse repräsentiert:
@Configuration
@EnableScheduling
@EnableAutoConfiguration
public class SpringConfig {
@Bean
public SessionManager createSessionManager() {
return new SimpleSessionManager();
}
@Bean
public OrderSessionize orderSessionize(SessionManager sessionManager) {
return new OrderSessionize(sessionManager);
}
@Bean
public List<OrderProcessor<Order>> orderProcessors() {
return List.of(new SideNameAnonymization());
}
@Bean("businessOrderProcessor")
public OrderProcessor<SessionizeOrder> createOrderProcessor(
List<OrderProcessor<Order>> orderProcessors,
OrderSessionize orderSessionize) {
return new BusinessOrderProcessor(orderProcessors, orderSessionize);
}
}
Dies ist eine recht unkomplizierte Konfiguration. Die Hauptlogik wird durch die Bean "businessOrderProcessor'' dargestellt, die eine Liste von Auftragsverarbeitern und eine OrdrSiessionize-Bean benötigt. Beide Abhängigkeiten werden von Spring auf der Grundlage dieser Konfigurationsklasse erstellt. Die Bean OrderSessionize delegiert die Sitzungsverwaltung an SessionManager, dessen Instanz ebenfalls von Spring injiziert wird. In diesem Fall verwenden wir einen einfachen Map-basierten Cache. Wir werden später sehen, dass dies für Flink Jobs geändert werden kann.
Apache Flink mit Spring
Wie wir alle wissen, ist Flink kein Microservice und ein Flink-Job ist auch nichts dergleichen. Sie werden also nichts dergleichen sehen:
public static void main(String[] args) {
SpringApplication.run(FlinkApplication.class, args);
}
was in Spring Boot-Anwendungen erwartet wird. Dieser Artikel zeigt Ihnen, wie Sie Spring als Framework für die Injektion von Abhängigkeiten verwenden können, um Ihren Flink-Job einzurichten, ähnlich wie Sie es beim Schreiben einer Standard-Microservice-Anwendung tun würden.Der Beispiel-Flink-Job, der Spring verwendet, wird im Modul FlinkPipeline vorgestellt.Der Einstiegspunkt unseres Beispiel-Jobs ist die Java-Klasse DataStreamJob.
public class DataStreamJob {
@Autowired
private EventProducer<Order> eventProducer;
@Autowired
private SinkFunction<SessionizeOrder> sink;
@Autowired
private ProcessFunction<Order, SessionizeOrder> businessLogic;
public static void main(String[] args) throws Exception {
// Add Job argument to System arguments, so we can tell Spring via job arguments which bean
// should be loaded. This is done by @ConditionalOnProperty
ParameterTool parameterTool = ParameterTool.fromArgs(args);
Properties argProperties = parameterTool.getProperties();
System.getProperties().putAll(argProperties);
new ContextRegistry()
.autowiredBean(new DataStreamJob(), "org.example.config")
.run(parameterTool);
}
private void run(ParameterTool parameterTool) throws Exception {
StreamExecutionEnvironment env = createStreamEnv();
env.addSource(new CheckpointCountingSource<>(5, 5, eventProducer))
.setParallelism(1)
.process(businessLogic)
.setParallelism(2)
.addSink(sink)
.setParallelism(2);
env.execute("Flink Job Powered By Spring DI.");
}
}
Dies ist eine einfache Pipeline, die Folgendes enthält:
- source - CheckpointCountingSource (ein großes Lob an Grzegorz Kołakowski von Getindata für diesen Artikel)
- Prozessfunktion - Implementierung basierend auf der Spring-Konfigurationsklasse.
- sink - Implementierung basierend auf der Spring-Konfigurationsklasse.
Die CheckpointCountingSource erzeugt eine feste Anzahl von Ereignissen pro Checkpoint für die Dauer von X Checkpoints. Der genaue Typ des erzeugten Ereignisses und die Logik für seine Erzeugung werden @Autowired markiert ist, was bedeutet, dass seine konkrete Implementierung von Spring injiziert wird.
Auch die konkrete Implementierung von sink wird von Spring injiziert:
@Autowired
private SinkFunction<SessionizeOrder> sink;
Lassen Sie uns nun die Java-Klasse FlinkBusinessLogic untersuchen. Diese Klasse implementiert die ProcessFunction von Flink und wird von Spring injiziert und initialisiert.
public class FlinkBusinessLogic extends ProcessFunction<Order, SessionizeOrder> {
@Autowired
@Qualifier("businessOrderProcessor")
private transient OrderProcessor<SessionizeOrder> orderProcessor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
new ContextRegistry().autowiredBean(this, "org.example.config");
}
@Override
public void processElement(Order value, Context ctx, Collector<SessionizeOrder> out) throws Exception {
SessionizeOrder newOrder = orderProcessor.process(value);
out.collect(newOrder);
}
}
Dies ist eine sehr einfache Implementierung von ProcessFunction von Flink, die die Ausführung der Geschäftslogik an OrderProcessor type aus dem BusinessLogic-Modul delegiert.
Wichtig ist dabei, dass OrderProcessor als markiert ist:
@Autowired
@Qualifier("businessOrderProcessor")
private OrderProcessor<SessionizeOrder> orderProcessor;
Das bedeutet, dass die konkrete Implementierung OrderProcessor von Spring injiziert wird, indem der Bean-Name mit der Zeichenkette "businessOrderProcessor" abgeglichen wird.Die Spring-Konfigurationsklasse für den Flink-Job ist in zwei Teile aufgeteilt.
@Configuration
public class FlinkPipelineConfig {
@Bean
public EventToStringConverter<SessionizeOrder> converter() {
return event -> String.format("Order Details - %s", event.toString());
}
@Bean
public SinkFunction<SessionizeOrder> sink(EventToStringConverter<SessionizeOrder> converter) {
return new ConsoleSink<>(converter);
}
@Bean
public EventProducer<Order> eventProducer() {
return new OrderProducer();
}
@Bean
@ConditionalOnProperty(
value="business.logic",
havingValue = "standard",
matchIfMissing = true)
public ProcessFunction<Order, SessionizeOrder> businessLogic() {
return new FlinkBusinessLogic();
}
@Bean
@ConditionalOnProperty(
value="business.logic",
havingValue = "sleep")
public ProcessFunction<Order, SessionizeOrder> suspiciousLogic() {
return new SuspiciousFlinkBusinessLogic();
}
}
und:
@Configuration
public class BusinessLogicSpringConfig {
@Bean
public SessionManager createSessionManager() {
return new SimpleSessionManager();
}
@Bean
public OrderSessionize orderSessionize(SessionManager sessionManager) {
return new OrderSessionize(sessionManager);
}
@Bean
public List<OrderProcessor<Order>> orderProcessors() {
return List.of(new SideNameAnonymization());
}
@Bean("businessOrderProcessor")
public OrderProcessor<SessionizeOrder> createOrderProcessor(
List<OrderProcessor<Order>> orderProcessors,
OrderSessionize orderSessionize) {
return new BusinessOrderProcessor(orderProcessors, orderSessionize);
}
}
Die erste stellt eine Spring-Konfiguration für die Flink-Pipeline dar. Sie definiert die Quell- und Sink-Abhängigkeiten der Pipeline. Es ist erwähnenswert, dass Spring diese Implementierung nur dann verwendet, wenn die Eigenschaft “business.logic” auf "sleep" gesetzt wurde, würde stattdessen die SuspiciousFlinkBusinessLogic verwendet werden. Dies ist möglich durch die Verwendung einer spring-boot @ConditionalOnProperty Annotation auf der Bean-Factory-Methode in der Spring-Konfigurationsklasse. Die zweite Klasse definiert die Abhängigkeiten der Kerngeschäftslogik auf genau dieselbe Weise, wie es für das SpringBootBusinessApp Modul gemacht wurde. Wenn wir unseren Job an den Flink-Cluster übermitteln, sehen wir den folgenden erstellten Job-Graphen:

Wenn wir die Aufgabe "Verarbeiten" untersuchen, werden wir feststellen, dass sie auf zwei Task-Managern läuft:

Dies bestätigt die Tatsache, dass die FlinkBusienssLogic-Prozessfunktion zweimal auf verschiedenen Rechnern erstellt wurde, wobei der Spring-Kontext für jede Instanz erstellt wurde.
Wenn wir schließlich die von Spring injected ConsoleSink ausgegebenen Protokolle untersuchen, sehen wir dies:

Wie Sie sehen, haben wir soeben einen funktionierenden Flink-Job bereitgestellt, der auf mehreren Task-Manager-Knoten unter Verwendung des Spring Dependency Injection Frameworks läuft.
Wie funktioniert das?
Im vorigen Kapitel haben wir gesehen, dass wir mit dem Spring-Framework für Flink Jobs Klassenabhängigkeiten injizieren können. Die geheime Zutat ist hier unsere Open-Source-Bibliothek flink-spring, die alle Spring-Abhängigkeiten in einem Jar bündelt und eine einfache API für die Erstellung und Verwendung von Spring-Kontext in Flink-Jobs bietet. Sie müssen lediglich flink-spring-0.1.0-SNAPSHOT-jar-with-dependencies.jar in den lib-Ordner von Flink einfügen und den Cluster neu starten. Die Details zur Erstellung dieser jar-Datei finden Sie im Handbuch der flink-spring-Bibliothek. Um von einem Flink-Job aus auf Spring-Klassen zugreifen zu können, müssen Sie eine neue Abhängigkeit hinzufügen.
<dependency>
<groupId>com.getindata</groupId>
<artifactId>flink-spring</artifactId>
<version>0.1.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
Mit dieser hinzugefügten Abhängigkeit haben Sie nun Zugriff auf die Spring-Bibliotheken und können die Annotationen @Autowired und @Bean verwenden. Die Schlüsselkomponente für die Bibliothek flink-spring ist die Klasse ContextRegistry. Diese Klasse bietet eine API, mit der Benutzer den Spring-Kontext auf der Grundlage der Spring-Java-Konfigurationsklassen initialisieren und alle als @Autowired gekennzeichneten Abhängigkeiten aus diesem Kontext injizieren können. Dies geschieht einfach durch einen Aufruf:
ContextRegistry registry = new ContextRegistry();
registry.autowireBean(objToInjectDependenciesTo, “configuration.package”)
Im Fall unserer Anwendung FlinkPipeline haben wir dies an zwei Stellen getan: Das erste Mal in unserer Hauptklasse DataStreamJob, wo wir eine konkrete Implementierung für Flink Sink und unsere benutzerdefinierte Schnittstelle EventProducer injiziert haben, etwa so:
new ContextRegistry()
.autowiredBean(new DataStreamJob(), "org.example.config")
.run(parameterTool);
Das zweite Mal war in unserer benutzerdefinierten Prozessfunktion → FlinkBusinessLogic::open() Methode, wo wir den Spring-Kontext initialisiert und die Instanzen von OrderProcessor aus unserem BusinessLogic Modul injiziert haben.
Diese beiden Beispiele zeigen, dass wir den Spring-Kontext sowohl im Job Manager in der Phase der Auftragsübermittlung als auch im Task Manager in der Phase der Aufgabeninitialisierung initialisieren und verwenden können.
Das Besondere an der Klasse ContextRegistry ist, dass sie einen Cache der bereits erstellten Kontextobjekte speichert. Das heißt, wenn Sie mehr als ein Objekt mit demselben Spring-Kontext "initialisieren" möchten, wird der Kontext nur einmal erstellt. Allerdings müssen Sie bedenken, dass dieser Cache vergänglich ist. Die Regel gilt also nur für den Bereich Job Manager und Task Operator im Task Manager.
Randnotizen und Überlegungen
Die flink-spring-Bibliothek ist in ihrem derzeitigen Zustand ein PoC-Projekt, das zeigen soll, dass die Verwendung des Spring-Frameworks für die Injektion von Abhängigkeiten eine mögliche und recht unkomplizierte Aufgabe für die Entwicklung von Flink-Jobs unter Verwendung von Streaming und der Tabellen-API ist.
Es gibt zwei Überlegungen, die man berücksichtigen muss:
- Derzeit sind im flink-spring uber jar keine Spring-Abhängigkeiten schattiert. Das bedeutet, dass es einen potenziellen Versionskonflikt zwischen Flink und flink-spring geben könnte. Trotz dieser Möglichkeit wurden zum Zeitpunkt der Erstellung dieses Artikels keine derartigen Konflikte beobachtet.
- Die Klasse
ContextRegistrylädt den Spring-Kontext bei Bedarf aus bereitgestellten Konfigurationsklassen. Da der Lebenszyklus der Anwendung nicht von Spring verwaltet wird und der erstellte Kontext nur kurzlebig ist (nur im Bereich vonContextRegistry::autowireBean), bedeutet dies, dass Sie keine globale Garantie dafür haben, dass der Spring-Bereich der erstellten Bean erhalten bleibt. Mit anderen Worten, wenn wir eine Bean mit Standard-Singleton-Scope haben, haben wir zwei contextRegistry-Instanzen wie diese:
new ContextRegistry().autowireBean(...);
new ContextRegistry().autowireBean(...);
würde zwei exakt gleiche Beans erzeugen, obwohl die Bean-Definition besagt, dass der Geltungsbereich Singleton sein sollte.
Durch den in ContextRegistry implementation verwendeten Cache bleibt der Geltungsbereich der Bean im Kontext der einzelnen ContextRegistry Instanzen erhalten.
Der zweite Punkt ist besonders wichtig, wenn wir an die Kosten für die Initialisierung des Spring-Kontexts denken. Sie können sich einen Auftrag vorstellen, bei dem jeder Operator seinen eigenen ContextRegistry() instance initialisiert. Multiplizieren Sie dies mit dem Parallelisierungsgrad des Auftrags/der Aufgabe und Sie können leicht erkennen, dass wir am Ende die Initialisierung vieler Spring-Kontexte haben, wobei jeder Operator nur eine Handvoll erstellter Beans verwendet. Dies kann die für die Auftragsinitialisierung benötigte Zeit erhöhen. Eine gute Praxis wäre hier, nur die benötigten Beans zu initialisieren. Dies kann auf zwei Arten erreicht werden:
- Annotieren Sie Ihre Konfigurationsklassen mit
@Lazy - Teilen Sie Ihre Spring-Konfiguration in mehrere Dateien auf und verwenden Sie nur die Datei, die Sie für die Task-Initialisierung benötigen.
Fazit
In diesem Blog-Beitrag haben wir gezeigt, dass es möglich und relativ einfach ist, Spring als Dependency Injection Framework für die Erstellung von Flink-Aufträgen mit unserer kleinen Bibliothek flink-spring zu verwenden.
Wir haben auch gezeigt, dass Sie die Spring-Konfigurationsklassen, die Sie bereits für Ihre anderen Geschäftsanwendungen haben, für die Einrichtung von Flink-Jobs mit dem Spring-Framework verwenden können. Wir haben auch bewiesen, dass dies für Job- und Task-Manager-Knoten möglich ist und dass Sie sowohl Flink-basierte Implementierungen (Sinks, Quellen) als auch Ihre benutzerdefinierten Geschäftscode-Klassen einbinden können. Wenn Sie noch einen Schritt weiter gehen, können Sie auch die SessionManager -Implementierung in eine Implementierung umwandeln, die den Map-Status von Flink anstelle der Java-Map verwendet.Wir hoffen, dass dies einigen von Ihnen bei der Einführung von Flink in Ihren Unternehmen hilft, indem wir zeigen, dass gut etablierte Industrie-Frameworks und -Muster immer noch zum Schreiben von Flink-Jobs verwendet werden können.
Verfasst von
Krzysztof Chmielewski
Unsere Ideen
Weitere Blogs
Contact



