Blog

Flink mit MLeap

Bartosz Chodnicki

Aktualisiert Oktober 16, 2025
17 Minuten

MLOps mit Stream Processing

In der Welt der Big Data entdecken immer mehr Unternehmen das Potenzial der schnellen Datenverarbeitung mit Stream-Technologien. Wir müssen nicht einmal erwähnen, wie nützlich Machine Learning ist, denn das versteht sich von selbst.

Normalerweise beherrschen nicht viele Experten diese beiden Bereiche. Die Integration Ihrer Streaming-Architektur mit den Leistungen Ihres Data Scientist-Teams ist also möglicherweise nicht so einfach.

In diesem Artikel möchte ich einen der möglichen Wege aufzeigen, wie diese Art der Integration möglich ist. Um dies zu erreichen, habe ich Apache Flink, eine weltweit verbreitete Streaming-Engine, und MLeap, eine Serialisierungsbibliothek für maschinelle Lernmodelle für verschiedene Programmiersprachen. Weitere Informationen über MLeap und darüber, wie man ein Modell in Python erstellt und in Scala ausführt, finden Sie in meinem früheren Artikel hier.

Die Idee dieses Artikels ist es, zu erklären, wie das in Python trainierte ML-Modell in der Flink-Pipeline verwendet werden kann und Informationen über die Flink-Bibliothek zu liefern, an der wir bei GetInData arbeiten. Details finden Sie hier.

Motivation für Flink mit MLeap

Warum sollten ML-Modelle innerhalb einer Streaming-Engine bereitgestellt werden? Warum nicht einfach einen HTTP-Dienst wiederverwenden, der uns Vorhersagen liefert oder (vorhergesagte und gespeicherte Daten) aus der Datenbank liest?

Hier sind drei Argumente:

  • Der Bedarf an extrem niedrigen Latenzen - das Ausliefern von Vorhersagen innerhalb von Streaming-Aufträgen ist viel schneller, da wir die durch Verbindungen zu einem HTTP-Server oder einer DB verursachten Netzwerklatenzen weglassen können. Ganz zu schweigen von der Tatsache, dass das Netzwerk manchmal ausfällt.
  • Service-Entkopplung - was ist, wenn andere Teams die REST-JSON-Nutzdaten ändern oder ein Datenanalyst die Spalte in der Datenbank umbenennt? Die Speicherung von Modellen innerhalb von Flink-Jobs ist eine einfache Lösung.
  • Kosten - niemand spricht gerne darüber, dennoch sind sie einer der wichtigsten Faktoren, wenn Projekte erfolgreich sein sollen. Die Unterhaltung zusätzlicher HTTP-Server und DBs + Pipelines, die Vorhersagen berechnen und in der DB speichern, verursacht Kosten. Ihr Managementteam mag keine Kosten, da bin ich mir zu 100% sicher!

Natürlich bringt die Speicherung von ML-Modellen in Jobs andere Herausforderungen mit sich, wie z.B.:

  • Wie laden Sie das Modell?
  • Wie machen Sie Vorhersagen?
  • Mein Team verwendet Flink SQL. Ist es möglich, mit dieser API Vorhersagen zu treffen?

Um diese Fragen zu beantworten, haben wir die Flink-MLeap Bibliothek erstellt, die Sie hier finden können hier.

Anwendungsfall

Stellen Sie sich vor, Sie sind ein Data Scientist (oder vielleicht sind Sie es sogar) und möchten die bestehende Flink-Infrastruktur nutzen, um Ihr trainiertes ML-Modell anzuschließen und Vorhersagen über gestreamte Daten zu treffen. Sie sind ein vielbeschäftigter Mensch und möchten keine Zeit damit verschwenden, eine völlig neue Technologie zu erlernen. Das ist in Ordnung, Sie sind nicht faul! Das ist nur natürlich, denn niemand kann alle Technologien der Welt erlernen.

Konzentrieren wir uns auf das, was Sie wissen, und wie Sie dieses Wissen nutzen können. Sie sind ein "Datentyp", also haben Sie wahrscheinlich schon von SQL gehört oder vielleicht sogar Tausende von Abfragen in dieser Sprache geschrieben. Gut so! Als Sie über Flink gelesen haben, sind Sie auf diesen interessanten Blogbeitrag über die Erstellung von Flink-Jobs mit SQL gestoßen, wie zum Beispiel diesen hier.

Die Verbindung zum Flink-Cluster über den SQL-Client und das Schreiben von Abfragen zur Vorhersage von Streams sollte so einfach und natürlich sein, oder?

Verwendung

Mit unserer Bibliothekkönnen Sie mühelos ML-Modelle in einer Stream-Umgebung bereitstellen:

Also erstellen wir zunächst einen Stream mit Merkmalen mithilfe von SQL. Dazu verwenden wir einen Datagen-Konnektor, einen Hilfs-Konnektor in Flink, der einen Stream mit Zufallswerten erzeugt - sehr nützlich für die Entwicklungsphase.

Tutorial

8 Minuten lesen

Flink mit MLeap

MLOps mit Stream Processing

In der Welt der Big Data entdecken immer mehr Unternehmen das Potenzial der schnellen Datenverarbeitung mit Stream-Technologien. Wir müssen nicht einmal erwähnen, wie nützlich Machine Learning ist, denn das versteht sich von selbst.

Normalerweise beherrschen nicht viele Experten diese beiden Bereiche. Die Integration Ihrer Streaming-Architektur mit den Leistungen Ihres Data Scientist-Teams ist also möglicherweise nicht so einfach.

In diesem Artikel möchte ich einen der möglichen Wege aufzeigen, wie diese Art der Integration möglich ist. Um dies zu erreichen, habe ich Apache Flink, eine weltweit verbreitete Streaming-Engine, und MLeap, eine Serialisierungsbibliothek für maschinelle Lernmodelle für verschiedene Programmiersprachen. Weitere Informationen über MLeap und darüber, wie man ein Modell in Python erstellt und in Scala ausführt, finden Sie in meinem früheren Artikel hier.

Die Idee dieses Artikels ist es, zu erklären, wie das in Python trainierte ML-Modell in der Flink-Pipeline verwendet werden kann und Informationen über die Flink-Bibliothek zu liefern, an der wir bei GetInData arbeiten. Details finden Sie hier.

Motivation für Flink mit MLeap

Warum sollten ML-Modelle innerhalb einer Streaming-Engine bereitgestellt werden? Warum nicht einfach einen HTTP-Dienst wiederverwenden, der uns Vorhersagen liefert oder (vorhergesagte und gespeicherte Daten) aus der Datenbank liest?

Hier sind drei Argumente:

  • Der Bedarf an extrem niedrigen Latenzen - das Ausliefern von Vorhersagen innerhalb von Streaming-Aufträgen ist viel schneller, da wir die durch Verbindungen zu einem HTTP-Server oder einer DB verursachten Netzwerklatenzen weglassen können. Ganz zu schweigen von der Tatsache, dass das Netzwerk manchmal ausfällt.
  • Service-Entkopplung - was ist, wenn andere Teams die REST-JSON-Nutzdaten ändern oder ein Datenanalyst die Spalte in der Datenbank umbenennt? Die Speicherung von Modellen innerhalb von Flink-Jobs ist eine einfache Lösung.
  • Kosten - niemand spricht gerne darüber, dennoch sind sie einer der wichtigsten Faktoren, wenn Projekte erfolgreich sein sollen. Die Unterhaltung zusätzlicher HTTP-Server und DBs + Pipelines, die Vorhersagen berechnen und in der DB speichern, verursacht Kosten. Ihr Managementteam mag keine Kosten, da bin ich mir zu 100% sicher!

Natürlich bringt die Speicherung von ML-Modellen in Jobs andere Herausforderungen mit sich, wie z.B.:

  • Wie laden Sie das Modell?
  • Wie machen Sie Vorhersagen?
  • Mein Team verwendet Flink SQL. Ist es möglich, mit dieser API Vorhersagen zu treffen?

Um diese Fragen zu beantworten, haben wir die Flink-MLeap Bibliothek erstellt, die Sie hier finden können hier.

Anwendungsfall

Stellen Sie sich vor, Sie sind ein Data Scientist (oder vielleicht sind Sie es sogar) und möchten die bestehende Flink-Infrastruktur nutzen, um Ihr trainiertes ML-Modell anzuschließen und Vorhersagen über gestreamte Daten zu treffen. Sie sind ein vielbeschäftigter Mensch und möchten keine Zeit damit verschwenden, eine völlig neue Technologie zu erlernen. Das ist in Ordnung, Sie sind nicht faul! Das ist nur natürlich, denn niemand kann alle Technologien der Welt erlernen.

Konzentrieren wir uns auf das, was Sie wissen, und wie Sie dieses Wissen nutzen können. Sie sind ein "Datentyp", also haben Sie wahrscheinlich schon von SQL gehört oder vielleicht sogar Tausende von Abfragen in dieser Sprache geschrieben. Gut so! Als Sie über Flink gelesen haben, sind Sie auf diesen interessanten Blogbeitrag über die Erstellung von Flink-Jobs mit SQL gestoßen, wie zum Beispiel diesen hier.

Die Verbindung zum Flink-Cluster über den SQL-Client und das Schreiben von Abfragen zur Vorhersage von Streams sollte so einfach und natürlich sein, oder?

Verwendung

Mit unserer Bibliothekkönnen Sie mühelos ML-Modelle in einer Stream-Umgebung bereitstellen:

Also erstellen wir zunächst einen Stream mit Merkmalen mithilfe von SQL. Dazu verwenden wir einen Datagen-Konnektor, einen Hilfs-Konnektor in Flink, der einen Stream mit Zufallswerten erzeugt - sehr nützlich für die Entwicklungsphase.

// Create table with features
CREATE TABLE Features (
  feature1 DOUBLE NOT NULL,
  feature2 INT NOT NULL,
  feature3 DOUBLE NOT NULL,
  feature_timestamp TIMESTAMP(3))
WITH ( 
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.feature1.min' = '0.0',
'fields.feature1.max' = '1.0'
)

Als Nächstes machen wir auf der Grundlage dieser Merkmale Vorhersagen:

// Execute predictions
SELECT
  Predict(feature1, feature2, feature3) as prediction,
  Predictv2(feature1) as prediction2
FROM Features

Wie Sie sehen können, verwenden wir hier die benutzerdefinierten SQL-Funktionen Predict und Predictv2 für unsere Merkmale. Sie können eine unterschiedliche Anzahl von Argumenten und Typen annehmen. Die Namen der Funktionen und Modelle, die sie verwenden, können einfach in der Konfiguration definiert werden.

Im Folgenden finden Sie weitere technische Aspekte der Bibliothek, wie wir sie aufgebaut haben und Beispiele für ihre Verwendung und Konfiguration.

FLINK SQL API

Wir haben uns mehr auf die Flink SQL-API konzentriert und mehr Hilfsprogramme für diese API vorbereitet, damit jemand, der mit Flink nicht so vertraut ist oder sich mit SQL wohler fühlt als mit Java/Scala, das ML-Modell mühelos in Flink-Jobs verwenden kann.

Zu diesem Zweck haben wir MLeapUDFRegistry vorbereitet. Der Hauptzweck dieser Registrierung besteht darin, UDFs (Flink user defined functions) zu registrieren, die später in SQL-Abfragen verwendet werden können. Um Ihre UDFs hinzuzufügen, können Sie sie in der application.conf wie folgt definieren:

mleap {
	udfRegistry = [
    	{
        	udfName = "Predict"
        	bundlePath = "/mleap-example-1"
        	bundleSource = "file"
    	},
    	{
           udfName = "Predictv2"
        	bundlePath = "/mleap-example-2"
        	bundleSource = "file"
    	}
	]
}

Und führen Sie MLeapUDFRegistry.registerFromConfig(config, tableEnv) aus, bevor Sie Ihre Abfragen ausführen, wie wir es in diesen Beispielanwendungen getan haben: FlinkSqlWithMLeap.

...
	val env = StreamExecutionEnvironment.getExecutionEnvironment
	val tableEnv = StreamTableEnvironment.create(env)
 
	// Register UDFs basing on config
	val config = ConfigFactory.load()
	MLeapUDFRegistry.registerFromConfig(config, tableEnv)
 
...

Eine weitere Sache, die recht problematisch sein kann, ist das Schreiben spezifischer UDFs für jedes ML-Modell. Das ist natürlich die einfachste, aber auch die zeitaufwändigste Methode. Deshalb haben wir eine sehr generische MLeapUDF definiert, die für jedes MLeap-Bündel wiederverwendet werden kann.

Dank MLeapUDFRegistry und MLeapUDF geht die Verwendung von ML-Modellen mit SQL sehr leicht von der Hand. Sehen Sie sich nur die Anwendungen von FlinkSqlWithMLeap an. Jeder, der sich mit SQL auskennt und ein ML-Modell besitzt, kann es problemlos mit Flink verwenden.

Guten Appetit!

Code wiederbeleben

Werfen wir einen kurzen Blick auf den Code. Wir haben dieses Projekt in Scala geschrieben. Es enthält zwei Module:

  • lib - mit Bibliotheksklassen
  • Beispiel - mit Anwendungsbeispielen

In dem Bibliotheksmodul haben wir zwei Flink-APIs behandelt: Streaming und SQL, damit sie in jedem Auftrag wiederverwendet werden können.

ML-Modelle laden

Um ein Modell zu laden, müssen wir zunächst eines erstellen. Ich werde die im vorherigen Artikel vorgestellten Modelle wiederverwenden Artikel.

Random Forest Regressoren nehmen einen Float als Eingabe und geben einen Float als Ausgabe.

Um die Modelle in die Flink-Jobs zu laden, haben wir BundleLoaders erstellt. Das eine ist FileBundleLoader, das Bundles aus lokalen Dateien lädt. Die andere ist GCSBundleLoader, die Modelle von Google Cloud Storage Bucket abrufen und in Flink-Jobs verwenden kann.

Streaming-API

In unserer Bibliothek konzentrieren wir uns mehr auf SQL-Beispiele, weil die Zielgruppe für diese Funktion größer ist. Ich glaube, dass mehr Data Scientists SQL als Java beherrschen. Davon abgesehen war die Streaming-API ein guter Ausgangspunkt, um zu prüfen, ob es überhaupt möglich ist, Aufträge mit MLeap-Modellen auszuführen.

In MleapMapFunction haben wir eine Möglichkeit vorgestellt, MLeap-Bundles zu verwenden. Wir haben das Modell mit der offenen Methode geladen.

case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends
  RichMapFunction[Double, Double] {
 
  private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction])
  @transient var transformer: Transformer = _
 
  override def open(parameters: Configuration): Unit = {
	transformer = bundleLoader.loadBundle(bundleName) match {
  	case Failure(exception) => {
    	LOG.error(s"Error while loading bundle: $bundleName", exception)
    	throw BundleLoadProblem(exception)
  	}
  	case Success(value) => value
	}
  }
 
  override def map(value: Double): Double = {
	val dataset = Seq(Row(DenseTensor(Array(value), List(1))))
	val frame = DefaultLeapFrame(transformer.inputSchema, dataset)
	val res = transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double]
	res
  }
}

Dann machen wir mit der Kartenmethode Vorhersagen. Wie Sie sehen können, war das eine sehr einfache Lösung.

Um dies zu testen, haben wir einen einfachen Fink-Job FlinkDatastreamWithMleap implementiert:

object FlinkDatastreamWithMleap {
  def main(args: Array[String]): Unit = {
 
	implicit val typeInfo = TypeInformation.of(classOf[StructType])
	val env = StreamExecutionEnvironment.getExecutionEnvironment
 
	val rand: Random = new Random()
 
	val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble())
	val bundlePath = getClass.getResource("/mleap-example-1").toString
 
	text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print()
 
	env.execute()
  }
}

Tutorial

8 Minuten lesen

Flink mit MLeap

MLOps mit Stream Processing

In der Welt der Big Data entdecken immer mehr Unternehmen das Potenzial der schnellen Datenverarbeitung mit Stream-Technologien. Wir müssen nicht einmal erwähnen, wie nützlich Machine Learning ist, denn das versteht sich von selbst.

Normalerweise beherrschen nicht viele Experten diese beiden Bereiche. Die Integration Ihrer Streaming-Architektur mit den Leistungen Ihres Data Scientist-Teams ist also möglicherweise nicht so einfach.

In diesem Artikel möchte ich einen der möglichen Wege aufzeigen, wie diese Art der Integration möglich ist. Um dies zu erreichen, habe ich Apache Flink, eine weltweit verbreitete Streaming-Engine, und MLeap, eine Serialisierungsbibliothek für maschinelle Lernmodelle für verschiedene Programmiersprachen. Weitere Informationen über MLeap und darüber, wie man ein Modell in Python erstellt und in Scala ausführt, finden Sie in meinem früheren Artikel hier.

Die Idee dieses Artikels ist es, zu erklären, wie das in Python trainierte ML-Modell in der Flink-Pipeline verwendet werden kann und Informationen über die Flink-Bibliothek zu liefern, an der wir bei GetInData arbeiten. Details finden Sie hier.

Motivation für Flink mit MLeap

Warum sollten ML-Modelle innerhalb einer Streaming-Engine bereitgestellt werden? Warum nicht einfach einen HTTP-Dienst wiederverwenden, der uns Vorhersagen liefert oder (vorhergesagte und gespeicherte Daten) aus der Datenbank liest?

Hier sind drei Argumente:

  • Der Bedarf an extrem niedrigen Latenzen - das Ausliefern von Vorhersagen innerhalb von Streaming-Aufträgen ist viel schneller, da wir die durch Verbindungen zu einem HTTP-Server oder einer DB verursachten Netzwerklatenzen weglassen können. Ganz zu schweigen von der Tatsache, dass das Netzwerk manchmal ausfällt.
  • Service-Entkopplung - was ist, wenn andere Teams die REST-JSON-Nutzdaten ändern oder ein Datenanalyst die Spalte in der Datenbank umbenennt? Die Speicherung von Modellen innerhalb von Flink-Jobs ist eine einfache Lösung.
  • Kosten - niemand spricht gerne darüber, dennoch sind sie einer der wichtigsten Faktoren, wenn Projekte erfolgreich sein sollen. Die Unterhaltung zusätzlicher HTTP-Server und DBs + Pipelines, die Vorhersagen berechnen und in der DB speichern, verursacht Kosten. Ihr Managementteam mag keine Kosten, da bin ich mir zu 100% sicher!

Natürlich bringt die Speicherung von ML-Modellen in Jobs andere Herausforderungen mit sich, wie z.B.:

  • Wie laden Sie das Modell?
  • Wie machen Sie Vorhersagen?
  • Mein Team verwendet Flink SQL. Ist es möglich, mit dieser API Vorhersagen zu treffen?

Um diese Fragen zu beantworten, haben wir die Flink-MLeap Bibliothek erstellt, die Sie hier finden können hier.

Anwendungsfall

Stellen Sie sich vor, Sie sind ein Data Scientist (oder vielleicht sind Sie es sogar) und möchten die bestehende Flink-Infrastruktur nutzen, um Ihr trainiertes ML-Modell anzuschließen und Vorhersagen über gestreamte Daten zu treffen. Sie sind ein vielbeschäftigter Mensch und möchten keine Zeit damit verschwenden, eine völlig neue Technologie zu erlernen. Das ist in Ordnung, Sie sind nicht faul! Das ist nur natürlich, denn niemand kann alle Technologien der Welt erlernen.

Konzentrieren wir uns auf das, was Sie wissen, und wie Sie dieses Wissen nutzen können. Sie sind ein "Datentyp", also haben Sie wahrscheinlich schon von SQL gehört oder vielleicht sogar Tausende von Abfragen in dieser Sprache geschrieben. Gut so! Als Sie über Flink gelesen haben, sind Sie auf diesen interessanten Blogbeitrag über die Erstellung von Flink-Jobs mit SQL gestoßen, wie zum Beispiel diesen hier.

Die Verbindung zum Flink-Cluster über den SQL-Client und das Schreiben von Abfragen zur Vorhersage von Streams sollte so einfach und natürlich sein, oder?

Verwendung

Mit unserer Bibliothekkönnen Sie mühelos ML-Modelle in einer Stream-Umgebung bereitstellen:

Also erstellen wir zunächst einen Stream mit Merkmalen mithilfe von SQL. Dazu verwenden wir einen Datagen-Konnektor, einen Hilfs-Konnektor in Flink, der einen Stream mit Zufallswerten erzeugt - sehr nützlich für die Entwicklungsphase.

// Create table with features
CREATE TABLE Features (
  feature1 DOUBLE NOT NULL,
  feature2 INT NOT NULL,
  feature3 DOUBLE NOT NULL,
  feature_timestamp TIMESTAMP(3))
WITH ( 
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.feature1.min' = '0.0',
'fields.feature1.max' = '1.0'
)


Als Nächstes machen wir auf der Grundlage dieser Merkmale Vorhersagen:

// Execute predictions
SELECT
  Predict(feature1, feature2, feature3) as prediction,
  Predictv2(feature1) as prediction2
FROM Features


Wie Sie sehen können, verwenden wir hier die benutzerdefinierten SQL-Funktionen Predict und Predictv2 für unsere Merkmale. Sie können eine unterschiedliche Anzahl von Argumenten und Typen annehmen. Die Namen der Funktionen und Modelle, die sie verwenden, können einfach in der Konfiguration definiert werden.

Im Folgenden finden Sie weitere technische Aspekte der Bibliothek, wie wir sie aufgebaut haben und Beispiele für ihre Verwendung und Konfiguration.

FLINK SQL API

Wir haben uns mehr auf die Flink SQL-API konzentriert und mehr Hilfsprogramme für diese API vorbereitet, damit jemand, der mit Flink nicht so vertraut ist oder sich mit SQL wohler fühlt als mit Java/Scala, das ML-Modell mühelos in Flink-Jobs verwenden kann.

Zu diesem Zweck haben wir MLeapUDFRegistry vorbereitet. Der Hauptzweck dieser Registrierung besteht darin, UDFs (Flink user defined functions) zu registrieren, die später in SQL-Abfragen verwendet werden können. Um Ihre UDFs hinzuzufügen, können Sie sie in der application.conf wie folgt definieren:

mleap {
	udfRegistry = [
    	{
        	udfName = "Predict"
        	bundlePath = "/mleap-example-1"
        	bundleSource = "file"
    	},
    	{
           udfName = "Predictv2"
        	bundlePath = "/mleap-example-2"
        	bundleSource = "file"
    	}
	]
}

Und führen Sie MLeapUDFRegistry.registerFromConfig(config, tableEnv) aus, bevor Sie Ihre Abfragen ausführen, wie wir es in diesen Beispielanwendungen getan haben: FlinkSqlWithMLeap.

...
	val env = StreamExecutionEnvironment.getExecutionEnvironment
	val tableEnv = StreamTableEnvironment.create(env)
 
	// Register UDFs basing on config
	val config = ConfigFactory.load()
	MLeapUDFRegistry.registerFromConfig(config, tableEnv)
 
...

Eine weitere Sache, die recht problematisch sein kann, ist das Schreiben spezifischer UDFs für jedes ML-Modell. Das ist natürlich die einfachste, aber auch die zeitaufwändigste Methode. Deshalb haben wir eine sehr generische MLeapUDF definiert, die für jedes MLeap-Bündel wiederverwendet werden kann.

Dank MLeapUDFRegistry und MLeapUDF geht die Verwendung von ML-Modellen mit SQL sehr leicht von der Hand. Sehen Sie sich nur die Anwendungen von FlinkSqlWithMLeap an. Jeder, der sich mit SQL auskennt und ein ML-Modell besitzt, kann es problemlos mit Flink verwenden.

Guten Appetit!

Code wiederbeleben

Werfen wir einen kurzen Blick auf den Code. Wir haben dieses Projekt in Scala geschrieben. Es enthält zwei Module:

  • lib - mit Bibliotheksklassen
  • Beispiel - mit Anwendungsbeispielen

In dem Bibliotheksmodul haben wir zwei Flink-APIs behandelt: Streaming und SQL, damit sie in jedem Auftrag wiederverwendet werden können.

ML-Modelle laden

Um ein Modell zu laden, müssen wir zunächst eines erstellen. Ich werde die im vorherigen Artikel vorgestellten Modelle wiederverwenden Artikel.

Random Forest Regressoren nehmen einen Float als Eingabe und geben einen Float als Ausgabe.

Um die Modelle in die Flink-Jobs zu laden, haben wir BundleLoaders erstellt. Das eine ist FileBundleLoader, das Bundles aus lokalen Dateien lädt. Die andere ist GCSBundleLoader, die Modelle von Google Cloud Storage Bucket abrufen und in Flink-Jobs verwenden kann.

Streaming-API

In unserer Bibliothek konzentrieren wir uns mehr auf SQL-Beispiele, weil die Zielgruppe für diese Funktion größer ist. Ich glaube, dass mehr Data Scientists SQL als Java beherrschen. Davon abgesehen war die Streaming-API ein guter Ausgangspunkt, um zu prüfen, ob es überhaupt möglich ist, Aufträge mit MLeap-Modellen auszuführen.

In MleapMapFunction haben wir eine Möglichkeit vorgestellt, MLeap-Bundles zu verwenden. Wir haben das Modell mit der offenen Methode geladen.

case class MleapMapFunction(bundleName: String, bundleLoader: BundleLoader) extends
  RichMapFunction[Double, Double] {
 
  private val LOG = LoggerFactory.getLogger(classOf[MleapMapFunction])
  @transient var transformer: Transformer = _
 
  override def open(parameters: Configuration): Unit = {
	transformer = bundleLoader.loadBundle(bundleName) match {
  	case Failure(exception) => {
    	LOG.error(s"Error while loading bundle: $bundleName", exception)
    	throw BundleLoadProblem(exception)
  	}
  	case Success(value) => value
	}
  }
 
  override def map(value: Double): Double = {
	val dataset = Seq(Row(DenseTensor(Array(value), List(1))))
	val frame = DefaultLeapFrame(transformer.inputSchema, dataset)
	val res = transformer.transform(frame).get.dataset.head(1).asInstanceOf[Double]
	res
  }
}


Dann machen wir mit der Kartenmethode Vorhersagen. Wie Sie sehen können, war das eine sehr einfache Lösung.

Um dies zu testen, haben wir einen einfachen Fink-Job FlinkDatastreamWithMleap implementiert:

object FlinkDatastreamWithMleap {
  def main(args: Array[String]): Unit = {
 
	implicit val typeInfo = TypeInformation.of(classOf[StructType])
	val env = StreamExecutionEnvironment.getExecutionEnvironment
 
	val rand: Random = new Random()
 
	val text = env.fromElements(rand.nextDouble(), rand.nextDouble(), rand.nextDouble())
	val bundlePath = getClass.getResource("/mleap-example-1").toString
 
	text.map(MleapMapFunction(bundlePath, FileBundleLoader)).print()
 
	env.execute()
  }
}

Niemals aufhören, sich zu verbessern

Es gibt kein System/keine Bibliothek, das/die nicht bereichert oder verbessert werden kann. Das Gleiche gilt für diese Bibliothek. Die wichtigsten Dinge, die wir gerne verbessern würden, sind:

  • unsere generische UDF mit komplizierteren ML-Modellen zu testen,
  • bereiten Sie Beispiele für die Verwendung mit Kubernetes vor,
  • Unterstützung für andere ML-Modell-Serialisierer ähnlich wie MLeap hinzufügen: PMML, die Unterstützung für weitere ML-Bibliotheken bieten wird.

Interessieren Sie sich für ML- und MLOps-Lösungen? Wie können Sie ML-Prozesse verbessern und die Lieferfähigkeit von Projekten steigern? Sehen Sie sich unsere MLOps-Demo an und melden Sie sich für eine kostenlose Beratung an.

Verfasst von

Bartosz Chodnicki

Contact

Let’s discuss how we can support your journey.