Python-Implementierung eines ereignisgesteuerten Paradigmas mit Coroutinen

    In diesem Artikel wird erläutert, wie Sie mit benutzerdefinierten Python-Generatoren Coroutinen implementieren, die den Empfang von Ereignissen einschalten. Die Einfachheit des Codes des resultierenden Moduls wird Sie angenehm überraschen und neue und wenig genutzte Merkmale der Sprache verdeutlichen, die mit solchen Generatoren erhalten werden können. Der Artikel hilft zu verstehen, wie es in ernsthaften Implementierungen angeordnet ist: Asyncio , Tornado , etc.

    Theoretische Momente und Haftungsausschluss


    Das Konzept der Koroutine ist sehr weit gefasst, daher sollten Sie entscheiden, welche Merkmale sie in unserer Implementierung haben werden:
    • Sie werden gemeinsam in einem Thread aufgeführt.
    • Die Ausführung kann unterbrochen werden, um auf ein bestimmtes Ereignis zu warten.
    • Die Ausführung kann fortgesetzt werden, nachdem das erwartete Ereignis empfangen wurde.
    • Kann das Ergebnis nach Fertigstellung zurückgeben.

    Als Ergebnis erhalten wir: ereignisorientierte Programmierung ohne Callback-Funktionen und kooperatives Multitasking . Der Effekt der Verwendung eines solchen Programmierparadigmas ist nur für Aufgaben von Bedeutung, die auf ungleichmäßige Ereignisse reagieren. Zuallererst sind dies E / A-Verarbeitungsaufgaben: Netzwerkserver, Benutzerschnittstellen usw. Eine weitere mögliche Anwendung ist die Aufgabe, den Zustand der Charaktere in der Spielwelt zu berechnen. Für Aufgaben, die lange Berechnungen erfordern, grundsätzlich nicht geeignet.
    Es sollte klar sein, dass, obwohl die laufende Coroutine nicht unterbrochen wurde, um auf das Ereignis zu warten, sich alle anderen in einem Stoppzustand befinden, selbst wenn das erwartete Ereignis bereits eingetreten ist.

    Die Basis von allem


    In Python sind Generatoren eine gute Basis für all dies, wenn sie im wörtlichen und im übertragenen Sinne richtig vorbereitet sind. Genauer gesagt, erweiterte Generatoren, deren API schließlich in Python Version 3.3 erstellt wurde. In früheren Versionen wurde die Rückgabe des Werts (Ergebnisses) nach Abschluss des Generators nicht implementiert, und es gab keinen praktischen Mechanismus zum Aufrufen eines Generators von einem anderen. Trotzdem waren Coroutine-Implementierungen früher, aber aufgrund der Einschränkungen gewöhnlicher Generatoren waren sie nicht so "schön" wie das, was wir bekommen. Ein sehr guter Artikel zu diesem Thema "Ein kurioser Kurs über Koroutinen und Parallelität"Der einzige Nachteil ist, dass es keine aktualisierte Version gibt. Wo die Implementierung von Coroutine in Python die neuesten Innovationen in der Sprache verwendet, insbesondere in der Enhanced Python Generators API. Nachfolgend sind die Funktionen der erweiterten Generatoren aufgeführt, die wir benötigen.
    Die Übertragung von Nachrichten an die Coroutine basiert auf der Möglichkeit, den Generatorstatus festzulegen. Kopieren Sie den folgenden Code in das Fenster des laufenden Python-Interpreters ab Version 3.3.
    def gen_factory():
        state = None
        while True:
            print("state:", state)
            state = yield state
    gen = gen_factory()
    

    Der Generator wird angelegt, er muss gestartet werden.
    >>> next(gen)
    state: None
    

    Der Ausgangszustand wird empfangen. Ändern Sie den Zustand:
    >>> gen.send("OK")
    state: OK
    'OK'
    

    Wir sehen, dass sich der Zustand geändert hat und als Ergebnis zurückgekehrt ist. Die folgenden Sendeaufrufe geben den Status zurück, den sie bereits senden.

    Warum brauchen wir das alles?


    Stellen Sie sich vor, Sie senden alle zwei Sekunden Grüße an Petrov, alle drei Sekunden an Ivanov und alle fünf Sekunden an die ganze Welt. In Form von Python-Code können Sie sich Folgendes vorstellen:
    def hello(name, timeout):
        while True:
            sleep(timeout)
            print("Привет, {}!".format(name))
    hello("Петров", 2.0)
    hello("Иванов", 3.0)
    hello("Мир", 5.0)
    

    Es sieht gut aus, aber nur Petrov wird begrüßt. Aber! Eine kleine Änderung, die sich nicht auf die Klarheit des Codes auswirkt, sondern auch umgekehrt, verdeutlicht unsere Idee, und dies kann bereits wie erwartet funktionieren.
    @coroutine
    def hello(name, timeout):
        while True:
            yield from sleep(timeout)
            print("Привет, {}!".format(name))
    hello("Петров", 2.0)
    hello("Иванов", 3.0)
    hello("Мир", 5.0)
    run()
    

    Der Code hat sich im Stil der Python-Methode herausgestellt - er veranschaulicht die Aufgabe klar, linear ohne Rückrufe, ohne unnötigen Schnickschnack mit Objekten, und Kommentare darin sind überflüssig. Es bleibt nur die Implementierung des Coroutine Decorators, seiner Version der Sleep-Funktion und der Run-Funktion. Bei der Umsetzung wird es natürlich nicht auf Schnickschnack verzichten. Dies ist jedoch auch eine pythonische Methode, um die Magie hinter der Fassade der Bibliotheksmodule zu verbergen.

    Am interessantesten


    Wir nennen das Modul mit der Implementierung unprätentiös - Parallelität, mit Bedeutung und spiegeln die Tatsache wider, dass es sich tatsächlich um die Implementierung von kooperativem Multitasking handelt. Es ist klar, dass der Dekorateur aus einer regulären Funktion einen Generator machen und ihn starten muss (den ersten Aufruf zum nächsten machen). Die Ausbeute aus dem Sprachkonstrukt leitet den Aufruf an den nächsten Generator weiter. Das heißt, die Schlaffunktion sollte einen Generator erzeugen, in dem Sie alle Magie verstecken können. Nur der Code des empfangenen Ereignisses wird an den Generator zurückgegeben, der es verursacht hat. Hier wird das zurückgegebene Ergebnis nicht verarbeitet, der Code kann hier im Wesentlichen nur ein Ergebnis erhalten, was bedeutet, dass das Timeout abgelaufen ist. Das Warten auf E / A kann verschiedene Arten von Ereignissen zurückgeben, z. B. Lesen / Schreiben / Zeitüberschreitung. Außerdem, Generatoren, die durch Funktionen vom Typ sleep generiert werden, können von jedem Datentyp einen Ertrag liefern, und dementsprechend ist ihre Funktionalität möglicherweise nicht auf das Warten auf Ereignisse beschränkt. Die Funktion run startet den Event-Dispatcher. Sie hat die Aufgabe, das Event von außen zu empfangen und / oder intern zu generieren, den Empfänger zu ermitteln und es tatsächlich zu versenden.
    Beginnen wir mit dem Dekorateur:
    class coroutine(object):
        """Делает из функции сопрограмму на базе расширенного генератора."""
        _current = None
        def __init__(self, callable):
            self._callable = callable
        def __call__(self, *args, **kwargs):
            corogen = self._callable(*args, **kwargs)
            cls = self.__class__
            if cls._current is None:
                try:
                    cls._current = corogen
                    next(corogen)
                finally:
                    cls._current = None
            return corogen
    

    Es wird in Form einer Klasse gemacht, ein typischer Trick, wie versprochen, es erzeugt und betreibt einen Generator. Eine Konstruktion mit _current wurde hinzugefügt, um zu vermeiden, dass der Generator gestartet wird, wenn die dekorierte Funktion, die ihn erstellt, im Körper eines anderen Generators aufgerufen wird. In diesem Fall wird der erste Anruf getätigt. Sie können auch herausfinden, an welchen Generator das Ereignis gesendet werden soll, damit es in einer Kette mit dem von der Sleep-Funktion erstellten Generator verbunden wird.
    def sleep(timeout):
        """Приостанавливает выполнение до получения события "таймаут истек"."""
        corogen = coroutine._current
        dispatcher.setup_timeout(corogen, timeout)
        revent = yield
        return revent
    

    Hier sehen wir den Aufruf dispatcher.setup_sleep, der dem Ereignis-Dispatcher mitteilt, dass der Generator auf ein Timeout-Ereignis wartet, nachdem die durch den Parameter timeout angegebene Anzahl von Sekunden abgelaufen ist.
    from collections import deque
    from time import time, sleep as sys_sleep
    class Dispatcher(object):
        """Объект реализующий диспечер событий."""
        def __init__(self):
            self._pending = deque()
            self._deadline = time() + 3600.0
        def setup_timeout(self, corogen, timeout):
            deadline = time() + timeout
            self._deadline = min([self._deadline, deadline])
            self._pending.append([corogen, deadline])
            self._pending = deque(sorted(self._pending, key=lambda a: a[1]))
        def run(self):
            """Запускает цикл обработки событий."""
            while len(self._pending) > 0:
                timeout = self._deadline - time()
                self._deadline = time() + 3600.0
                if timeout > 0:
                    sys_sleep(timeout)
                while len(self._pending) > 0:
                    if self._pending[0][1] <= time():
                        corogen, _ = self._pending.popleft()
                        try:
                            coroutine._current = corogen
                            corogen.send("timeout")
                        except StopIteration:
                            pass
                        finally:
                            coroutine._current = None
                    else:
                        break
    dispatcher = Dispatcher()
    run = lambda: dispatcher.run()
    

    Der Event-Dispatcher-Code ist ebenfalls nicht ungewöhnlich. Wohin Ereignisse gesendet werden, wird mit der Klassenvariablen coroutine._current festgelegt. Wenn das Modul geladen wird, wird eine Instanz der Klasse erstellt. In einer funktionierenden Implementierung sollte dies natürlich ein Singleton sein. Die collections.deque-Klasse wird anstelle der list-Klasse verwendet, da sie mit der popleft-Methode schneller und nützlicher ist. Nun, das ist alles und es gibt keine besondere Magie. All dies ist in der Implementierung fortschrittlicher Python-Generatoren sogar noch tiefer verborgen. Sie können nur richtig gekocht werden.

    Datei: concurrency.py
    # concurrency.py
    from collections import deque
    from time import time, sleep as sys_sleep
    class coroutine(object):
        """Делает из функции сопрограмму на базе расширенного генератора."""
        _current = None
        def __init__(self, callable):
            self._callable = callable
        def __call__(self, *args, **kwargs):
            corogen = self._callable(*args, **kwargs)
            cls = self.__class__
            if cls._current is None:
                try:
                    cls._current = corogen
                    next(corogen)
                finally:
                    cls._current = None
            return corogen
    def sleep(timeout):
        """Приостанавливает выполнение до получения события "таймаут истек"."""
        corogen = coroutine._current
        dispatcher.setup_timeout(corogen, timeout)
        revent = yield
        return revent
    class Dispatcher(object):
        """Объект реализующий диспечер событий."""
        def __init__(self):
            self._pending = deque()
            self._deadline = time() + 3600.0
        def setup_timeout(self, corogen, timeout):
            deadline = time() + timeout
            self._deadline = min([self._deadline, deadline])
            self._pending.append([corogen, deadline])
            self._pending = deque(sorted(self._pending, key=lambda a: a[1]))
        def run(self):
            """Запускает цикл обработки событий."""
            while len(self._pending) > 0:
                timeout = self._deadline - time()
                self._deadline = time() + 3600.0
                if timeout > 0:
                    sys_sleep(timeout)
                while len(self._pending) > 0:
                    if self._pending[0][1] <= time():
                        corogen, _ = self._pending.popleft()
                        try:
                            coroutine._current = corogen
                            corogen.send("timeout")
                        except StopIteration:
                            pass
                        finally:
                            coroutine._current = None
                    else:
                        break
    dispatcher = Dispatcher()
    run = lambda: dispatcher.run()
    


    Datei: sample.py
    # sample.py
    from concurency import coroutine, sleep, run
    @coroutine
    def hello(name, timeout):
        while True:
            yield from sleep(timeout)
            print("Привет, {}!".format(name))
    hello("Петров", 2.0)
    hello("Иванов", 3.0)
    hello("Мир", 5.0)
    run()
    



    Outro


    Wenn das Thema interessant ist, können Sie mit der Implementierung der Erwartung von E / A-Ereignissen am Beispiel eines asynchronen TCP-Echo-Servers fortfahren. Mit einem echten Event-Dispatcher, der als dynamische Bibliothek implementiert ist und in einer anderen, schnelleren Sprache als Python geschrieben ist.

    Jetzt auch beliebt: