Abonnieren Sie Kafka über HTTP oder wie Sie Ihre eigenen Web-Hooks vereinfachen können

Es gibt viele Möglichkeiten, Nachrichten aus Pub-Sub-Systemen zu verarbeiten: Verwenden eines separaten Dienstes, Isolieren eines isolierten Prozesses, Orchestrieren eines Prozess- / Thread-Pools, komplexe IPCs, Poll-over-Http und viele andere. Heute möchte ich über die Verwendung von Pub-Sub über HTTP und über meinen speziell dafür geschriebenen Service sprechen.

In einigen Fällen ist die Verwendung eines vorgefertigten HTTP-Service-Backends eine ideale Lösung für die Verarbeitung von Nachrichtenwarteschlangen:

  1. Balancieren aus der Box. In der Regel befindet sich das Backend bereits hinter dem Balancer und verfügt über eine ladefähige Infrastruktur, die die Arbeit mit Nachrichten erheblich vereinfacht.
  2. Verwenden Sie einen regulären REST-Controller (eine beliebige HTTP-Ressource). Durch den Verbrauch von Nachrichten über HTTP werden die Kosten für die Implementierung von Benutzern für verschiedene Sprachen auf ein Minimum reduziert, wenn das Backend bunt ist.
  3. Vereinfachen Sie die Verwendung anderer Web-Hooks. Jetzt unterstützt fast jeder Dienst (Jira, Gitlab, Mattermost, Slack ...) irgendwie Webhooks, um mit der Außenwelt zu interagieren. Sie können das Leben einfacher machen, wenn Sie der Queue die Funktionen des HTTP-Dispatchers beibringen.

Dieser Ansatz hat Nachteile:

  1. Sie können die Leichtigkeit der Lösung vergessen. HTTP ist ein umfangreiches Protokoll, und die Verwendung von Frameworks auf Seiten des Verbrauchers führt sofort zu einer Erhöhung der Latenz und der Last.
  2. Die Stärken des Poll-Ansatzes durch Push-Schwächen entziehen.
  3. Das Verarbeiten von Nachrichten von denselben Dienstinstanzen, die Clients verarbeiten, kann die Reaktionsfähigkeit beeinträchtigen. Dies ist irrelevant, da es durch Balancing und Isolation behandelt wird.

Ich habe die Idee als Queue-Over-Http-Dienst implementiert, auf den später noch eingegangen wird. Das Projekt wurde in Kotlin mit Spring Boot 2.1 geschrieben. Als Broker ist derzeit nur Apache Kafka verfügbar.

Weiter wird in dem Artikel davon ausgegangen, dass der Leser mit Kafka vertraut ist und sich mit Commits (Commits) und Offsets (Offset) von Nachrichten, den Prinzipien von Gruppen (Group) und Consumern (Consumer) auskennt und auch weiß, wie sich die Partition (Partition) vom Thema (Topic) unterscheidet. . Wenn es Lücken gibt, sollten Sie sich mit diesem Abschnitt der Kafka-Dokumentation vertraut machen, bevor Sie weiterlesen.

Inhalt



Bewertung


Queue-Over-Http ist ein Dienst, der als Vermittler zwischen dem Nachrichtenbroker und dem endgültigen HTTP-Anbieter fungiert (der Dienst vereinfacht die Unterstützung für das Senden von Nachrichten an Konservatoren auf andere Weise, z. B. verschiedene * RPCs). Momentan sind nur Abonnements, Abmeldungen und das Durchsuchen der Liste der Consumer-Designer verfügbar. Das Senden von Nachrichten an den Broker (Produkt) über HTTP wurde noch nicht implementiert, da die Reihenfolge der Nachrichten ohne besondere Unterstützung des Herstellers nicht garantiert werden kann.

Die Schlüsselfigur des Services ist der Consumer Manager, der bestimmte Partitionen oder einfach Themen abonnieren kann (das Themenmuster wird unterstützt). Im ersten Fall ist der automatische Abgleich der Partitionen deaktiviert. Nach dem Abonnieren empfängt die angegebene HTTP-Ressource Nachrichten von den zugewiesenen Kafka-Partitionen. Architektonisch ist jeder Abonnent dem nativen Java-Client von Kafka zugeordnet.

Unterhaltsame Geschichte über KafkaConsumer
У Kafka есть замечательный Java-клиент, который умеет многое. Его использую в адаптере очереди для получения сообщений от брокера и дальнейшей отправки в локальные очереди сервиса. Стоит оговориться, что клиент работает исключительно в контексте одного потока.

Идея адаптера простая. Запускаем в одном потоке, пишем простейший планировщик нативных клиентов, делая упор на уменьшение latency. То есть пишем что-то похожее:

while (!Thread.interrupted()) {
    var hasWork = false
    for (consumer in kafkaConsumers) {
        val queueGroup = consumers[consumer] ?: continue
        invalidateSubscription(consumer, queueGroup)
        val records = consumer.poll(Duration.ZERO)
        /* здесь раскидываем в локальные очереди */
        if (!records.isEmpty) {
            hasWork = true
        }
    }
    val committed = doCommit()
    if (!hasWork && committed == 0) {
        // засыпаем, если нечего делать
        Thread.sleep(1)
    }
}

Казалось бы, всё замечательно, latency минимальный даже при наличии десятков консюмеров. На практике получилось, что KafkaConsumer к такому режиму эксплуатации совершенно не готов и даёт allocation rate около 1.5 МБ/сек в простое. При 100 консюмерах allocation rate достигает 150 МБ/сек и заставляет GC чаще вспоминать о приложении. Конечно, весь этот мусор находится в young области, GC вполне справляется с этим, но всё же, решение не идеально.

Очевидно, нужно идти типичным для KafkaConsumer путём и каждого подписчика размещаю теперь в своём потоке. Это даёт оверхед по памяти и диспетчеризации, но другого выхода нет.

Переписываю код сверху, убирая внутренний цикл и меняя Duration.ZERO на Duration.ofMillis(100). Получается хорошо, allocation rate падает до приемлемых 80-150 КБ/сек на одного консюмера. Однако, Poll с таймаутом в 100мс задерживает всю очередь коммитов на эти самые 100мс, а это неприемлемо много.

В процессе поиска решений проблемы вспоминаю про KafkaConsumer::wakeup, который бросает WakeupException и прерывает любую блокирующую операцию на консюмере. С этим методом путь к low-latency прост: когда приходит новый запрос на коммит, кладём его в очередь, а на нативном консюмере вызываем wakeup. В рабочем цикле ловим WakeupException и идём коммитить то, что накопилось. За передачу управления с помощью исключений нужно сразу давать по рукам, но раз уж по-другому никак…

Оказывается, и этот вариант далёк от совершенства, так как любая операция на нативном консюмере теперь выкидывает WakeupException, в том числе, сам коммит. Обработка этой ситуации захламит код флагом, разрешающим делать wakeup.

Прихожу к выводу, что было бы неплохо модифицировать метод KafkaConsumer::poll, чтобы он мог прерываться штатно, по дополнительному флагу. В итоге, был рождён франкенштейн из рефлексии, который в точности копирует оригинальный метод poll, добавляя выход из цикла по флагу. Этот флаг устанавливается отдельным методом interruptPoll, который, к тому же, на селекторе клиента вызывает wakeup, чтобы снять блокировку потока на операции ввода-вывода.

Реализовав таким образом клиент, получаю скорость реакции с момента поступления запроса на коммит до его обработки до 100 микросекунд, и отличный latency на выборку сообщений из брокера, что вполне устраивает.

Jede Partition wird durch eine separate lokale Warteschlange dargestellt, in die der Adapter Nachrichten vom Broker schreibt. Ein Worker sammelt Nachrichten daraus und schickt sie zur Ausführung, dh zum Senden über HTTP.

Der Dienst unterstützt die Stapelverarbeitung von Nachrichten, um den Durchsatz zu erhöhen. Beim Abonnieren können Sie concurrencyFactorjedes Thema angeben (gilt für jede zugewiesene Partition unabhängig). Dies bedeutet beispielsweise, concurrencyFactor=1000dass 1000 Nachrichten gleichzeitig mit HTTP-Anforderungen an den Consumer gesendet werden können. Sobald alle Nachrichten aus dem Paket eindeutig vom Designer ausgearbeitet wurden, entscheidet der Dienst über das nächste Commit des letzten Satzes in der Reihenfolge der Nachricht an Kafka. Daher ist der zweite Wert concurrencyFactordie maximale Anzahl wiederaufbereiteter Nachrichten, die der Verbraucher im Falle eines Kafka- oder Queue-Over-Http-Absturzes verarbeitet.

Um Verzögerungen zu reduzieren, verfügt die Warteschlange über loadFactor = concurrencyFactor * 2die Möglichkeit, doppelt so viele Nachrichten von einem Broker zu lesen, als gesendet werden können. Da Autocommit auf dem nativen Client deaktiviert ist, verstößt ein solches System nicht gegen At-Least-Once-Garantien.
Ein hoher Wert concurrencyFactorerhöht den Durchsatz der Warteschlange (Durchsatz), indem die Anzahl der Commits reduziert wird, die im schlimmsten Fall bis zu 10 ms dauern. Gleichzeitig steigt die Belastung des Verbrauchers.

Die Reihenfolge des Versendens von Nachrichten innerhalb eines Pakets ist nicht garantiert, kann jedoch bei Installation erreicht werden concurrencyFactor=1.

Commits


Commits sind ein wichtiger Teil des Services. Wenn das nächste Datenpaket bereit ist, wird der Versatz der letzten Nachricht aus dem Paket sofort an Kafka übergeben, und erst nach einem erfolgreichen Commit steht das nächste Paket für die Verarbeitung zur Verfügung. Dies ist häufig nicht ausreichend und erfordert eine automatische Übergabe. Dazu gibt es einen Parameter autoCommitPeriodMs, der wenig mit der klassischen Autocommit-Periode für native Clients zu tun hat, die die letzte aus der Partition gelesene Nachricht festschreiben. Stell dir das vorconcurrencyFactor=10. Der Dienst hat alle 10 Nachrichten gesendet und wartet, bis alle Nachrichten bereit sind. Die Verarbeitung von Nachricht 3 wird zuerst abgeschlossen, dann Nachricht 1 und dann Nachricht 10. Zu diesem Zeitpunkt tritt der Zeitpunkt der automatischen Übergabe auf. Es ist wichtig, die At-Least-Once-Semantik nicht zu brechen. Daher können Sie nur die erste Nachricht festschreiben, d. H. Den Offset 2, da nur die Nachricht zu diesem Zeitpunkt erfolgreich verarbeitet wurde. Bis zum nächsten automatischen Commit werden die Nachrichten 2, 5, 6, 4 und 8 verarbeitet. Jetzt müssen Sie nur noch den Offset 7 festschreiben usw. Autocommit hat fast keinen Einfluss auf den Durchsatz.

Fehlerbehandlung


Im normalen Betriebsmodus sendet der Dienst einmal eine Nachricht an den Master. Wenn aus irgendeinem Grund ein Fehler 4xx oder 5xx verursacht wurde, sendet der Dienst die Nachricht erneut und wartet auf die erfolgreiche Verarbeitung. Die Zeit zwischen Versuchen kann als separater Parameter konfiguriert werden.

Es ist auch möglich, die Anzahl der Versuche festzulegen, nach denen die Nachricht als bearbeitet markiert wird, wodurch das wiederholte Senden unabhängig vom Antwortstatus gestoppt wird. Ich empfehle nicht, sensible Daten zu verwenden. Versagen von Verbrauchern sollten immer manuell korrigiert werden. Haftnachrichten können durch die Dienstprotokolle überwacht werden und der Status der Antwort des Verbraucherverwalters überwacht werden.

über kleben
Обычно, HTTP-сервер, отдавая 4xx или 5xx статус ответа, отсылает ещё и заголовок Connection: close. Закрытое таким образом TCP-соединение остаётся в статусе TIME_WAITED, пока не будет подчищено операционной системой спустя какое-то время. Проблема в том, что такие соединения занимают целый порт, который невозможно переиспользовать до освобождения. Это может вылиться в отсутствие свободных портов на машине для установки TCP-соединения и сервис будет сыпаться исключениями в логи на каждую отправку. На практике, на Windows 10 порты кончаются спустя 10-20 тысяч отправок ошибочных сообщений в течение 1-2 минут. В стандартном режиме работы это не проблема.

Nachrichten


Jede vom Broker abgerufene Nachricht wird an HTTP an die vom Abonnenten während des Abonnements angegebene Ressource gesendet. Die Nachricht wird standardmäßig von einer POST-Anforderung im Hauptteil gesendet. Dieses Verhalten kann durch Angabe einer anderen Methode geändert werden. Wenn die Methode das Senden von Daten im Hauptteil nicht unterstützt, können Sie den Namen des Zeichenfolgeparameters angeben, in dem die Nachricht gesendet wird. Außerdem können Sie beim Abonnieren zusätzliche Header angeben, die zu jeder Nachricht hinzugefügt werden. Dies ist praktisch für die grundlegende Autorisierung mithilfe von Token. Zu jeder Nachricht werden Kopfzeilen mit der Kennung des Designers des Konsumenten, dem Thema und der Partition, von der die Nachricht gelesen wurde, der Nachrichtennummer, dem Partitionsschlüssel (falls vorhanden) und dem Namen des Brokers selbst hinzugefügt.

Leistung


Zur Leistungsbewertung habe ich einen PC (Windows 10, OpenJDK-11 (G1 ohne Abstimmung), i7-6700K, 16GB) verwendet, auf dem der Dienst und der Laptop (Windows 10, i5-8250U, 8GB) ausgeführt wurden, auf dem der Nachrichtenproduzent sich drehte, HTTP -resource konsyumera und Kafka mit Standardeinstellungen. Der PC ist mit dem Router über eine kabelgebundene Verbindung von 1 Gbit / s verbunden, ein Laptop mit 802.11ac. Der Produzent schreibt Nachrichten mit einer Länge von 110 Bytes alle 100 ms innerhalb von 1000 Sekunden in die angegebenen Themen, die von Consumer- concurrencyFactor=500Designern abonniert werden ( Autocommit ist deaktiviert), und zwar aus verschiedenen Gruppen. Der Stand ist alles andere als ideal, aber Sie können sich ein Bild machen.

Der wichtigste gemessene Parameter ist der Einfluss des Dienstes auf die Latenzzeit.

Sei:
- t q der Zeitstempel des Dienstes, der die Nachricht vom nativen Client empfängt
- d t0- Zeit zwischen t q und Zeitpunkt des Sendens einer Nachricht von einer lokalen Warteschlange an einen Pool von Executoren
- d t - Zeit zwischen t q und dem Zeitpunkt des Sendens einer HTTP-Anforderung. D t ist der Einfluss des Dienstes auf die Latenzzeit der Nachricht.

Während der Messungen wurden die folgenden Ergebnisse erzielt (C - Consumer, T - Themen, M - Nachrichten):



In der Standardbetriebsart hat der Dienst selbst kaum Auswirkungen auf die Latenz und der Speicherverbrauch ist minimal. Die Maximalwerte von d t (etwa 60 ms) nicht angezeigt, da sie auf der GC Arbeit abhängen, anstatt sich auf den Dienst selbst. Die Verringerung der Streuung der Maximalwerte kann dazu beitragen, die GC-Optimierung zu optimieren oder den G1 bei Shenandoah zu ersetzen.

Alles ändert sich drastisch, wenn der Konsument den Nachrichtenfluss aus der Warteschlange nicht bewältigt und der Dienst den Trotting-Modus aktiviert. In diesem Modus steigt der Speicherverbrauch an, da die Antwortzeit auf Abfragen dramatisch ansteigt, was eine rechtzeitige Säuberung der Ressourcen verhindert. Die Auswirkung auf die Latenz bleibt dabei auf dem Niveau der vorherigen Ergebnisse, und hohe dt-Werte werden durch das Vorladen von Nachrichten in die lokale Warteschlange verursacht.

Leider ist ein Test bei höherer Last nicht möglich, da das Notebook bereits bei 1300 RPS verbogen ist. Wenn jemand bei der Organisation von Messungen bei hohen Belastungen helfen kann, stelle ich Ihnen gerne eine Baugruppe für Tests zur Verfügung.

Demonstration


Wir wenden uns nun der Demonstration zu. Dafür brauchen wir:

  • Kafka-Broker, bereit zu gehen. Ich werde die von 192.168.99.100:9092 erhobene Instanz von Bitnami nehmen.
  • Eine HTTP-Ressource, die Nachrichten erhalten soll. Zur Verdeutlichung nahm ich die Web-Hooks von Slack.

Zunächst müssen Sie den Service Queue-Over-Http erhöhen. Erstellen Sie dazu ein leeres Verzeichnis mit application.ymlfolgendem Inhalt:

spring:
  profiles: default
logging:
  level:
    com:
      viirrtus:
        queueOverHttp: DEBUG
app:
  persistence:
    file:
      storageDirectory: "persist"
  brokers:
    - name: "Kafka"
      origin: "kafka"
      config:
        bootstrap.servers: "192.168.99.100:9092"

Hier geben wir dem Dienst die Verbindungsparameter eines bestimmten Brokers sowie den Speicherort der Abonnenten an, damit sie zwischen den Starts nicht verloren gehen. In `app.brokers []. Config` können Sie beliebige Verbindungsparameter angeben, die vom nativen Kafka-Client unterstützt werden. Die vollständige Liste finden Sie hier .

Da die Konfigurationsdatei von Spring verarbeitet wird, können Sie dort viele interessante Dinge schreiben. Anpassen der Protokollierung.

Nun starten wir den Dienst selbst. Wir benutzen den einfachsten Weg - docker-compose.yml:

version: "2"
services:
  app:
    image: viirrtus/queue-over-http:0.1.3
    restart: unless-stopped
    command: --debug
    ports:
      - "8080:8080"
    volumes:
      - ./application.yml:/application.yml
      - ./persist:/persist

Wenn diese Option nicht zu Ihnen passt, können Sie den Dienst aus der Quelle erstellen. Montageanweisungen in der Readme-Datei des Projekts, auf die am Ende des Artikels verwiesen wird.

Der nächste Schritt ist die Registrierung des ersten Teilnehmers. Dazu müssen Sie eine HTTP-Anforderung an den Dienst mit einer Beschreibung des Verbrauchers ausführen:

POST localhost:8080/broker/subscription
Content-Type: application/json
{
  "id": "my-first-consumer",
  "group": {
    "id": "consumers"
  },
  "broker": "Kafka",
  "topics": [
    {
      "name": "slack.test",
      "config": {
        "concurrencyFactor": 10,
        "autoCommitPeriodMs": 100
      }
    }
  ],
  "subscriptionMethod": {
    "type": "http",
    "delayOnErrorMs": 1000,
    "retryBeforeCommit": 10,
    "uri": "<slack-wh-uri>",
    "additionalHeaders": {
      "Content-Type": "application/json"
    }
  }
}

Wenn alles gut gegangen ist, wird die Antwort fast mit dem gleichen Inhalt gesendet.

Lassen Sie uns jeden Parameter noch einmal durchgehen:

  • Consumer.id - unsere Abonnenten-ID
  • Consumer.group.id - Gruppenkennung
  • Consumer.broker - Geben Sie an, welchen Service-Broker Sie abonnieren müssen
  • Consumer.topics[0].name - der Name des Themas, von dem wir Nachrichten erhalten möchten
  • Consumer.topics[0].config. concurrencyFactor - maximale Anzahl gleichzeitig gesendeter Nachrichten
  • Consumer.topics[0].config. autoCommitPeriodMs - Zeitraum des erzwungenen Festschreibens abgeschlossener Nachrichten
  • Consumer.subscriptionMethod.type- Abonnementtyp. Derzeit ist nur HTTP verfügbar.
  • Consumer.subscriptionMethod.delayOnErrorMs - Zeit vor dem erneuten Senden der Nachricht, die fehlerhaft beendet wurde
  • Consumer.subscriptionMethod.retryBeforeCommit- Die Anzahl der Versuche, die Fehlermeldung erneut zu senden. Bei 0 wird die Nachricht bis zur erfolgreichen Verarbeitung durchlaufen. In unserem Fall ist die Garantie der vollen Lieferung nicht so wichtig wie die Flusskonstanz.
  • Consumer.subscriptionMethod.uri - Ressource, an die Nachrichten gesendet werden
  • Consumer.subscriptionMethod.additionalHeader- zusätzliche Header, die mit jeder Nachricht gesendet werden. Beachten Sie, dass sich in jeder Nachricht JSON befindet, damit Slack die Anforderung richtig interpretieren kann.

In dieser Anforderung wird die Angabe der HTTP-Methode ausgelassen, da standardmäßig POST und Slack in Ordnung sind.

Ab diesem Zeitpunkt überwacht der Dienst die geplanten Partitionen des Themas slack.testauf neue Nachrichten.

Um Nachrichten zum Thema zu schreiben, verwende ich die in Kafka integrierten Dienstprogramme, die sich im /opt/bitnami/kafka/binlaufenden Kafka-Image befinden (der Speicherort der Dienstprogramme in anderen Kafka-Instanzen kann sich unterscheiden):

kafka-console-producer.sh --broker-list localhost:9092 --topic slack.test
> {“text”: “Hello!”}

Gleichzeitig informiert Slack über die neue Nachricht:



Um den Verbraucher abzubestellen, genügt es, eine POST-Anforderung für `broker / abbestellen` mit demselben Inhalt wie beim Abonnement zu stellen.

Fazit


Derzeit ist nur eine Basisfunktionalität implementiert. Als Nächstes planen wir, das Batching zu verbessern, versuchen, Exactly-Once-Semantik zu implementieren, die Möglichkeit hinzuzufügen, Nachrichten über HTTP an den Broker zu senden, und vor allem Unterstützung für andere populäre Pub-Sub.

Der Queue-Over-Http-Dienst wird derzeit aktiv entwickelt. Version 0.1.3 ist für Tests auf Entwicklungs- und Bühnenständen ziemlich stabil. Die Leistung wurde unter Windows 10, Debian 9 und Ubuntu 18.04 getestet. Verwenden Sie es auf eigene Gefahr. Wenn Sie bei der Entwicklung mithelfen oder dem Service ein Feedback geben möchten - willkommen im Github- Projekt.

Jetzt auch beliebt: