Nützliche Tricks für die Arbeit mit Apache Camel

    Wenn Sie Integrationslösungen in Java erstellen mussten, sind Sie mit dem wunderbaren Java-Framework namens Apache Camel vertraut . Es stellt auf einfache Weise eine Verbindung zwischen mehreren Diensten her, importiert Daten aus Dateien, Datenbanken und anderen Quellen, benachrichtigt Sie über verschiedene Ereignisse im Jabber-Client oder per E-Mail und wird zur Grundlage für eine Verbundanwendung, die auf einer Vielzahl anderer Anwendungen basiert.


    Einleitung


    В основе модели Apache Camel лежит понятие маршрутов (routes), которые можно конфигурировать как статически (например, в файле Spring-контекста), так и во время работы приложения. По маршрутам ходят караваны сообщений, попутно попадая в различные обработчики, конверторы, аггрегаторы и прочие трансформеры, что в конечном итоге позволяет обработать данные из множества различных источников в едином приложении и передать эти данные другим сервисам или сохранить в какое-либо хранилище.
    В общем и целом Camel — вполне самодостаточный фреймворк. Используя его, зачастую, даже не приходится писать собственный код — достаточно лишь набрать правильный маршрут, который позволит решить поставленную задачу. Однако, всё же для построения собственной модели обработки данных, может потребоваться написание кода.

    So war es bei uns. Wir verwenden Camel, um Pipelines für die Verarbeitung mehrerer Nachrichten aus verschiedenen Quellen zu implementieren. Ein solcher Ansatz ermöglicht es beispielsweise, den Status von Diensten zu überwachen, Probleme rechtzeitig zu melden, aggregierte Analyse-Slices zu empfangen, Daten für das Senden an andere Systeme vorzubereiten und so weiter. Der Fluss der verarbeiteten und "verdaulichen" Nachrichten an das System kann sehr groß sein (Tausende von Nachrichten pro Minute). Daher versuchen wir, möglichst horizontal skalierbare Lösungen zu verwenden. Zum Beispiel haben wir ein System zur Überwachung des Status laufender Tests und Überwachungsdienste. Täglich gibt es eine Million solcher Tests, und wir erhalten ein Vielfaches mehr Nachrichten, um den Prozess ihrer Ausführung zu kontrollieren.
    Um ein solches Nachrichtenvolumen zu „assimilieren“, muss die Aggregationsstrategie klar definiert werden - von einer größeren Parallelität zu einer geringeren. Darüber hinaus müssen Sie über eine grundlegende horizontale Skalierbarkeit und Fehlertoleranz des Dienstes verfügen.
    Wir verwenden ActiveMQ als Nachrichtenwarteschlange und Hazelcast als Online-Speicher .

    Skalierung


    Um die parallele Verarbeitung zu organisieren, wird ein Cluster aus mehreren Peerservern organisiert. Auf jedem von ihnen lebt der ActiveMQ- Broker , in dessen Warteschlange Nachrichten hinzugefügt werden, die über das HTTP-Protokoll eingehen. HTTP-Handles befinden sich hinter einem Balancer, der Nachrichten über Live-Server verteilt.
    Die Eingabenachrichtenwarteschlange auf jedem Server wird von einer Camel-Anwendung analysiert, die einen Hazelcast- Cluster verwendet , um Zustände zu speichern und gegebenenfalls die Verarbeitung zu synchronisieren. ActiveMQs werden auch mit NetworkConnectors geclustert und können Nachrichten miteinander "teilen".
    Im Allgemeinen ist das Schema wie folgt:
    Bild
    Wie aus dem Diagramm hervorgeht, beeinträchtigt der Ausfall einer der Systemkomponenten unter Berücksichtigung der Gleichheit der Elemente nicht deren Leistung. Wenn beispielsweise ein Nachrichtenhandler auf einem der Server ausfällt, beginnt ActiveMQ, Nachrichten aus seinen Warteschlangen an andere zu senden. Wenn einer der ActiveMQ-Broker abstürzt, „hakt“ der Handler den benachbarten. Und schließlich, wenn der gesamte Server ausfällt, arbeitet der Rest des Servers weiter, als wäre nichts passiert. Zur Erhöhung der Datensicherheit speichern Hazelcast-Knoten Sicherungskopien der Daten ihrer Nachbarn (Kopien werden asynchron erstellt, ihre Anzahl auf jedem Knoten wird zusätzlich konfiguriert).
    Mit diesem Schema können Sie den Dienst auch ohne zusätzliche Kosten skalieren, indem Sie zusätzliche Server hinzufügen und dadurch die Computerressource erhöhen.

    Verteilte Aggregatoren


    Bei Verwendung der Aggregation enthält Apache Camel die Konzepte " Aggregations-Repository " und " Korrelationsschlüssel ". Das erste ist das Repository, in dem aggregierte Zustände gespeichert werden (z. B. die Anzahl der gelöschten Tests pro Tag). Der zweite ist der Schlüssel, der zum Verteilen des Nachrichtenflusses nach Status verwendet wird. Mit anderen Worten, der Korrelationsschlüssel ist der Schlüssel im Aggregationsrepository (z. B. das aktuelle Datum).
    Für Aggregatoren in einem ähnlichen Schema mussten wir unser eigenes Aggregations-Repository implementieren, das in der Lage ist, Zustände in Hazelcast zu speichern und die Verarbeitung identischer Schlüssel innerhalb des Clusters zu synchronisieren. Leider haben wir eine solche Möglichkeit im Standard-Camel-Paket nicht gefunden. Der Vorteil der Erstellung erwies sich als recht einfach - implementieren Sie einfach die SchnittstelleAggregationRepository :
    Versteckter Text
    public class HazelcastAggregatorRepository implements AggregationRepository {
        private final Logger logger = LoggerFactory.getLogger(getClass());
        // maximum time of waiting for the lock from hz
        public static final long WAIT_FOR_LOCK_SEC = 20;
        private final HazelcastInstance hazelcastInstance;
        private final String repositoryName;
        private IMap map;
        public HazelcastAggregatorRepository(HazelcastInstance hazelcastInstance, String repositoryName){
            this.hazelcastInstance = hazelcastInstance;
            this.repositoryName = repositoryName;
        }
        @Override
        protected void doStart() throws Exception {
            map = hazelcastInstance.getMap(repositoryName);
        }
        @Override
        protected void doStop() throws Exception {
            /* Nothing to do */
        }
        @Override
        public Exchange add(CamelContext camelContext, String key, Exchange exchange) {
            try {
                DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange);
                map.tryPut(key, holder, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
                return toExchange(camelContext, holder);
            } catch (Exception e) {
                logger.error("Failed to add new exchange", e);
            } finally {
                map.unlock(key);
            }
            return null;
        }
        @Override
        public Exchange get(CamelContext camelContext, String key) {
            try {
                map.tryLock(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
                return toExchange(camelContext, map.get(key));
            } catch (Exception e) {
                logger.error("Failed to get the exchange", e);
            }
            return null;
        }
        @Override
        public void remove(CamelContext camelContext, String key, Exchange exchange) {
            try {
                logger.debug("Removing '" + key + "' tryRemove...");
                map.tryRemove(key, WAIT_FOR_LOCK_SEC, TimeUnit.SECONDS);
            } catch (Exception e) {
                logger.error("Failed to remove the exchange", e);
            } finally {
                map.unlock(key);
            }
        }
        @Override
        public void confirm(CamelContext camelContext, String exchangeId) {
            /* Nothing to do */
        }
        @Override
        public Set getKeys() {
            return Collections.unmodifiableSet(map.keySet());
        }
        private Exchange toExchange(CamelContext camelContext, DefaultExchangeHolder holder) {
            Exchange exchange = null;
            if (holder != null) {
                exchange = new DefaultExchange(camelContext);
                DefaultExchangeHolder.unmarshal(exchange, holder);
            }
            return exchange;
        }
    }
    

    Um ein solches Repository zu verwenden, müssen Sie lediglich eine Verbindung zum Hazelcast-Projekt herstellen, es im Kontext deklarieren und dann eine Reihe von Repositorys hinzufügen, die auf die Hazelcast-Instanz verweisen. Es ist wichtig, sich daran zu erinnern, dass jeder Aggregator einen eigenen Schlüsselraum haben muss und daher auch den Namen des Repositorys erhalten muss. In den Hazelcast-Einstellungen müssen Sie alle Server registrieren, die Teil des Clusters sind.
    Auf diese Weise haben wir die Möglichkeit, Aggregatoren in einer verteilten Umgebung zu verwenden, ohne darüber nachdenken zu müssen, auf welchem ​​Server die Aggregation stattfinden soll.

    Verteilte Timer


    Die Anzahl der im Cluster gespeicherten Status ist ziemlich groß. Aber nicht alle werden ständig benötigt. Darüber hinaus müssen einige Status (z. B. der Status von Tests, die seit langer Zeit nicht verwendet wurden und daher seit langer Zeit keine Nachrichten mehr für sie vorhanden sind) überhaupt nicht gespeichert werden. Ich möchte solche Zustände loswerden und darüber zusätzlich andere Systeme benachrichtigen. Dazu muss der Status von Aggregatoren mit einer bestimmten Häufigkeit auf Veralterung überprüft und gelöscht werden.
    Eine einfache Möglichkeit, dies zu tun, besteht darin, eine periodische Aufgabe hinzuzufügen, z. B. mit Quarz. Darüber hinaus können Sie dies mit Camel tun. Es ist jedoch zu beachten, dass die Ausführung in einem Cluster mit vielen Peerservern erfolgt. Und ich möchte nicht, dass regelmäßige Quarzaufgaben gleichzeitig ausgeführt werden. Um dies zu vermeiden, genügt es, die Synchronisation mit Hilfe von Hazelcast-Sperren erneut durchzuführen. Aber wie kann man Quartz zwingen, nur auf einem Server zu initialisieren, oder besser gesagt, zu welchem ​​Zeitpunkt?
    Wir verwenden Spring, um den Camel-Kontext und alle anderen Komponenten des Systems zu initialisieren und Quartz zu zwingen, den Scheduler auf nur einem Server des Clusters zu starten. Zunächst muss der automatische Start deaktiviert werden, indem explizit im Kontext deklariert wird:

    Zweitens müssen Sie irgendwo synchronisieren und den Scheduler nur starten, wenn Sie es geschafft haben, die Sperre zu erfassen und dann auf den nächsten Moment ihrer Erfassung zu warten (falls der vorherige Server, der die Sperre erfasst hat, fehlgeschlagen ist oder aus irgendeinem Grund die Sperre losgelassen hat). Dies kann in Spring auf verschiedene Arten implementiert werden, z. B. über den ApplicationListener, mit dem Sie Kontext-Trigger-Ereignisse verarbeiten können:

    Wir erhalten die folgende Implementierung der Scheduler-Initialisierungsklasse:
    Versteckter Text
    public class HazelcastQuartzSchedulerStartupListener implements ShutdownPrepared, ApplicationListener {
        public static final String DEFAULT_QUARTZ_LOCK = "defaultQuartzLock";
        protected volatile boolean initialized = false;
        Logger log = LoggerFactory.getLogger(getClass());
        Lock lock;
        protected volatile boolean initialized = false;
        protected String lockName;
        protected HazelcastInstance hazelcastInstance;
        protected QuartzComponent quartzComponent;
        public HazelcastQuartzSchedulerStartupListener() {
            super();
            log.info("HazelcastQuartzSchedulerStartupListener created");
        }
        public void setLockName(final String lockName) {
            this.lockName = lockName;
        }
        public synchronized Lock getLock() {
            if (lock == null) {
                lock = hazelcastInstance.getLock(lockName != null ? lockName : DEFAULT_QUARTZ_LOCK);
            }
            return lock;
        }
        @Override
        public void prepareShutdown(boolean forced) {
            unlock();
        }
        @Required
        public void setQuartzComponent(QuartzComponent quartzComponent) {
            this.quartzComponent = quartzComponent;
        }
        @Required
        public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
            this.hazelcastInstance = hazelcastInstance;
        }
        @Override
        public synchronized void onApplicationEvent(ApplicationEvent event) {
            if (initialized) {
                return;
            }
            try {
                while (true) {
                    try {
                        getLock().lock();
                        log.warn("This node is now the master Quartz!");
                        try {
                            quartzComponent.startScheduler();
                        } catch (Exception e) {
                            unlock();
                            throw new RuntimeException(e);
                        }
                        return;
                    } catch (OperationTimeoutException e) {
                        log.warn("This node is not the master Quartz and failed to wait for the lock!");
                    }
                }
            } catch (Exception e) {
                log.error("Error while trying to wait for the lock from Hazelcast!", e);
            }
        }
        private synchronized void unlock() {
            try {
                getLock().unlock();
            } catch (IllegalStateException e) {
                log.warn("Exception while trying to unlock quartz lock: Hazelcast instance is already inactive!");
            } catch (Exception e) {
                log.warn("Exception during the unlock of the master Quartz!", e);
            }
        }
    }
    

    Auf diese Weise können wir regelmäßige Aufgaben verwenden , die von Camel empfohlen werden und die verteilte Laufzeit berücksichtigen. Zum Beispiel so:

    Finite-State-Maschine


    Zusätzlich zu einfachen Aggregationsmethoden (zum Beispiel das Zählen von Summen) mussten wir häufig auch den Status von Aggregatoren in Abhängigkeit von eingehenden Nachrichten ändern, um beispielsweise immer den aktuellen Status eines abgeschlossenen Tests zu speichern. Um diese Funktion zu implementieren, sind Zustandsautomaten gut geeignet .. Stellen Sie sich vor, wir haben einen gewissen Stand des Tests. Zum Beispiel TestPassedState. Wenn wir die TestFailed-Nachricht für diesen Test erhalten, müssen wir den Status des Aggregators auf TestFailedState ändern und wenn wir TestPassed erneut auf TestPassedState erhalten. Und so weiter bis zur Unendlichkeit. Basierend auf diesen Übergängen können einige Schlussfolgerungen gezogen werden, zum Beispiel, wenn der Übergang TestPassed -> TestFailed auftritt, ist es notwendig, alle interessierten Parteien zu benachrichtigen, dass der Test gebrochen wurde. Und wenn es einen umgekehrten Übergang gibt, dann sage ihnen im Gegenteil, dass alles gut geworden ist.
    Bild

    Bei der Auswahl der Optionen für die Implementierung einer solchen Aggregationsstrategie kamen wir zu dem Schluss, dass ein bestimmtes, an die Realitäten der Nachrichtenverarbeitung angepasstes Zustandsmaschinenmodell erforderlich ist. Erstens sind Nachrichten, die am Eingang von Aggregatoren empfangen werden, eine bestimmte Menge von Objekten. Jedes Ereignis hat seinen eigenen Typ und fällt daher leicht in Klassen in Java. Um die Arten von Ereignissen zu beschreiben, verwenden wir das xsd-Schema, nach dem wir mit xjc eine Menge von Klassen generieren. Diese Klassen können in xml und json mit jaxb einfach serialisiert und deserialisiert werden. Hazelcast-Zustände werden auch durch eine Reihe von Klassen dargestellt, die von xsd generiert werden. Daher mussten wir eine Implementierung einer Zustandsmaschine finden, die es einfach machte, Übergänge zwischen Zuständen basierend auf dem Typ der Nachricht und dem Typ des aktuellen Zustands zu handhaben. Und ich wollte auch Damit werden diese Übergänge deklarativ und nicht zwingend wie in vielen ähnlichen Bibliotheken gesetzt. Wir haben keine einfache Implementierung dieser Funktionalität gefunden und beschlossen, unsere eigene zu schreiben, um unseren Bedürfnissen Rechnung zu tragen und die Grundlage für die Verarbeitung von Nachrichten auf dem Weg zu Camel zu schaffen.

    Eine kleine Bibliothek, die unsere Bedürfnisse erkennt, heißt Yatomata (nach den Worten Yet Another auTomata) und ist auf github verfügbar .
    Es wurde beschlossen, das FSM-Modell etwas zu vereinfachen - zum Beispiel wird der Kontext durch das Objekt des aktuellen Status festgelegt, die Nachricht speichert auch einige Daten. Übergänge werden in diesem Fall jedoch nur von Zustands- und Nachrichtentypen bestimmt. Eine Zustandsmaschine ist für eine Klasse definiert, die als Aggregator verwendet wird. Dazu wird die Klasse mit der Anmerkung @FSM markiert . Hierfür werden ein Anfangszustand (Start) und eine Reihe von Übergängen definiert, von denen einige die Aggregation stoppen (Stop = True) und den akkumulierten Zustand automatisch weiter entlang der Route senden.
    Übergangssatz, der durch die @ Transitions-Annotation deklariert wurdeund ein Array von @Transit- Annotationen , in denen Sie jeweils eine Reihe von Anfangszuständen (von), Endzustand (bis) und eine Reihe von Ereignissen angeben können, durch die dieser Übergang aktiviert wird (ein). Außerdem können Sie angeben, ob dieser Zustand das Ende der Maschine ist (Stopp). . Für die Verarbeitung von Übergängen stehen die Annotationen @OnTransit , @BeforeTransit und auch @AfterTransit zur Verfügung , mit denen Sie öffentliche Methoden innerhalb der Klasse markieren können. Diese Methoden werden aufgerufen, wenn ein passender Übergang gefunden wird, der mit seiner Signatur übereinstimmt.
    @FSM(start = Undefined.class)
    @Transitions({
          @Transit(on = TestPassed.class, to = TestPassedState.class),
          @Transit(on = TestFailed.class, to = TestFailedState.class),
          @Transit(stop = true, on = TestExpired.class),
    })
    public class TestStateFSM {
          @OnTransit
          public void onTestFailed(State oldState, TestFailedState newState, TestFailed event){}
          @OnTransit
          public void onTestPassed(State oldState, TestPassedState newState, TestPassed event){}
        }
    

    Die Arbeit mit der Zustandsmaschine ist wie folgt:
    Yatomata fsm = new FSMBuilder(TestStateFSM.class).build();
    fsm.getCurrentState();       // returns instance of Undefined
    fsm.isStopped();             // returns false
    fsm.getFSM();                // returns instance of TestStateFSM
    fsm.fire(new TestPassed());  // returns instance of TestPassedState
    fsm.fire(new TestFailed());  // returns instance of TestFailedState
    fsm.fire(new TestExpired()); // returns instance of TestFailedState
    fsm.isStopped();             // returns true
    

    Durch die Implementierung der AggregationStrategy- Schnittstelle haben wir FSMAggregationStrategy erstellt, das im Kontext von Spring wie folgt deklariert wird:

    Die einfachste Implementierung der Aggregationsstrategie unter Verwendung dieser Zustandsmaschine könnte folgendermaßen aussehen:
    Versteckter Text
    public class FSMAggregationStrategy implements AggregationStrategy {
        private final Yatomata fsmEngine;
        public FSMAggregationStrategy(Class fsmClass) {
            this.fsmEngine = new FSMBuilder(fsmClass).build();
        }
        @Override
        public Exchange aggregate(Exchange state, Exchange message) {
            Object result = state == null ? null : state.getIn().getBody();
            try {
                Object event = message.getIn().getBody();
                Object fsm = fsmEngine.getFSM();
                result = fsmEngine.fire(event);            
            } catch (Exception e) {
                logger.error(fsm + " error", e);
            }
            if (result != null) {
                message.getIn().setBody(result);
            }
            return message;
        }
        public boolean isCompleted() {
            return fsmEngine.isCompleted();
        }
    }
    


    Schlussfolgerungen


    Diese Techniken ermöglichten es uns, mehrere horizontal skalierbare Dienste für verschiedene Zwecke zu implementieren. Apache Camel zeigte sich von seiner besten Seite und erfüllte seine Erwartungen. Die Deklarativität ist mit einer hohen Flexibilität verbunden, die insgesamt eine hervorragende Skalierung von Integrationsanwendungen mit einem Minimum an Aufwand für die Unterstützung und das Hinzufügen neuer Funktionen ermöglicht.

    Jetzt auch beliebt: