Eines der Dinge, die ich an CQRS mag, ist, dass viele der Infrastrukturkomponenten einfacher werden, zumindest im Vergleich zum klassischen ORM-Ansatz. Einige dieser Komponenten sind jedoch in bestehenden Unternehmensanwendungen noch nicht weit verbreitet und werden für die meisten Benutzer neu sein. Eine dieser Komponenten ist der Ereignisspeicher, der für die Persistenz der (transaktionalen) Domäne in CQRS verwendet wird.
Vom Konzept her ist ein Ereignisspeicher sehr einfach. Um ein Aggregat aufzubewahren, müssen alle neuen Ereignisse, die den Zustand ändern, gespeichert werden. Wenn Sie ein Aggregat wieder in den Speicher laden, wird jedes Ereignis gegen Ihre Domäne abgespielt, damit das Aggregat wieder in den richtigen Zustand versetzt wird. Die ursprüngliche Schnittstelle des EventStore spiegelte dies auf eine sehr einfache Weise wider: [java] public interface EventStore<EreignisTyp> { void storeEvents(UUID aggregateId, Liste<EreignisTyp> Ereignisse); Liste<EreignisTyp> loadEvents(UUID aggregateId); } [/java] Die storeEvents-Methode kümmert sich sowohl um die Speicherung der ersten Ereignisse für ein neu persistiertes Aggregat als auch um das Hinzufügen weiterer Ereignisse nach weiteren Transaktionen. Die loadEvents-Methode gibt einfach alle für das Aggregat gespeicherten Ereignisse zurück. Diese Schnittstelle funktioniert als Ausgangspunkt recht gut, ist aber schwer zu erweitern, um einige Funktionen zu unterstützen, die Sie wahrscheinlich in einem echten System benötigen:
- Polymorphismus
- Schnappschüsse
- Versionierung und Konfliktlösung
Für das Übungsbeispiel wurde die Schnittstelle also so geändert, dass sie Polymorphismus und Versionierung unterstützt: [java] public interface EventStore<EreignisTyp> { void storeEventSource(EventSource<EreignisTyp> Quelle); T loadEventSource(Klasse<T> expectedType, UUID eventSourceId); T loadEventSource(Klasse<T> expectedType, VersionedId eventSourceId); } [/java] Die EventSource ist eine neue Schnittstelle, die von Ihrem Aggregatstamm implementiert werden muss: [java] public interface EventSource<EreignisTyp> { VersionedId getVersionedId(); Liste<EreignisTyp> getUnsavedEvents(); void loadFromHistory(Liste<EreignisTyp> Geschichte); } [/java] Die Methoden getVersionedId und getUnsavedEvents werden beim Persistieren eines Aggregats aufgerufen. Beim Laden des Aggregats instanziiert der Ereignisspeicher Ihr Aggregat mit seiner aktuellen ID und Version und ruft loadFromHistory auf. Obwohl diese Änderung die Unterstützung für Polymorphismus und Versionierung hinzugefügt hat, ist die Schnittstelle nicht sehr sauber:
- Die Aggregatwurzel muss die Schnittstelle EventSource implementieren, um sie mit dem Ereignisspeicher zu verbinden
- Die EventSource-Schnittstelle wird sowohl als Quelle als auch als Senke verwendet
- Der Ereignisspeicher instanziiert Ihr Aggregat und übergibt dabei die VersionedId. Ihr Aggregatstamm muss also auch einen Ein-Argument-Konstruktor haben
Und schließlich war auch die Implementierung des Ereignisspeichers nicht sehr sauber, da sie JDBC, Reflection-Code und Versionsverwaltung vermischte. Also beschloss ich letztes Wochenende, mit einem Neuanfang zu beginnen und den Ereignisspeicher neu zu implementieren. Eine meiner Lieblingsmethoden, um eine API zu entwerfen und zu implementieren, ist es, mit Unit-Tests und einer einfachen Java-Implementierung zu beginnen, ohne sich gleichzeitig mit dem eigentlichen JDBC-Code zu befassen. Maps und Lists eignen sich hervorragend zum Speichern von Daten, während Sie die verschiedenen Designs erkunden. Außerdem sind die Tests vollständig wiederverwendbar, wenn es an der Zeit ist, die eigentliche JDBC-Implementierung zu schreiben. Hier ist einer der ersten Testfälle: [java] @Test public void should_create_event_stream_with_initial_version_and_events() { FakeEventSource source = new FakeEventSource("type", 0, T1, EVENTS); subject.createEventStream(ID_1, source); FakeEventSink sink = new FakeEventSink("type", 0, T1, EVENTS); subject.loadEventsFromLatestStreamVersion(ID_1, sink); sink.verify(); } [/java] Wie Sie aus dem Testfall ersehen können, wurden die Rollen von Ereignisquelle und Ereignissenke explizit gemacht. Die Erstellung eines Ereignisstroms ist nun auch von der Speicherung zusätzlicher Ereignisse in einem bestehenden Ereignisstrom getrennt. Außerdem muss das Aggregat diese Schnittstellen nicht mehr implementieren. Die Repository-Implementierung muss das Mapping zwischen Ihren Domänenklassen und dem Ereignisspeicher vornehmen. Hier sind die neuen Schnittstellen des Ereignisspeichers mit verschiedenen Methoden, die das Laden von historischen Daten unterstützen: [java] public interface EventSource<EreignisTyp> { String getType(); long getVersion(); long getTimestamp(); Liste<EreignisTyp> getEvents(); } public interface EventSink<EreignisTyp> { void setType(String type); void setVersion(long version); void setTimestamp(long timestamp); void setEvents(Liste<EreignisTyp> Ereignisse); } public interface EventStore<EreignisTyp> { void createEventStream(UUID streamId, EventSource<EreignisTyp> Quelle); void storeEventsIntoStream(UUID streamId, long expectedVersion, EventSource<EreignisTyp> Quelle); void loadEventsFromLatestStreamVersion(UUID streamId, EventSink<EreignisTyp> Waschbecken); void loadEventsFromExpectedStreamVersion(UUID streamId, long expectedVersion, EventSink<EreignisTyp> Waschbecken); void loadEventsFromStreamUptoVersion(UUID streamId, long version, EventSink<EreignisTyp> Waschbecken); void loadEventsFromStreamUptoTimestamp(UUID streamId, long timestamp, EventSink<EreignisTyp> Waschbecken); } [/java] Wie Sie sehen, sind die EventSink- und EventSource-Schnittstellen sehr gegensätzlich und leicht erweiterbar, um neue Funktionen wie das Laden aus einem Snapshot oder die Unterstützung von Konfliktlösungen zu ermöglichen. Nachdem wir eine vollständige Testsuite und eine In-Memory-Implementierung des neuen Ereignisspeichers erstellt hatten (was den größten Teil des Tages in Anspruch nahm), war die Hinzufügung einer neuen JDBC-basierten Implementierung nur ein paar Stunden Arbeit, die nur minimale Änderungen an den Tests und Schnittstellen erforderte, die sich hauptsächlich auf die Ausnahmebehandlung bezogen. Die aktuelle JDBC-Ereignisspeicher-Implementierung umfasst etwa 250 Zeilen Java, was für einen Domänen-Persistenzmechanismus sehr bescheiden ist.
Verfasst von
Erik Rozendaal
Contact



