Event-Registrierung bei Kafka

Ursprünglicher Autor: Adam Warski
  • Übersetzung
Hi, Habr!

Wir haben die letzten Reserven des Buches " Apache Kafka. Stream-Verarbeitung und Analyse von Daten " entkorkt und an einen Nachdruck geschickt. Außerdem haben wir einen Auftrag für das Buch " Kafka Streams in Action " erhalten und nächste Woche wortwörtlich übersetzt.



Um einen interessanten Anwendungsfall für die Kafka Streams-Bibliothek zu zeigen, haben wir uns entschlossen, einen Artikel über das Event Sourcing-Paradigma in Kafka von Adam Worsky zu übersetzen, dessen Artikel über die Sprache Scala vor zwei Wochen bei uns veröffentlicht wurde. Interessanter ist, dass die Meinung von Adam Worsky nicht unumstritten ist: Hier wird beispielsweise argumentiert, dass dieses Paradigma für Kafka entscheidend ist. Umso denkwürdiger wird der Eindruck des Artikels sein, hoffen wir.

Der Begriff „Event Sourcing“ wird in unserer Martin Pure Architecture Edition und in diesem Artikel mit „Event Registration“ übersetzt . Wenn jemand beeindruckt ist von der Übersetzung von "Pumping Events" - lassen Sie es mich wissen.

Wenn Sie ein System erstellen, in dem Event-Sourcing vorgesehen ist, stoßen wir früher oder später auf das Problem der Persistenz - und hier haben wir einige Möglichkeiten. Erstens gibt es einen EventStore , eine ausgereifte, kampferprobte Implementierung. Alternativ können Sie die Akka-Persistenz verwenden , um die Skalierbarkeit von Cassandra optimal zu nutzen und sich auf die Leistung des Darstellermodells zu verlassen. Eine andere Option ist eine gute alte relationale Datenbank , bei der der Ansatz CRUDmithilfe von Ereignissen kombiniert wird und der maximale Nutzen aus Transaktionen herausgedrückt wird.

Zusätzlich zu diesen (und vielleicht vielen anderen) Möglichkeiten, die sich aufgrund einiger kürzlich implementierter Dinge ergeben haben, ist es heute sehr einfach geworden, die Registrierung von Ereignissen auf Kafka zu organisieren . Lass uns sehen wie.

Was ist eine Veranstaltungsregistrierung?

Zu diesem Thema gibt es eine Reihe hervorragender einleitender Artikel , daher beschränke ich mich auf eine kurze Einführung. Bei der Registrierung von Ereignissen speichern wir nicht den "aktuellen" Status der in unserem System verwendeten Entitäten, sondern den Ereignisstrom, der sich auf diese Entitäten bezieht. Jedes Ereignis ist eine Tatsache , die eine Zustandsänderung (bereits!) Beschreibt , die beim Objekt aufgetreten ist . Wie Sie wissen, werden die Fakten nicht diskutiert und unverändert .

Wenn wir einen Strom solcher Ereignisse haben, kann der tatsächliche Zustand der Entität durch Abwicklung aller damit zusammenhängenden Ereignisse ermittelt werden. Bitte beachten Sie jedoch, dass das Gegenteil nicht möglich ist. Wenn Sie nur den „Ist-Zustand“ beibehalten, werden viele wertvolle chronologische Informationen verworfen.

Ereignisprotokollierung können friedlich koexistieren mit den traditionellen Methoden der Lagerbedingung. In der Regel verarbeitet das System verschiedene Arten von Entitäten (z. B. Benutzer, Bestellungen, Waren usw.), und es ist durchaus möglich, dass die Registrierung von Ereignissen nur für einige dieser Kategorien geeignet ist. Es ist wichtig anzumerken, dass wir hier nicht mit der Wahl von „alles oder nichts“ konfrontiert sind; es handelt sich lediglich um die zusätzlichen Zustandsverwaltungsoptionen in unserer Anwendung.

Speichern von Ereignissen in Kafka

Das erste Problem, das gelöst werden muss: Wie werden Ereignisse in Kafka gespeichert? Es gibt drei mögliche Strategien:

  • Speichern Sie alle Ereignisse für alle Entitätstypen in einem einzigen Thema (mit mehreren Segmenten).
  • Entsprechend der Entität "Topic-on-Each-Type" werden alle mit dem Benutzer verknüpften Ereignisse in ein separates Thema eingefügt, in ein separates Thema - alles in Bezug auf das Produkt usw.
  • D. H. Zu einem separaten Thema für jeden spezifischen Benutzer und jeden Gegenstand

