RabbitMQ Tutorial 6 - Remote Procedure Call

Ursprünglicher Autor: RabbitMQ Team
  • Übersetzung
  • Tutorial
In Fortsetzung der fünften Lektion zum Studium der Grundlagen von RabbitMQ veröffentliche ich eine Übersetzung der sechsten Lektion von der offiziellen Website . Alle Beispiele sind in Python geschrieben (unter Verwendung von Pika Version 0.9.8), aber sie können immer noch in den meisten gängigen Sprachen implementiert werden .

In der zweiten Lektion haben wir die Verwendung von Aufgabenwarteschlangen zum Verteilen von ressourcenintensiven Aufgaben auf mehrere Abonnenten untersucht.

Aber was ist, wenn wir die Funktion auf einem Remote-Computer ausführen und auf das Ergebnis warten möchten? Das ist eine ganz andere Geschichte. Dieses Muster ist allgemein als Remote Procedure Call (RPC, im Folgenden als RPC bezeichnet) bekannt.

In diesem Handbuch erstellen wir mit RabbitMQ ein RPC-System, das einen Client und einen skalierbaren RPC-Server enthält. Da wir keine zeitaufwändige Aufgabe haben, die verteilt werden muss, erstellen wir einen einfachen RPC-Server, der Fibonacci-Zahlen zurückgibt.

Kundenschnittstelle


Erstellen Sie eine einfache Clientklasse, um die Verwendung des RPC-Diensts zu veranschaulichen. Diese Klasse enthält eine Aufrufmethode , die RPC-Anforderungen sendet und blockiert, bis eine Antwort eingeht:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

RPC-Hinweis

Obwohl RPC ein weit verbreitetes Muster ist, wird es häufig kritisiert. Probleme treten normalerweise auf, wenn der Entwickler nicht genau weiß, welche Funktion er verwendet: lokal oder langsam, ausgeführt über RPC. Ein solches Durcheinander kann zu unvorhersehbarem Systemverhalten führen und den Debug-Prozess unnötig komplex machen. Anstatt die Software zu vereinfachen, kann eine unsachgemäße Verwendung von RPC zu unbeaufsichtigtem und unlesbarem Code führen.

Auf der Grundlage des Vorstehenden können die folgenden Empfehlungen ausgesprochen werden:
  • Stellen Sie sicher, dass ersichtlich ist, welche Funktion jeweils aufgerufen wird: local oder remote;
  • Dokumentieren Sie Ihr System. Machen Sie Abhängigkeiten zwischen Komponenten explizit.
  • Behandle die Bugs. Wie soll der Client reagieren, wenn der RPC-Server längere Zeit nicht reagiert?
  • Verwenden Sie im Zweifelsfall kein RPC. Verwenden Sie nach Möglichkeit eine asynchrone Pipeline anstelle einer blockierenden RPC, wenn die Ergebnisse asynchron an die nächste Verarbeitungsebene übergeben werden.

Ergebnisliste


Im Allgemeinen ist es einfach, RPC über RabbitMQ durchzuführen. Der Client sendet die Anfrage und der Server antwortet auf die Anfrage. Um eine Antwort zu erhalten, muss der Client eine Warteschlange übergeben, um die Ergebnisse mit der Anforderung zu veröffentlichen. Mal sehen, wie es im Code aussieht:

result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)
# ...какой-то код для чтения ответного сообщения из callback_queue ...

Nachrichteneigenschaften


AMQP verfügt über 14 vordefinierte Nachrichteneigenschaften. Die meisten von ihnen werden äußerst selten verwendet, mit Ausnahme der folgenden:

  • delivery_mode : markiert die Nachricht als "persistent" (mit einem Wert von 2) oder "temporär" (jeder andere Wert). Sie müssen sich diese Eigenschaft in der zweiten Lektion merken .
  • content_type : wird zur Beschreibung des Datenpräsentationsformats (MIME) verwendet. Für das häufig verwendete JSON-Format empfiehlt es sich beispielsweise, diese Eigenschaft auf application / json festzulegen.
  • reply_to : Wird normalerweise verwendet, um die Ergebniswarteschlange anzugeben.
  • correlation_id : Die Eigenschaft wird zum Zuordnen von RPC-Antworten zu Anforderungen verwendet.


