Spark Streaming und Kafka Integration

Published on July 13, 2018

Spark Streaming und Kafka Integration

Ursprünglicher Autor: Data Flair
  • Übersetzung
Hallo Kollegen! Wir möchten Sie daran erinnern, dass wir vor nicht allzu langer Zeit ein Buch über Spark veröffentlicht haben , und gerade das letzte Korrekturlesebuch über Kafka vergeht .


Wir hoffen, dass diese Bücher erfolgreich genug sein werden, um das Thema fortzusetzen, beispielsweise um Literatur zu Spark Streaming zu übersetzen und zu veröffentlichen. Wir wollten mit Kafka heute eine Übersetzung über die Integration dieser Technologie anbieten.

1. Begründung

Apache Kafka + Spark Streaming ist eine der besten Kombinationen für die Erstellung von Echtzeitanwendungen. In diesem Artikel werden die Details dieser Integration ausführlich beschrieben. Zusätzlich betrachten wir das Beispiel von Spark Streaming-Kafka. Anschließend besprechen wir den „Ansatz mit dem Empfänger“ und die Möglichkeit, Kafka und Spark Streaming direkt zu integrieren. Kommen Sie zur Integration von Kafka und Spark Streaming.



2. Integration von Kafka und Spark Streaming

Bei der Integration von Apache Kafka und Spark Streaming gibt es zwei mögliche Ansätze für die Konfiguration von Spark Streaming, um Daten von Kafka zu erhalten - d. H. zwei Ansätze zur Integration von Kafka und Spark Streaming. Erstens können Sie die Kafka-Empfänger und das API-Kafka auf hoher Ebene verwenden. Der zweite (neuere) Ansatz ist die Arbeit ohne Empfänger. Für beide Ansätze gibt es unterschiedliche Programmiermodelle, die sich beispielsweise in Bezug auf Leistung und semantische Garantien unterscheiden.



Betrachten Sie diese Ansätze genauer

a. Empfängerorientierter Ansatz

In diesem Fall werden die Daten vom Empfänger empfangen. Mit der von Kafka bereitgestellten High-Level-API für den Verbrauch implementieren wir den Empfänger. Außerdem werden die Daten in den Spark Contractors gespeichert. In Kafka - Spark Streaming werden dann Aufgaben gestartet, in denen die Daten verarbeitet werden.

Bei diesem Ansatz besteht jedoch immer noch die Gefahr eines Datenverlusts im Fehlerfall (bei der Standardkonfiguration). In Kafka - Spark Streaming ist daher zusätzlich ein Forward-Write-Protokoll erforderlich, um Datenverlust zu vermeiden. Daher werden alle von Kafka empfangenen Daten synchron im Vorwärtsschreibprotokoll im verteilten Dateisystem gespeichert. Deshalb können auch nach einem Systemausfall alle Daten wiederhergestellt werden.

Als Nächstes überlegen wir, wie Sie diesen Ansatz bei der Verwendung von Empfängern in einer Anwendung mit Kafka - Spark Streaming verwenden können.

ich Verknüpfen

Jetzt verknüpfen wir unsere Streaming-Anwendung mit dem nächsten Artefakt für Scala / Java-Anwendungen, indem wir die Projektdefinitionen für SBT / Maven verwenden.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

Bei der Bereitstellung unserer Anwendung müssen wir jedoch die oben genannte Bibliothek und ihre Abhängigkeiten hinzufügen. Dies ist für Python-Anwendungen erforderlich.

ii. Programmierung

Als Nächstes erstellen Sie den Eingabestrom, DStreamindem Sie ihn KafkaUtilsin den Code der Streaming-Anwendung importieren :

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

Darüber hinaus können Sie mit den createStream-Varianten Schlüsselklassen und Werteklassen sowie entsprechende Klassen für deren Dekodierung angeben.

iii. Bereitstellung

Wie bei jeder Spark-Anwendung wird zur Ausführung der Befehl spark-submit verwendet. Die Details unterscheiden sich jedoch geringfügig in Scala / Java-Anwendungen und in Python-Anwendungen.

Darüber hinaus ist es mit der Hilfe, zu der –packagesSie die spark-streaming-Kafka-0-8_2.11Abhängigkeiten direkt hinzufügen können, spark-submitfür Anwendungen in Python nützlich, in denen es nicht möglich ist, Projekte mit Hilfe von SBT / Maven zu verwalten.

./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...

Sie können das Maven-JAR-Archiv auch aus dem Maven- spark-streaming-Kafka-0-8-assemblyRepository herunterladen . Dann fügen Sie es spark-submitmit - hinzu jars.

b. Direkter Ansatz (ohne Empfänger)

Nach dem Ansatz mit dem Einsatz von Empfängern wurde ein neuer Ansatz entwickelt - der „direkte“. Es bietet zuverlässige End-to-End-Garantien. In diesem Fall fragen wir Kafka regelmäßig nach dem Offset der gelesenen Daten (Offsets) für jedes Thema / Abschnitt und organisieren nicht die Übermittlung der Daten durch Empfänger. Darüber hinaus wird die Größe des Lesefragments bestimmt, es ist für die ordnungsgemäße Verarbeitung jedes Pakets erforderlich. Schließlich wird eine einfache, verbrauchsfähige API verwendet, um Datenbereiche von Kafka mit angegebenen Offsets zu lesen, insbesondere wenn Datenverarbeitungsaufgaben gestartet werden. Der gesamte Vorgang ist wie das Lesen von Dateien aus dem Dateisystem.

Hinweis: Diese Funktion wurde in Spark 1.3 für Scala und die Java-API sowie in Spark 1.4 für die Python-API veröffentlicht.

Lassen Sie uns nun diskutieren, wie Sie diesen Ansatz in unserer Streaming-Anwendung anwenden können.
Die Consumer-API wird unter folgendem Link genauer beschrieben:

Apache Kafka Consumer | Beispiele für Kafka Consumer

i. Binding

Richtig, dieser Ansatz wird nur in Scala / Java-Anwendungen unterstützt. Erstellen Sie mit dem folgenden Artefakt das SBT / Maven-Projekt.

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0

ii. Programmierung

Als Nächstes importieren Sie KafkaUtils und erstellen eine Eingabe DStreamim Code der Streaming-Anwendung:

import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])

In den Kafka-Parametern müssen Sie entweder metadata.broker.listoder angeben bootstrap.servers. Daher verbrauchen wir standardmäßig Daten ab dem letzten Offset in jedem Kafka-Abschnitt. Wenn Sie jedoch das Lesen vom kleinsten Fragment aus starten möchten, müssen Sie in den Kafka-Parametern die Konfigurationsoption festlegen auto.offset.reset.

Wenn KafkaUtils.createDirectStreamSie mit Optionen arbeiten , können Sie außerdem mit einem beliebigen Offset beginnen. Dann werden wir Folgendes tun, um auf die in jedem Paket verbrauchten Kafka-Fragmente zuzugreifen.

// Храним ссылку на актуальные диапазоны фрагментов, чтобы ее могли использовать и последующие потоки
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}

Wenn wir die Überwachung von Kafka auf Basis von Zookeeper mit Hilfe von Spezialwerkzeugen organisieren möchten, können wir Zookeeper selbst mit ihrer Hilfe aktualisieren.

iii. Bereitstellung

Der Bereitstellungsprozess ähnelt in diesem Fall dem Bereitstellungsprozess in der Version mit dem Empfänger.

3. Vorteile des direkten Ansatzes Der

zweite Ansatz zur Integration von Spark Streaming mit Kafka gewinnt den ersten aus folgenden Gründen:

a. Vereinfachte Parallelität

In diesem Fall müssen Sie nicht mehrere Kafka-Eingabeströme erstellen und zusammenführen. Kafka - Spark Streaming erstellt jedoch so viele RDD-Segmente wie Kafka-Verbrauchssegmente. Alle diese Kafka-Daten werden parallel gelesen. Daher können wir sagen, dass wir eine Eins-zu-Eins-Entsprechung zwischen den Segmenten Kafka und RDD haben werden. Dieses Modell ist klarer und einfacher zu konfigurieren.

b. Wirksamkeit

Um den Datenverlust im ersten Ansatz vollständig zu vermeiden, mussten Informationen im Protokoll des führenden Datensatzes gespeichert und anschließend repliziert werden. Dies ist in der Tat ineffizient, da die Daten zweimal repliziert werden: zum ersten Mal von Kafka selbst und zum anderen - durch das proaktive Schreibprotokoll. Bei dem zweiten Ansatz wird dieses Problem beseitigt, da es keinen Empfänger gibt, und daher ist auch das Vorwärtsschreibprotokoll nicht erforderlich. Wenn wir eine recht lange Speicherung von Daten in Kafka vorgesehen haben, können Sie Nachrichten direkt aus Kafka wiederherstellen.

c. Genau einmal Semantik

Im Prinzip haben wir die High-Level-Kafka-API für den ersten Ansatz verwendet, um verbrauchte Lesefragmente im Zookeeper zu speichern. Auf diese Weise können jedoch Daten von Kafka verwendet werden. Angenommen, gleichzeitig werden Datenverluste zuverlässig ausgeschlossen, besteht mit einiger Wahrscheinlichkeit eine geringe Wahrscheinlichkeit, dass einzelne Datensätze doppelt verwendet werden. Es dreht sich alles um die Inkonsistenz zwischen dem Mechanismus für die zuverlässige Datenübertragung in Kafka - Spark Streaming und dem Lesen von Fragmenten, die in Zookeeper auftreten. Im zweiten Ansatz verwenden wir daher eine einfache Kafka-API, die nicht auf Zookeeper zurückgreifen muss. Hier werden die gelesenen Fragmente in Kafka - Spark Streaming verfolgt, hierfür werden Kontrollpunkte verwendet. In diesem Fall wird die Inkonsistenz zwischen Spark Streaming und Zookeeper / Kafka beseitigt.

Dementsprechend erhält Spark Streaming auch im Fehlerfall jeden Eintrag einmalig. Hier müssen wir sicherstellen, dass unsere Ausgabeoperation, in der Daten in einem externen Speicher gespeichert werden, entweder eine idempotente oder eine atomare Transaktion ist, in der sowohl Ergebnisse als auch Offsets gespeichert werden. Genau so wird bei der Ableitung unserer Ergebnisse genau einmalige Semantik erreicht.

Allerdings gibt es einen Nachteil: Die Offsets im Zookeeper werden nicht aktualisiert. Daher erlauben die Kafka-basierten Zookeeper-Überwachungstools keinen Fortschritt der Überwachung.
Wir können jedoch immer noch Offsets beantragen, wenn die Verarbeitung auf diese Weise arrangiert wird - wir wenden auf jedes Paket an und aktualisieren Zookeeper selbst.

Das war alles, was wir über die Integration von Apache Kafka und Spark Streaming sprechen wollten. Ich hoffe es hat dir gefallen.