Die dritte Strategie (on-topic-on-essence) ist praktisch nicht praktikabel. Wenn jeder neue Benutzer im System erscheinen würde, müsste er ein eigenes Thema beginnen, so dass die Anzahl der Themen bald unbegrenzt wird. Jede Aggregation wäre in diesem Fall sehr schwierig, zum Beispiel wäre es schwierig, alle Benutzer in einer Suchmaschine zu indizieren. Nicht nur, dass gleichzeitig eine Vielzahl von Themen konsumiert werden müsste - so wären auch nicht alle davon im Voraus bekannt.

Daher bleibt die Wahl zwischen 1 und 2. Beide Optionen haben ihre Vor- und Nachteile. Ein einziges Thema macht es einfacher, eine globale Ansicht zu erhalten.über alle Ereignisse. Indem Sie das Thema für jeden Entitätstyp hervorheben, können Sie den Fluss jeder Entität jedoch separat skalieren und segmentieren. Die Wahl einer der beiden Strategien hängt vom jeweiligen Anwendungsfall ab.

Darüber hinaus können Sie beide Strategien gleichzeitig implementieren, wenn Sie über zusätzlichen Speicherplatz verfügen: um Themen als Entitäten aus einem umfassenden Thema zu erstellen.



Im Rest des Artikels werden wir nur mit einem Entitätstyp und einem einzelnen Thema arbeiten, obwohl das angegebene Material leicht zu extrapolieren ist und auf viele Themen oder Entitätstypen angewendet werden kann.

(LESEN: Wie Chris Hunt bemerkt hat , gibt es einen hervorragenden Artikel von Martin Kleppman.)wo detailliert beschrieben wird, wie Ereignisse zu Themen und Segmenten verteilt werden).

Die einfachsten Operationen mit einem Repository im Ereignisregistrierungsparadigma

Die einfachste Operation, die von einem Repository, das die Registrierung von Ereignissen unterstützt, logisch zu erwarten ist, ist das Lesen des „aktuellen“ (minimierten) Status einer bestimmten Entität. In der Regel hat jede Entität die eine oder die andere id. Dementsprechend idmuss unser Speichersystem den aktuellen Status des Objekts zurückgeben.

Das Ereignisprotokoll dient uns als ultimative Wahrheit: Der aktuelle Status kann immer aus dem Ereignisstrom einer bestimmten Entität abgeleitet werden. Dazu benötigt die Datenbank-Engine eine reine Funktion (ohne Nebenwirkungen), die das Ereignis und den Anfangsstatus akzeptiert und den geänderten Status zurückgibt:Event = &gt State =&gt State. Bei einer solchen Funktion und den anfänglichen Zustandswerten ist der aktuelle Zustand eine Faltung des Ereignisstroms (die Zustandsänderungsfunktion muss sauber sein, damit sie wiederholt auf dieselben Ereignisse angewendet werden kann.) Die

vereinfachte Implementierung der Operation „aktuellen Zustand lesen“ in Kafka sammelt den Strom von Alle Ereignisse aus dem Thema werden gefiltert, wobei nur die Ereignisse mit den angegebenen Ereignissen übrig bleibenidund faltet sich mit der angegebenen Funktion. Wenn es viele Ereignisse gibt (und im Laufe der Zeit nur die Anzahl der Ereignisse zunimmt), kann dieser Vorgang langsam werden und viele Ressourcen verbrauchen. Selbst wenn das Ergebnis im Arbeitsspeicher zwischengespeichert und auf dem Dienstknoten gespeichert wird, müssen diese Informationen immer noch periodisch neu erstellt werden, beispielsweise aufgrund von Knotenausfällen oder aufgrund des Verdrängens der Cache-Daten.



Deshalb brauchen wir einen rationalen Weg. Hier können sich Kafka-Streams und State Stores als nützlich erweisen. Kafka-Streams-Anwendungen werden auf einem ganzen Cluster von Knoten ausgeführt, die bestimmte Themen gemeinsam nutzen. Jedem Knoten werden, wie beim herkömmlichen Kafka-Kundenkonto, mehrere Segmente der konsumierten Themen zugewiesen. Kafka-streams bietet jedoch übergeordnete Operationen für Daten, mit denen abgeleitete Streams wesentlich einfacher erstellt werden können.

Eine dieser Operationen in Kafka-Streams ist eine Faltung des Streams im lokalen Speicher. Jeder lokale Speicher enthält nur Daten aus den Segmenten, die von einem bestimmten Knoten belegt werden. Standardmäßig stehen zwei lokale Speicherimplementierungen zur Verfügung: im RAM und auf RocksDB-Basis .

Zurück zum Thema der Registrierung von Ereignissen stellen wir fest, dass es möglich ist, den Fluss von Ereignissen im Zustandsspeicher zu minimieren , wobei der "aktuelle Status" jeder Entität im lokalen Knoten von den dem Knoten zugewiesenen Segmenten beibehalten wird. Wenn Sie die RocksDB-basierte Zustandsspeicherimplementierung verwenden, hängt es nur davon ab, wie viel Speicherplatz auf einem einzelnen Knoten verfolgt werden kann.

So sieht die Faltung von Ereignissen im lokalen Speicher aus, wenn die Java-API verwendet wird (serde bedeutet "Serializer / Deserializer"):

KStreamBuilder builder = new KStreamBuilder();
builder.stream(keySerde, valueSerde, "my_entity_events")
  .groupByKey(keySerde, valueSerde)
  // функция свертки: должна возвращать новое состояние
  .reduce((currentState, event) -> ..., "my_entity_store");
  .toStream(); // выдает поток промежуточных состоянийreturn builder;

Ein vollständiges Beispiel für die Auftragsbearbeitung auf Basis von Microservice ist auf der Confluent-Website verfügbar.

(LESEN: Wie von Sergey Egorov und Nikita Salnikov auf Twitter angemerkt , müssen Sie für das System mit Ereignisregistrierung wahrscheinlich die Standardeinstellungen für den Datenspeicher in Kafka ändern, sodass weder zeitlich noch in der Größe und optional optional Grenzen gesetzt werden können , Aktivieren Sie die Datenkomprimierung.)

Anzeigen des aktuellen Status

Wir haben ein Status-Repository erstellt, in dem sich die aktuellen Status aller Entitäten befinden, die aus den dem Knoten zugewiesenen Segmenten stammen, aber wie können Sie dieses Repository jetzt anfordern? Wenn die Anforderung lokal ist (dh sie stammt von demselben Knoten, auf dem sich das Repository befindet), ist alles ganz einfach:

streams
  .store("my_entity_store", QueryableStoreTypes.keyValueStore());
  .get(entityId);

Was aber, wenn wir Daten anfordern möchten, die sich auf einem anderen Knoten befinden? Und wie kann man herausfinden, welche Art von Knoten? Hier haben wir eine weitere Gelegenheit, die kürzlich in Kafka erschienen ist: interaktive Anfragen . Mit ihrer Hilfe können Sie auf die Kafka-Metadaten zugreifen und herausfinden, welcher Knoten das Themensegment mit dem angegebenen verarbeitet id(in diesem Fall wird das Tool implizit für die Themensegmentierung verwendet ):

metadataService
  .streamsMetadataForStoreAndKey("my_entity_store", entityId, keySerde)

Als Nächstes müssen Sie die Anforderung irgendwie an den richtigen Knoten umleiten. Bitte beachten Sie: Die spezifische Art und Weise, in der die Kommunikation zwischen Knoten implementiert und verarbeitet wird, sei es REST, Akka-Remote oder eine andere, liegt nicht in der Verantwortung von Kafka-Streams. Kafka bietet einfach Zugriff auf den Zustandsspeicher und gibt Informationen darüber, auf welchem ​​Knoten sich der Zustandsspeicher für einen bestimmten befindet id.

Wiederherstellung nach einem Fehler

Der Zustandsspeicher sieht gut aus, aber was passiert, wenn ein Knoten ausfällt? Das Wiederherstellen eines lokalen Zustandsrepositorys für ein bestimmtes Segment kann auch eine kostspielige Operation sein. Dies kann lange Zeit zu einer erhöhten Latenzzeit oder zum Verlust von Anforderungen führen, da die Kafka-Streams neu ausbalanciert werden müssen (nachdem ein Knoten hinzugefügt oder entfernt wurde).

Aus diesem Grund werden standardmäßig Langzeitzustandsspeicher protokolliert. Das heißt, alle am Speicher vorgenommenen Änderungen werden zusätzlich im Changelog-Thema erfasst. Dieses Thema ist komprimiert (schließlich idinteressiert uns nur der letzte Eintrag ohne Änderungshistorie, da die Geschichte in den Ereignissen selbst gespeichert ist) - daher ist es so klein wie möglich. Daher kann das Erstellen eines Repositorys auf einem anderen Knoten wesentlich schneller erfolgen.

Bei einer Neuausrichtung in diesem Fall sind jedoch immer noch Verzögerungen möglich. Um sie noch weiter zu reduzieren, bietet kafka-streams die Möglichkeit, mehrere Sicherungsreplikate aufzubewahren (num.standby.replicas) für jeden Speicher. Diese Replikate wenden alle Aktualisierungen an, die von Themen mit Änderungsprotokollen abgerufen wurden, sobald sie eintreffen. Sie sind bereit, für ein bestimmtes Segment in den Hauptzustandsspeicher zu wechseln, sobald der aktuelle Hauptspeicher ausfällt.

Konsistenz

Mit den Standardeinstellungen liefert Kafka mindestens einmalig. Das heißt, im Falle eines Knotenausfalls können einige Nachrichten mehrmals zugestellt werden. Beispielsweise ist es möglich, dass ein bestimmtes Ereignis zweimal auf den Zustandsspeicher angewendet wird, wenn ein Systemabsturz auftritt, nachdem der Zustandsspeicher Änderungen im Zustandsspeicher aufgezeichnet hat, aber bevor der Versatz für dieses bestimmte Ereignis vorgenommen wurde. Vielleicht verursacht dies keine Schwierigkeiten: Unsere Statusaktualisierungsfunktion (Event = &gt State =&gt State) kann mit solchen Situationen ganz normal umgehen. Dies kann jedoch nicht der Fall sein: In einem solchen Fall können Sie die Garantien der strengen Einzellieferung in Kafka verwenden . Solche Garantien gelten nur für das Lesen und Schreiben von Kafka-Themen. Dies ist jedoch genau das, was wir hier tun: Im Hintergrund sind alle Einträge in Kafka-Themen auf das Aktualisieren des Änderungsprotokolls für den Zustandsspeicher und das Durchführen von Offsets zurückzuführen. All dies kann in Form von Transaktionen erfolgen .

Wenn also unsere Funktion Status - Updates erforderlich ist , können wir die Semantik der Verarbeitung enthalten „ist ausschließlich eine einmalige Lieferung“ Ströme eine einzige Konfigurationsoptionen verwendet processing.guarantee. Dadurch sinkt die Produktivität, aber nichts kommt umsonst.

Ereignisse hören

Nachdem wir nun die Grundlagen besprochen haben - den aktuellen Status abfragen und ihn für jede Entität aktualisieren - was ist mit dem Auslösen von Nebenwirkungen ? Irgendwann wird es beispielsweise notwendig für:

  • Senden von Benachrichtigungs-E-Mails
  • Indizierung von Entitäten in einer Suchmaschine
  • Externe Dienste über REST anrufen (oder SOAP, CORBA usw.)

Alle diese Aufgaben sind zu einem gewissen Grad blockiert und beziehen sich auf E / A-Vorgänge (dies ist natürlich bei Nebenwirkungen). Daher ist es möglicherweise keine gute Idee, sie innerhalb der Zustandsaktualisierungslogik auszuführen. Dadurch kann die Häufigkeit von Fehlern in der Hauptschleife zunehmen Ereignisse, und in Bezug auf die Leistung wird es einen Engpass geben.

Darüber hinaus kann die Funktion mit der Zustandsaktualisierungslogik (E Event = &gt State =&gt State) mehrmals ausgeführt werden (im Falle von Ausfällen oder Neustarts), und öfter möchten wir die Anzahl der Fälle minimieren, in denen Nebenwirkungen für ein bestimmtes Ereignis wiederholt ausgeführt werden.

Glücklicherweise haben wir mit Kafka-Tops viel Flexibilität. In der Flussphase, in der der Zustandsspeicher aktualisiert wird, können Ereignisse in unveränderter Form (oder ggf. in geänderter Form) ausgegeben werden, und der resultierende Stream / Topic (in Kafka sind diese Konzepte gleichwertig) können nach Belieben verwendet werden. Darüber hinaus kann es entweder vor oder nach der Statusaktualisierungsstufe verbraucht werden. Schließlich können wir steuern, wie wir Nebenwirkungen beginnen werden: mindestens einmal oder höchstens einmal. Die erste Option ist verfügbar, wenn Sie den Versatz des konsumierten Themenereignisses erst durchführen, nachdem alle Nebenwirkungen erfolgreich abgeschlossen wurden. Umgekehrt führen wir bei einem maximal einmaligen Start Versätze durch, bevor Nebenwirkungen ausgelöst werden.