Korrelations-ID


In der oben dargestellten Methode wurde vorgeschlagen, für jede RPC-Anforderung eine Antwortwarteschlange zu erstellen. Dies ist etwas redundant, aber zum Glück gibt es einen besseren Weg - lassen Sie uns für jeden Client eine gemeinsame Ergebniswarteschlange erstellen.

Dies wirft eine neue Frage auf, nachdem eine Antwort von dieser Warteschlange empfangen wurde, und es ist nicht klar, welcher Anforderung diese Antwort entspricht. Und hier bietet sich die Eigenschaft correlation_id an . Wir weisen dieser Eigenschaft für jede Anfrage einen eindeutigen Wert zu. Wenn wir später die empfangene Antwort basierend auf dem Wert dieser Eigenschaft aus der Antwortwarteschlange extrahieren, können wir die Anforderung eindeutig mit der Antwort abgleichen. Wenn wir in der Eigenschaft correlation_id auf einen unbekannten Wert stoßen , können wir diese Nachricht ignorieren, da sie keiner unserer Anforderungen entspricht.

Sie fragen sich vielleicht, warum wir unbekannte Nachrichten aus der Antwortwarteschlange einfach ignorieren möchten, anstatt das Skript zu unterbrechen? Dies ist auf die Wahrscheinlichkeit einer Racebedingung auf der Serverseite zurückzuführen. Obwohl dies unwahrscheinlich ist, ist ein Szenario möglich, in dem der RPC-Server eine Antwort sendet, aber keine Zeit hat, eine Bestätigung der Anforderungsverarbeitung zu senden. In diesem Fall verarbeitet ein neu gestarteter RPC-Server diese Anforderung erneut. Deshalb müssen wir auf dem Client mit wiederholten Antworten richtig umgehen. Darüber hinaus sollte RPC idealerweise idempotent sein.

Zusammenfassung


Bild

Unser RPC funktioniert wie folgt:

- Wenn der Client startet, erstellt er eine anonyme, eindeutige Ergebniswarteschlange.
- Um eine RPC-Anfrage zu stellen, sendet der Client eine Nachricht mit zwei Eigenschaften: reply_to , wobei der Wert die Ergebniswarteschlange und correlation_id ist , die für jede Anfrage auf einen eindeutigen Wert festgelegt sind.
- Die Anforderung wird an die Warteschlange rpc_queue gesendet .
- Der Server wartet auf Anforderungen aus dieser Warteschlange. Wenn die Anforderung empfangen wird, führt der Server seine Aufgabe aus und sendet eine Nachricht mit dem Ergebnis über die Warteschlange der Eigenschaft reply_to an den Client zurück .
- Der Client erwartet das Ergebnis aus der Ergebniswarteschlange. Wenn eine Nachricht empfangen wird, überprüft der Client die Eigenschaft correlation_id . Wenn es mit dem Wert aus der Anforderung übereinstimmt, wird das Ergebnis an die Anwendung gesendet.

Alles zusammen


Rpc_server.py-Servercode:

#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
    n = int(body)
    print " [.] fib(%s)"  % (n,)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()


Der Servercode ist ziemlich einfach:

  • (4) Wir stellen wie gewohnt eine Verbindung her und deklarieren eine Warteschlange;
  • (11) Wir deklarieren unsere Funktion, die Fibonacci-Zahlen zurückgibt, wobei nur ganzzahlige positive Zahlen als Argument verwendet werden (diese Funktion funktioniert wahrscheinlich nicht mit großen Zahlen, höchstwahrscheinlich ist dies die langsamste mögliche Implementierung).
  • (19) Wir deklarieren die Rückruffunktion on_request für basic_consume , den Kern des RPC-Servers. Es wird ausgeführt, wenn die Anforderung empfangen wird. Nach Abschluss der Arbeit sendet die Funktion das Ergebnis zurück.
  • (32) Wir werden wahrscheinlich eines Tages mehr als einen Server starten wollen. Um die Last gleichmäßig auf mehrere Server zu verteilen, setzen wir prefetch_count .


Client-Code rpc_client.py:

#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)


Der Client-Code ist etwas komplizierter:

  • (7) Wir stellen eine Verbindung und einen Kanal her und kündigen eine eindeutige Ergebniswarteschlange für die eingegangenen Antworten an.
  • (16) Wir abonnieren die Ergebniswarteschlange, um Antworten vom RPC zu erhalten.
  • (18) Die Rückruffunktion ' on_response ', die beim Empfang jeder Antwort ausgeführt wird, führt eine eher triviale Aufgabe aus - für jede empfangene Antwort prüft sie, ob correlation_id dem entspricht, was wir erwarten. In diesem Fall wird die Antwort in self.response gespeichert und die Schleife unterbrochen .
  • (23) Als Nächstes definieren wir unsere Aufrufmethode , die tatsächlich eine RPC-Anforderung ausführt.
  • (24) Bei dieser Methode wird zuerst eine eindeutige correlation_id generiert und gespeichert. Die Rückruffunktion on_response verwendet diesen Wert, um die gewünschte Antwort zu verfolgen.
  • (25) Als Nächstes stellen wir die Anfrage mit den Eigenschaften reply_to und correlation_id in die Warteschlange.
  • (32) Als nächstes beginnt der Prozess des Wartens auf eine Antwort;
  • (33) Und am Ende geben wir das Ergebnis an den Benutzer zurück.


Unser RPC-Service ist bereit. Wir können den Server starten:

$ python rpc_server.py
 [x] Awaiting RPC requests

Führen Sie den Client aus, um Fibonacci-Zahlen zu erhalten:

$ python rpc_client.py
 [x] Requesting fib(30)

Die vorgestellte RPC-Implementierungsoption ist nicht die einzig mögliche, hat jedoch die folgenden Vorteile:

  • Wenn der RPC-Server zu langsam ist, können Sie problemlos einen weiteren hinzufügen. Versuchen Sie, die zweite Datei rpc_server.py in der neuen Konsole auszuführen.
  • Auf der Clientseite muss für RPC nur eine Nachricht gesendet und empfangen werden. Es ist kein synchroner Aufruf von queue_declare erforderlich . Infolgedessen verwaltet der RPC-Client einen Anforderungs-Antwort-Zyklus für eine RPC-Anforderung.


Unser Code ist jedoch vereinfacht und versucht nicht einmal, komplexere (aber natürlich wichtige) Probleme wie die folgenden zu lösen:

  • Wie soll der Client reagieren, wenn der Server nicht läuft?
  • Sollte der Client eine Zeitüberschreitung für RPC haben?
  • Sollte der Server an einem bestimmten Punkt "ausfallen" und eine Ausnahme auslösen, sollte er an den Client übergeben werden?
  • Schutz vor ungültigen eingehenden Nachrichten (z. B. Überprüfung akzeptabler Grenzen) vor der Verarbeitung.


Alle Managementartikel


RabbitMQ Tutorial 1 - Hello World (Python)
RabbitMQ Tutorial 2 - Queue (Python)
RabbitMQ Tutorial 3 - Publish / Subscribe (PHP)
RabbitMQ Tutorial 4 - Routing (PHP)
RabbitMQ Tutorial 5 - Topics (PHP)
RabbitMQ Tutorial 6 - Remote Call Prozeduren (dieser Artikel, Python)

Jetzt auch beliebt: