Jeder, der schon einmal Code geschrieben hat, der komplexe asynchrone Arbeitsabläufe koordinieren muss, weiß, dass dies eine echte Qual sein kann, vor allem, wenn Sie sich darauf beschränken, nur Callbacks direkt zu verwenden. Es gibt verschiedene Tools, die diese Probleme angehen, wie Reactive Extensions und Javascript Promises.
Die Antwort von Clojure kommt in Form von core.async: Eine Implementierung von CSP sowohl für Clojure als auch für Clojurescript. In diesem Beitrag möchte ich zeigen, wie leistungsfähig core.async unter verschiedenen Umständen ist. Der Kontext wird das Schreiben eines Vert.x Event-Handlers sein.
Vert.x ist eine junge, leichtgewichtige, polyglotte, leistungsstarke, ereignisgesteuerte Anwendungsplattform, die auf der JVM aufbaut. Sie verfügt über ein akteursähnliches Gleichzeitigkeitsmodell, bei dem die grobkörnigen Akteure (Verticles genannt) über einen verteilten Ereignisbus kommunizieren können. Obwohl Vert.x noch recht jung ist, ist es sicher, dass es sich zu einem wichtigen Akteur in der Zukunft des reaktiven Webs entwickeln wird.
core.async ermöglicht dieses Szenario, ohne dass Sie einen zusätzlichen veränderlichen Zustand für die Koordinierung innerhalb Ihrer Handler hinzufügen müssen. Der Zustand ist im Kanal enthalten.
[clojure]
(eb/on-message
"forecast.report"
(fn [coords]
(let [ch (chan 3)]
(eb/send "temperature.service" coords #(put! ch {:temperature %}))
(eb/send "humidity.service" coords #(put! ch {:humidity %}))
(eb/send "wind-speed.service" coords #(put! ch {:wind-speed %}))
(go (let [data (merge (
forecast (create-forecast data)]
(eb/reply forecast))))))
[/clojure]
Eine Sache, die core.async bietet und die die meisten anderen Tools nicht haben, ist die Auswahl. Aus den Beispielen:
Es gibt natürlich viel mehr zu core.async als hier gezeigt wird. Aber das ist eine Geschichte für einen anderen Blog.
Szenarien
Das Szenario ist wie folgt. Unser Vertikel registriert einen Handler an einer bestimmten Adresse und hängt von 3 anderen Vertikeln ab.1. Zusammensetzung
Stellen Sie sich vor, der neue Mars-Rover ist an einem Marsfelsen hängen geblieben und wir müssen ihm Anweisungen geben, den Felsen mit seinem eingebauten Laser zu zerstören. Stellen Sie sich außerdem vor, dass die Steuerungssoftware mit Vert.x geschrieben wurde. Es gibt ein einziges Verticle, das für die Abwicklung der notwendigen Schritte verantwortlich ist:- Verwenden Sie den Sensor, um die Position des Felsens zu bestimmen
- Verwenden Sie die Position, um die Härte des Gesteins zu scannen.
- Verwenden Sie die Härte, um den Laser zu kalibrieren und abzufeuern. Status zurückmelden
- Erfolg oder Misserfolg an den Hauptanrufer melden
2. Gleichzeitige Anfragen
Eine andere Sache, die wir vielleicht tun möchten, ist die gleichzeitige Abfrage verschiedener Handler. Obwohl wir die Komposition verwenden können, ist dies nicht sehr performant, da wir nicht auf die Antwort von Dienst-A warten müssen, um Dienst-B aufzurufen. Ein konkretes Beispiel: Stellen Sie sich vor, wir müssen atmosphärische Daten über ein geografisches Gebiet sammeln, um eine Wettervorhersage zu erstellen. Die Daten umfassen die Temperatur, die Luftfeuchtigkeit und die Windgeschwindigkeit, die von drei verschiedenen unabhängigen Diensten angefordert werden. Sobald alle drei asynchronen Anfragen zurückkommen, können wir eine Vorhersage erstellen und dem Hauptaufrufer antworten. Aber woher wissen wir, wann der letzte Callback ausgelöst wurde? Wir müssen einen Speicher (veränderlicher Zustand) vorhalten, der aktualisiert wird, wenn jeder der Rückrufe ausgelöst wird, und die Daten verarbeiten, wenn der letzte zurückkehrt.3. Schnellste Reaktion
Manchmal stehen Ihnen mehrere Dienste zur Verfügung, die ähnliche Funktionen bieten, und Sie wollen nur den schnellsten. Mit einer kleinen Anpassung können wir den vorherigen Code auch für dieses Szenario verwenden. [clojure] (eb/on-message "server.request" (fn [msg] (let [ch (chan 3)] (eb/send "service-A" msg #(put! ch %)) (eb/send "service-B" msg #(put! ch %)) (eb/send "service-C" msg #(put! ch %)) (go (eb/reply (<! ch)))))) [/clojure] Wir nehmen einfach das erste Ergebnis auf dem Kanal und ignorieren die anderen Ergebnisse. Nachdem der go-Block geantwortet hat, gibt es keine weiteren Teilnehmer auf dem Kanal. Die Ergebnisse der Dienste, die zu spät kamen, werden immer noch in den Kanal gestellt, aber nachdem die Anfrage beendet wurde, gibt es keine Verweise mehr darauf und der Kanal mit den Ergebnissen kann entsorgt werden.4. Umgang mit Zeitüberschreitungen und Wahlmöglichkeiten mit Alts!
Wir können Timeout-Kanäle erstellen, die sich nach einer bestimmten Zeitspanne selbst schließen. In geschlossene Kanäle kann nicht mehr geschrieben werden, aber alle Nachrichten im Puffer können noch gelesen werden. Danach gibt jeder Lesevorgang null zurück.Ein entscheidender Vorteil von Kanälen gegenüber Warteschlangen ist die Möglichkeit, auf viele Kanäle gleichzeitig zu warten (wie ein Socket Select). Dies geschieht mit `alts!!` (gewöhnliche Threads) oder `alts!` in Go-Blöcken.In Kombination mit Timeout-Kanälen können Sie auf einem Kanal bis zu einer maximalen Zeitspanne warten, bevor Sie aufgeben. Indem Sie Beispiel 2 ein wenig anpassen: [clojure] (eb/on-message "forecast.report" (fn [coords] (let [ch (chan) t-ch (timeout 3000)] (eb/send "temperature.service" coords #(put! ch {:temperature %})) (eb/send "humidity.service" coords #(put! ch {:humidity %})) (eb/send "wind-speed.service" coords #(put! ch {:wind-speed %})) (go-loop [n 3 data {}] (if (pos? n) (if-some [result (alts! [ch t-ch]))] (recur (dec n) (merge data result)) (eb/fail 408 "Request timed out")) (eb/reply (create-forecast data))))))) [/clojure] Dies bewirkt dasselbe wie zuvor, aber wir warten insgesamt 3 Sekunden, bis die Anfragen abgeschlossen sind, andernfalls antworten wir mit einem Timeout-Fehler. Beachten Sie, dass wir den Timeout-Parameter nicht in den vert.x API-Aufruf von eb/send eingefügt haben. Mit einem erstklassigen Timeout-Kanal können wir diese Timeouts einfacher koordinieren, als wenn wir Timeout-Parameter und Fehler-Callbacks hinzufügen.
Einpacken
Die obigen Szenarien sind eindeutig vereinfacht, um sich auf die verschiedenen Arbeitsabläufe zu konzentrieren, aber sie sollten Ihnen eine Vorstellung davon geben, wie Sie mit der Verwendung in Vert.x beginnen können. Ich habe mich gefragt, ob core.async gut mit Vert.x zusammenarbeiten kann, was die ursprüngliche Motivation für diesen Blogbeitrag war. Verticles sind von Haus aus single-threaded, während core.async Hintergrund-Threads einführt, um Go-Blocks oder State Machine Callbacks zu versenden. Da die versendeten Go-Blöcke den korrekten Nachrichtenkontext tragen, können die Funktionen eb/send, eb/reply usw. von diesen Go-Blöcken aus aufgerufen werden, und alles geht gut.Verfasst von
Daniel Marjenburgh
Unsere Ideen
Weitere Blogs
Contact
Let’s discuss how we can support your journey.