Es gibt verschiedene Optionen für den Beginn von Nebenwirkungen, die von der konkreten praktischen Situation abhängen. Zunächst ist es möglich, die Stufe der Kafka-Streams zu bestimmen, bei der Nebenwirkungen für jedes Ereignis als Teil der Stream-Verarbeitungsfunktion ausgelöst werden.
Es ist ziemlich einfach, einen solchen Mechanismus einzurichten, aber diese Lösung ist unflexibel, wenn es um Wiederholungsversuche, das Verwalten von Verschiebungen und konkurrierende Verschiebungen bei vielen Ereignissen gleichzeitig geht. In solch komplexeren Fällen ist es zweckmäßiger, die Verarbeitung beispielsweise mit einem reaktiven Kafka oder einem anderen Mechanismus zu bestimmen, bei dem Kafka-Themen "direkt" verwendet werden.

Es ist auch möglich, dass ein Ereignis andere Ereignisse auslöst. - Zum Beispiel kann das Ereignis "order" die Ereignisse "Vorbereitung zum Senden" und "Kundenbenachrichtigung" auslösen. Dies kann auch auf der Stufe der Kafka-Streams implementiert werden.

Wenn wir Ereignisse oder Daten, die aus Ereignissen in einer Datenbank oder einer Suchmaschine extrahiert wurden, beispielsweise in ElasticSearch oder PostgreSQL, speichern möchten, können wir den Kafka Connect- Connector verwenden , der alle Details verarbeitet, die für uns mit dem Verbrauch von Themen verbunden sind.

Erstellen von Ansichten und Projektionen

Normalerweise sind die Systemanforderungen nicht auf das Anfordern und Verarbeiten nur einzelner Entitätsströme beschränkt. Auch sollte Aggregation, eine Kombination mehrerer Ereignisströme, unterstützt werden. Solche kombinierten Flüsse werden oft als Projektionen bezeichnet., und in einer reduzierten Form können Datenrepräsentationen erstellt werden . Kann man sie mit Kafka implementieren?



Wieder - ja! Denken Sie daran, dass wir uns im Grunde nur mit dem Kafka-Thema befassen, in dem unsere Ereignisse gespeichert werden. Infolgedessen verfügen wir über die gesamte Macht der „rohen“ Verbraucher-Buchhalter / Produzenten Kafka, des Kombinators Kafka-Streams und sogar von KSQL - all dies wird für die Definition der Projektionen von Nutzen sein. Mit Kafka-Streams können Sie beispielsweise den Stream filtern, anzeigen, nach Schlüsseln gruppieren, in Zeit- oder Sitzungsfenstern aggregieren usw. entweder auf der Codeebene oder mit einem SQL-ähnlichen KSQL.

Solche Ströme können für Anforderungen über Zustandsspeicher und interaktive Anforderungen über lange Zeit gespeichert und bereitgestellt werden, genau wie bei separaten Entitätsströmen.

Was als nächstes gilt:

Um das unendliche Wachstum des Ereignisflusses zu verhindern, während sich das System entwickelt, kann eine solche Komprimierungsoption, beispielsweise das Speichern von Momentaufnahmen des „aktuellen Status“ , nützlich sein . Daher können wir uns darauf beschränken, nur einige wenige Momentaufnahmen und die Ereignisse zu speichern, die nach ihrer Erstellung aufgetreten sind.

Obwohl Kafka keine direkte Unterstützung für Snapshots bietet (und in einigen anderen Systemen, die nach dem Prinzip der Ereignisregistrierung arbeiten, gibt es diese nicht), können Sie diese Art von Funktionalität definitiv hinzufügen, indem Sie einige der oben genannten Mechanismen wie Threads, Consumer, State Stores usw. verwenden. d.

Zusammenfassung

Obwohl Kafka anfangs nicht mit Blick auf das Ereignisregistrierungsparadigma konzipiert wurde, handelt es sich tatsächlich um eine Stream-Verarbeitungs-Engine mit Unterstützung für die Themenreplikation , Segmentierung, Zustandsspeicherung und Streaming-APIsund gleichzeitig sehr flexibel. Daher können Sie auf Kafka problemlos ein Ereignisaufzeichnungssystem implementieren. Da wir vor dem Hintergrund all dessen, was passiert, immer ein Kafka-Thema haben werden, gewinnen wir zusätzliche Flexibilität, da wir entweder mit Streaming-APIs auf hoher Ebene oder Konsumenten auf niedriger Ebene arbeiten können.

Jetzt auch beliebt: