RedisPipe - mehr Spaß zusammen

    Wenn ich daran denke, wie naive RPC-Clients funktionieren, erinnere ich mich an einen Witz:


    Gericht
    - Angeklagter, warum haben Sie eine Frau getötet?
    - Ich bin im Bus, der Schaffner tritt auf die Frau zu und verlangt, ein Ticket zu kaufen. Eine Frau öffnete ihre Handtasche, zog ihre Handtasche heraus, schloss ihre Handtasche, öffnete ihre Handtasche, zog ihre Handtasche heraus, schloss ihre Handtasche, öffnete ihre Handtasche, schloss ihre Handtasche, öffnete ihre Handtasche, öffnete ihre Handtasche, öffnete ihre Handtasche, schloss ihre Handtasche, öffnete ihre Handtasche , schloss den Geldbeutel, öffnete den Geldbeutel, legte den Geldbeutel dort hin.
    - Na und?
    - Der Controller gab ihr eine Fahrkarte. Eine Frau öffnete ihre Handtasche, zog ihre Handtasche heraus, schloss ihre Handtasche, öffnete ihre Handtasche, zog ihre Handtasche heraus, schloss ihre Handtasche, öffnete ihre Handtasche, stellte ihre Handtasche ein, schloss ihre Handtasche, öffnete ihre Handtasche, legte eine Fahrkarte dort ab, öffnete ihre Handtasche, öffnete ihre Handtasche , steckte die Handtasche hinein, schloss die Handtasche, öffnete die Handtasche, steckte die Handtasche hinein, schloss die Handtasche.
    "Nehmen Sie die Änderung", kam die Stimme des Controllers. Die Frau ... öffnete ihre Handtasche ...
    - Ja, töte sie ein wenig - der Staatsanwalt steht nicht auf.
    - Also habe ich es getan.
    © S.Altov


    Etwa dasselbe passiert im "Request-Response" -Prozess, wenn dies nicht ernst genommen wird:


    • Der Benutzerprozess schreibt eine serialisierte Anforderung "in den Socket" und kopiert sie tatsächlich in den Socketpuffer auf Betriebssystemebene.
      Dies ist eine ziemlich schwierige Operation. Es ist notwendig, einen Kontextwechsel vorzunehmen (auch wenn dies "einfach" sein kann).
    • Wenn es den Betriebssystemen so erscheint, dass etwas in das Netzwerk geschrieben werden kann, wird ein Paket gebildet (die Anforderung wird erneut kopiert) und an die Netzwerkkarte gesendet.
    • die Netzwerkkarte schreibt das Paket in das Netzwerk (möglicherweise Vorpufferung);
    • (Unterwegs kann das Paket mehrmals in Routern gepuffert werden).
    • Schließlich kommt das Paket am Zielhost an und wird auf der Netzwerkkarte gepuffert.
    • Die Netzwerkkarte sendet eine Benachrichtigung an das Betriebssystem. Wenn das Betriebssystem die Uhrzeit ermittelt, kopiert es das Paket in seinen Puffer und setzt das Bereitschafts-Flag für den Dateideskriptor.
    • (Wir müssen auch daran denken, als Antwort eine ACK zu senden);
    • Nach einiger Zeit erkennt die Serveranwendung, dass die Bereitschaft im Deskriptor (mithilfe von epoll) angegeben ist, und kopiert die Anforderung eines Tages in den Anwendungspuffer.
    • und schließlich verarbeitet die Serveranwendung die Anforderung.

    Wie Sie verstehen, erfolgt die Übertragung der Antwort auf genau dieselbe Weise nur in die entgegengesetzte Richtung. Somit verbringt jede Anforderung eine beträchtliche Zeit auf dem Betriebssystem für ihren Dienst, und jede Antwort gibt dieselbe Zeit erneut aus.


    Dies machte sich besonders nach Meltdown / Spectre bemerkbar, da die veröffentlichten Patches zu einem starken Anstieg der Kosten für Systemaufrufe führten. Anfang Januar 2018 begann unser Redis-Cluster plötzlich, eine halbe bis zwei Mal mehr CPU zu verbrauchen. Amazon hat die entsprechenden Kernel-Patches angewendet, um diese Sicherheitsanfälligkeiten zu schließen. (Es stimmt, Amazon hat später eine neue Version des Patches angewendet, und die CPU-Auslastung ist fast auf den vorherigen Stand zurückgegangen. Der Connector wurde jedoch bereits geboren.)


    Leider funktionieren alle bekannten Go-Connectors für Redis und Memcached auf diese Weise: Der Connector erstellt einen Pool von Verbindungen. Wenn Sie eine Anfrage senden müssen, nimmt er eine Verbindung aus dem Pool, schreibt eine Anfrage und wartet auf eine Antwort. (Es ist besonders traurig, dass der Memcached-Konnektor von Brad Fitzpatrick selbst geschrieben wurde.) Und einige Konnektoren sind so erfolglos in der Implementierung dieses Pools, dass das Zurückziehen einer Verbindung aus dem Pool zu einem Engpass für sich wird.


    Es gibt zwei Möglichkeiten, diese harte Arbeit, eine Anfrage / Antwort zu senden, auf verschiedene Weise zu erleichtern:


    1. Verwenden Sie den direkten Zugriff auf die Netzwerkkarte: DPDK, Netmap, PF_RING usw.
    2. Senden Sie nicht jede Anfrage / Antwort in einem separaten Paket, sondern kombinieren Sie sie nach Möglichkeit in größeren Paketen, dh verteilen Sie den Aufwand für die Arbeit mit dem Netzwerk bei mehreren Anfragen. Zusammen mehr Spaß!

    Die erste Option ist natürlich möglich. Zunächst einmal jedoch für Mutige, da Sie die TCP / IP-Implementierung selbst schreiben müssen (z. B. wie in ScyllaDB). Und zweitens erleichtern wir auf diese Weise die Situation nur auf einer Seite - auf derjenigen, die wir selbst schreiben. Ich möchte Redis (vorerst) immer noch nicht umschreiben, daher verbrauchen die Server so viel, auch wenn der Client das coole DPDK verwendet.


    Die zweite Option ist viel einfacher und erleichtert vor allem die Situation auf dem Client und auf dem Server. Zum Beispiel eine In-Memory - Datenbank prahlt , dass Millionen von der RPS umgehen kann, während Redis nicht dienen kann, und ein paar hunderttausend . Dieser Erfolg ist jedoch weniger die Realisierung dieser In-Memory-Datenbank als die Entscheidung, dass das Protokoll vollständig asynchron ist, und Clients sollten diese Asynchronität nach Möglichkeit verwenden. Was viele (besonders in Benchmarks verwendete) Clients erfolgreich implementieren, indem sie Anforderungen über eine TCP-Verbindung senden und wenn möglich gemeinsam an das Netzwerk senden.


    Ein bekannter Artikel zeigt, dass Redis auch eine Million Antworten pro Sekunde geben kann, wenn Pipelining verwendet wird. Persönliche Erfahrungen bei der Entwicklung von In-Memory-Speichern lassen darauf schließen, dass durch das Pipelining der Verbrauch an SYS-CPU erheblich reduziert wird und Sie den Prozessor und das Netzwerk effizienter nutzen können.


    Die Frage ist nur, wie Pipelining verwendet werden soll, wenn in der Anwendung Anfragen an Redis oft einzeln eingereicht werden. Wenn ein Server fehlt und der Redis-Cluster mit einer großen Anzahl von Shards verwendet wird, wird er selbst bei einem Paket von Anforderungen in einzelne Anforderungen für jedes Shard aufgeteilt.


    Die Antwort ist natürlich „naheliegend“: Führen Sie ein implizites Pipelaying durch, indem Sie Anforderungen von allen parallel arbeitenden Gorutin an einen Redis-Server sammeln und diese über eine Verbindung senden.


    Das implizite Pipelining ist übrigens bei Konnektoren in anderen Sprachen nicht so selten: nodejs node_redis , C # RedisBoost , Pythons Aioredis und viele andere. Viele dieser Konnektoren werden auf Ereignisschleifen geschrieben. Daher erscheint die Sammlung von Anforderungen aus parallelen "Berechnungsthreads" dort natürlich. Go fördert auch die Verwendung synchroner Schnittstellen, und anscheinend, weil sich nur wenige entscheiden, eigene Schleifen zu organisieren.


    Wir wollten Redis so effizient wie möglich einsetzen und haben uns deshalb entschlossen, einen neuen "better" (tm) -Anschluss zu schreiben: RedisPipe .


    Wie macht man implizites Paylayning?


    Grundschema:


    • Die Anwendungslogik von Gorutiny schreibt Anforderungen nicht direkt in das Netzwerk, sondern überträgt sie an den Gorutine-Collector.
    • Der Collector sammelt wann immer möglich ein Paket von Anforderungen, schreibt sie in das Netzwerk und überträgt sie an den Gorutine-Leser.
    • Goretin-Reader liest die Antworten aus dem Netzwerk, vergleicht sie mit den entsprechenden Anforderungen und benachrichtigt die Logik der angekommenen Antwort.

    Sie müssen die Antwort irgendwie mitteilen. Ein früherer Programmierer auf Go wird natürlich sagen: „Durch den Kanal!“.
    Dies ist jedoch nicht das einzig mögliche Primitiv für die Synchronisation und auch nicht das effektivste in der Go-Umgebung. Und da die Bedürfnisse verschiedener Personen unterschiedlich sind, werden wir den Mechanismus erweiterbar machen, so dass der Benutzer die Schnittstelle implementieren kann (nennen wir es Future):


    type Future interface {
        Resolve(val interface{})
    }

    Und dann sieht das grundlegende Schema so aus:


    type future struct {
        req Request
        fut Future
    }
    type Conn struct {
        c        net.Conn
        futmtx   sync.Mutex
        wfutures []future
        futtimer *time.Timer
        rfutures chan []future
    }
    func (c *Conn) Send(r Request, f Future) {
        c.futmtx.Lock()
        defer c.futmtx.Unlock()
        c.wfutures = append(c.wfutures, future{req: r, fut: f})
        if len(c.wfutures) == 1 {
            futtimer.Reset(100*time.Microsecond)
        }
    }
    func (c *Conn) writer() {
        for range c.futtimer.C {
            c.futmtx.Lock()
            futures, c.wfutures = c.wfutures, nil
            c.futmtx.Unlock()
            var b []byte
            for _, ft := range futures {
                b = AppendRequest(b, ft.req)
            }
            _, _err := c.c.Write(b)
            c.rfutures <- futures
        }
    }
    func (c *Conn) reader() {
        rd := bufio.NewReader(c.c)
        var futures []future
        for {
            response, _err := ParseResponse(rd)
            if len(futures) == 0 {
                futures = <- c.rfutures
            }
            futures[0].fut.Resolve(response)
            futures = futures[1:]
        }
    }

    Natürlich ist dies ein sehr vereinfachter Code. Ausgelassen:


    • Verbindungsaufbau;
    • E / A-Zeitüberschreitungen;
    • Fehlerbehandlung beim Lesen / Schreiben;
    • Verbindung zurücksetzen;
    • die Möglichkeit, die Anfrage vor dem Senden an das Netzwerk abzubrechen;
    • Optimierung der Speicherzuordnung (Wiederverwendung der Puffer- und Futures-Arrays).

    Jeder Eingabe-Ausgabefehler (einschließlich Timeout) im realen Code führt zu einem Rezolv-Fehler aller Future, der den gesendeten und ausstehenden Anforderungen entspricht.
    Die Verbindungsebene fordert Anforderungen nicht erneut an. Wenn Sie die Anforderung wiederholen müssen (können und können), kann dies auf der höheren Abstraktionsebene erfolgen (z. B. bei der unten beschriebenen Implementierung der Unterstützung für Redis Cluster).


    Bemerkung Anfangs sah das Schema etwas komplizierter aus. Aber im Verlauf von Experimenten vereinfacht sich diese Option.


    Anmerkung 2: Die Future.Resolve-Methode hat sehr strenge Anforderungen: Sie sollte so schnell wie möglich sein, fast nicht blockieren und auf keinen Fall Panik auslösen. Dies ist auf die Tatsache zurückzuführen, dass es synchron im Lesezyklus aufgerufen wird und dass "Bremsen" unweigerlich zu einer Verschlechterung führen. Die Implementierung von Future.Resolve sollte das notwendige Minimum an linearen Aktionen ausführen: das Warten zu wecken; Es ist möglich, den Fehler zu behandeln und eine asynchrone Wiederholung (in der Implementierung der Clusterunterstützung verwendet) zu senden.


    Wirkung


    Ein guter Benchmark ist die Hälfte des Artikels!


    Ein guter Benchmark ist einer, der der Nutzung durch die beobachteten Effekte möglichst nahe kommt. Und das ist nicht leicht zu machen.


    Option Benchmark , die meiner Meinung nach ziemlich ähnlich aussieht:


    • das Haupt- "Skript" emuliert 5 parallele Clients,
    • In jedem "Client" werden alle 300-1000 "gewünschten" rps auf einem Gorutina gestartet (3 Gorutins werden für 1000 Rps gestartet, 124 Gorutins für 128.000 Rps).
    • Gorutina verwendet eine separate Instanz des Limiters und sendet Anforderungen mit zufälligen Batches - von 5 bis 15 Anfragen.

    Durch die Zufälligkeit einer Reihe von Abfragen können Sie eine zufällige Verteilung von Reihen in der Zeitskala erreichen, die die tatsächliche Last besser widerspiegeln.


    Versteckter Text

    Неправильные варианты были:
    a) использовать один рейт-лимитер на все горутины «клиента» и обращаться к нему на каждый запрос — это приводит к черезмерному потреблению CPU самим рейт-лимитером, а также усиленному чередованию во времени горутин, что ухудшает характеристики RedisPipe на средних rps (но необъяснимо улучшает на высоких);
    b) использовать один рейт-лимитер на все горутины «клиента» и посылать запросы сериями — рейт-лимитер уже не так сильно жрёт CPU, но чередование горутин во времени лишь усиливается;
    с) использовать рейт-лимитер на каждую горутину, но посылать одинаковые серии по 10 запросов, — в этом сценарии горутины слишком одновременно просыпаются, что несправедливо улучшает результаты RedisPipe.


    Die Tests wurden an der Quad-Core-Instanz AWS c5-2xlarge durchgeführt. Redis Version 5.0.


    Das Verhältnis der gewünschten Intensität der Anforderungen, der daraus resultierenden Gesamtintensität und der von der Radieschen-CPU verbrauchten Menge:


    beabsichtigte rps Redigo
    rps /% CPU
    Redispipe keine Wartezeit
    rps /% CPU
    Redispipe 50µs
    rps /% CPU
    Redispipe 150µs
    rps /% CPU
    1000 * 5 5015/7% 5015/6% 5015/6% 5015/6%
    2000 * 5 10022/11% 10022/10% 10022/10% 10022/10%
    4000 * 5 20036/21% 20036/18% 20035/17% 20035/15%
    8000 * 5 40020/45% 40062/37% 40060/26% 40056/19%
    16000 * 5 79994/71% 80102/58% 80096/33% 80090/23%
    32000 * 5 159590/96% 160180/80% 160167/39% 160150/29%
    64000 * 5 187774/99% 320313/98% 320283/47% 320258/37%
    92000 * 5 183206/99% 480443/97% 480407/52% 480366/42%
    128.000 * 5 179744/99% 640484/97% 640488/55% 640428/46%

    Anfragerate Redis CPU


    Es ist anzumerken, dass der Redis den Prozessorkern schnell übersteht, wenn der Konnektor nach dem klassischen Schema (Request / Answer + Connection Pool) arbeitet. Danach werden mehr als 190 Kronen pro Sekunde zu einer unmöglichen Aufgabe.


    Mit RedisPipe können Sie die gesamte benötigte Leistung von Redis ausdrücken. Und je mehr wir pausieren, um parallele Anfragen zu sammeln, desto weniger CPU verbraucht Redis. Der greifbare Nutzen wird bereits bei 4 Krps vom Kunden (insgesamt 20 Krps) erzielt, wenn eine Pause von 150 Mikrosekunden verwendet wird.


    Selbst wenn die Pause eindeutig nicht verwendet wird, wenn Redis in die CPU einfährt, erscheint die Verzögerung von selbst. Außerdem werden Anforderungen vom Betriebssystem gepuffert. Dadurch kann RedisPipe die Anzahl der erfolgreich ausgeführten Anforderungen erhöhen, wenn der klassische Konnektor die Beine bereits absenkt.


    Dies ist das Hauptergebnis, für das ein neuer Connector erstellt werden musste.


    Was passiert mit dem CPU-Verbrauch auf dem Client und mit einer Verzögerung der Anfragen?


    beabsichtigte rps Redigo
    % CPU / ms
    Redispipe nowait
    % cpu / ms
    Redispipe 50ms
    % CPU / ms
    Redispipe 150ms
    % CPU / ms
    1000 * 5 13 / 0,03 20 / 0,04 46 / 0,16 44 / 0,26
    2000 * 5 25 / 0,03 33 / 0,04 77 / 0,16 71 / 0,26
    4000 * 5 48 / 0,03 60 / 0,04 124 / 0,16 107 / 0,26
    8000 * 5 94 / 0,03 119 / 0,04 178 / 0,15 141 / 0,26
    16000 * 5 184 / 0,04 206 / 0,04 228 / 0,15 177 / 0,25
    32000 * 5 341 / 0,08 322 / 0,05 280 / 0,15 226 / 0,26
    64000 * 5 316 / 1,88 469 / 0,08 345 / 0,16 307 / 0,26
    92000 * 5 313 / 2,88 511 / 0,16 398 / 0,17 366 / 0,27
    128.000 * 5 312 / 3,54 509 / 0,37 441 / 0,19 418 / 0,29

    Client-CPU Latenz


    Möglicherweise stellen Sie fest, dass RedisPipe auf kleinen Rps selbst mehr CPU verbraucht als der "Konkurrent", insbesondere wenn eine kurze Pause verwendet wird. Dies ist vor allem auf die Implementierung von Timern in Go und die Implementierung der im Betriebssystem verwendeten Systemaufrufe zurückzuführen (unter Linux ist dies futexsleep), da der Unterschied im Modus "ohne Pause" merklich geringer ist.


    Wenn rps wächst, wird der Overhead der Timer durch weniger Netzwerkaufwand kompensiert. Nach 16 Krps pro Kunde bringt die Verwendung von RedisPipe mit einer Pause von 150 Mikrosekunden spürbare Vorteile.


    Nachdem Redis in der CPU ruht, beginnt die Verzögerung bei Anforderungen, die den klassischen Connector verwenden, natürlich immer mehr. Nicht sicher, dass Sie in der Praxis oft 180 krps von einer Redis-Instanz erreichen. Wenn ja, denken Sie daran, dass Sie Probleme haben könnten.


    Während Redis nicht in die CPU hineinläuft, leidet die Verzögerung bei Anforderungen natürlich unter der Verwendung einer Pause. Dieser Kompromiss ist absichtlich in den Connector eingebettet. Dieser Kompromiss ist jedoch nur spürbar, wenn sich Redis und der Client auf demselben physischen Host befinden. Abhängig von der Netzwerktopologie kann ein Roundtrip zu einem benachbarten Host zwischen einhundert Mikrosekunden und Millisekunden liegen. Dementsprechend wird der Unterschied in der Verzögerung statt neunmal (0,26 / 0,03) dreifach (0,36 / 0,13) oder nur um einige zehn Prozent (1,26 / 1,03) gemessen.


    Wenn Redis als Cache verwendet wird, ist die Gesamtwartezeit für Antworten aus der Datenbank mit einem Cache-Miss-Fehler größer als die Gesamtanzahl der Wartezeiten auf Antworten von Redis. Daher wird angenommen, dass der Anstieg der Latenz nicht signifikant ist.


    Das wichtigste positive Ergebnis ist die Toleranz gegenüber der Lastzunahme: Wenn die Last des Dienstes plötzlich N-mal ansteigt, verbraucht Redis die CPU nicht N-mal. Um der Vervierfachung der Last von 160 auf 640 Krps standzuhalten, gab Redis nur 1,6-mal mehr CPU aus und erhöhte den Verbrauch von 29 auf 46%. Dies lässt uns keine Angst haben, dass Redis sich plötzlich verbiegt. Die Skalierbarkeit der Anwendung hängt auch nicht von der Arbeit des Connectors und den Netzwerkkosten ab (lesen Sie die Kosten der SYS-CPU).


    Bemerkung Der Benchmark-Code arbeitet mit kleinen Werten. Um mein Gewissen zu klären, wiederholte ich den Test mit Werten von 768 Bytes. Der CPU-Verbrauch von "Rettich" ist deutlich gestiegen (bis zu 66% bei einer Pause von 150 µs), und die Obergrenze für den klassischen Steckverbinder sinkt auf 170 krps. Alle betrachteten Proportionen blieben jedoch gleich und daher auch die Schlussfolgerungen.


    Cluster


    Zur Skalierung verwenden wir Redis Cluster . Dies erlaubt uns, Redis nicht nur als Cache zu verwenden, sondern auch als flüchtigen Speicher und gleichzeitig keine Daten zu verlieren, wenn ein Cluster erweitert / komprimiert wird.


    Redis Cluster verwendet das Prinzip eines intelligenten Clients, d. H. Der Client muss den Status des Clusters selbst überwachen und auf Hilfsfehler reagieren, die vom "Rettich" zurückgegeben werden, wenn das "Bouquet" von der Instanz zur Instanz wechselt.


    Dementsprechend muss der Client Verbindungen zu allen Redis-Instanzen im Cluster aufrechterhalten und für jede Anforderung eine Verbindung zu der erforderlichen herstellen. Und an diesem Ort hat der Client, der zuvor benutzt wurde (lass uns nicht mit dem Finger zeigen), schlecht zusammengebrochen. Der Autor, der das Go-Marketing (CSP, Channels, Goroutines) überschätzte, hat die Synchronisierung der Arbeit mit dem Status des Clusters durch Senden von Rückrufen an den zentralen Berg implementiert. Dies ist für uns zu einem ernsthaften Engpass geworden. Als temporärer Patch mussten wir vier Clients in einem Cluster ausführen, von denen jeder bis zu hundert Verbindungen im Pool zu jeder Redis-Instanz aufnahm.


    Dementsprechend bestand im neuen Connector die Aufgabe, diesen Fehler zu vermeiden. Alle Interaktionen mit dem Status des Clusters im Pfad der Abfrage werden so weit wie möglich gesperrt


    • Der Zustand des Clusters ist praktisch unveränderlich und nicht zahlreiche mit Atomen gewürzte Mutationen
    • Der Zugriff auf den Status erfolgt über atomic.StorePointer / atomic.LoadPointer und kann daher ohne Blockierung erhalten werden.

    So können Abfragen selbst während einer Clusterstatusaktualisierung den vorherigen Status verwenden, ohne befürchten zu müssen, auf eine Sperre zu warten.


    // storeConfig atomically stores config
    func (c *Cluster) storeConfig(cfg *clusterConfig) {
        p := (*unsafe.Pointer)(unsafe.Pointer(&c.config))
        atomic.StorePointer(p, unsafe.Pointer(cfg))
    }
    // getConfig loads config atomically
    func (c *Cluster) getConfig() *clusterConfig {
        p := (*unsafe.Pointer)(unsafe.Pointer(&c.config))
        return (*clusterConfig)(atomic.LoadPointer(p))
    }
    func (cfg *clusterConfig) slot2shardno(slot uint16) uint16 {
        return uint16(atomic.LoadUint32(&cfg.slots[slot]))
    }
    func (cfg *clusterConfig) slotSetShard(slot, shard uint16) {
        atomic.StoreUint32(&cfg.slots[slot], shard)
    }

    Der Clusterstatus wird alle 5 Sekunden aktualisiert. Wenn der Verdacht auf Clusterinstabilität besteht, wird die Aktualisierung erzwungen:


    func (c *Cluster) control() {
        t := time.NewTicker(c.opts.CheckInterval)
        defer t.Stop()
        // main control loop
        for {
            select {
            case <-c.ctx.Done():
                // cluster closed, exit control loop
                c.report(LogContextClosed{Error: c.ctx.Err()})
                return
            case cmd := <-c.commands:
                // execute some asynchronous "cluster-wide" actions
                c.execCommand(cmd)
                continue
            case <-forceReload:
                // forced mapping reload
                c.reloadMapping()
            case <-t.C:
                // regular mapping reload
                c.reloadMapping()
            }
        }
    }
    func (c *Cluster) ForceReloading() {
        select {
        case c.forceReload <- struct{}{}:
        default:
        }
    }

    Wenn die von Rettich empfangene MOVED- oder ASK-Antwort eine unbekannte Adresse enthält, wird deren asynchrone Hinzufügung zur Konfiguration eingeleitet. (Ich entschuldige mich, ich habe nicht herausgefunden, wie ich den Code vereinfachen kann, daher hier der Link .) Es ist nicht ohne Sperren möglich, aber sie werden nur für kurze Zeit genommen Die Haupterwartung wird durch das Speichern des Rückrufs im Array realisiert - in der gleichen zukünftigen Seitenansicht.


    Es werden Verbindungen zu allen Redis-Instanzen, zu den Mastern und zu den Slaves hergestellt. Abhängig von der bevorzugten Richtlinie und der Art der Anforderung (Lesen oder Schreiben) kann die Anforderung sowohl an den Master als auch an den Slave gesendet werden. Dabei wird die "Lebendigkeit" der Instanz berücksichtigt, die sich sowohl aus den Informationen zusammensetzt, die während der Aktualisierung des Clusterstatus als auch des aktuellen Verbindungsstatus erhalten wurden.


    func (c *Cluster) connForSlot(slot uint16, policy ReplicaPolicyEnum) (*redisconn.Connection, *errorx.Error) {
        var conn *redisconn.Connection
        cfg := c.getConfig()
        shard := cfg.slot2shard(slot)
        nodes := cfg.nodes
        var addr string
        switch policy {
        case MasterOnly:
            addr = shard.addr[0] // master is always first
            node := nodes[addr]
            if conn = node.getConn(c.opts.ConnHostPolicy, needConnected); conn == nil {
                conn = node.getConn(c.opts.ConnHostPolicy, mayBeConnected)
            }
        case MasterAndSlaves:
            n := uint32(len(shard.addr))
            off := c.opts.RoundRobinSeed.Current()
            for _, needState := range []int{needConnected, mayBeConnected} {
                mask := atomic.LoadUint32(&shard.good) // load health information
                for ; mask != 0; off++ {
                    bit := 1 << (off % n)
                    if mask&bit == 0 {
                        // replica isn't healthy, or already viewed
                        continue
                    }
                    mask &^= bit
                    addr = shard.addr[k]
                    if conn = nodes[addr].getConn(c.opts.ConnHostPolicy, needState); conn != nil {
                        return conn, nil
                    }
                }
            }
        }
        if conn == nil {
            c.ForceReloading()
            return nil, c.err(ErrNoAliveConnection)
        }
        return conn, nil
    }
    func (n *node) getConn(policy ConnHostPolicyEnum, liveness int) *redisconn.Connection {
        for _, conn := range n.conns {
            switch liveness {
            case needConnected:
                if c.ConnectedNow() {
                    return conn
                }
            case mayBeConnected:
                if c.MayBeConnected() {
                    return conn
                }
            }
        }
        return nil
    }

    Es gibt ein Rätsel RoundRobinSeed.Current(). Dies ist einerseits die Quelle des Zufalls, andererseits die Zufälligkeit, die sich nicht oft ändert. Wenn Sie für jede Anforderung eine neue Verbindung auswählen, verschlechtert dies die Wirksamkeit der Rohrplanung. Aus diesem Grund ändert die Standardimplementierung den Wert von Current alle zehn Millisekunden. Damit die Zeitverzögerungen kleiner werden, wählt jeder Host sein eigenes Intervall.


    Wie Sie sich erinnern, verwendet die Verbindung das Konzept von Future für asynchrone Anforderungen. Der Cluster verwendet dasselbe Konzept: Der Benutzer Future wird zu einem Cluster, und dieser wird der Verbindung zugeführt.


    Warum eine individuelle Zukunft gestalten? Im Cluster-Modus gibt "Radish" zunächst wunderbare "Fehler" von MOVED und ASK mit Informationen darüber zurück, wo Sie den benötigten Schlüssel finden müssen. Nachdem Sie einen solchen Fehler erhalten haben, müssen Sie die Anfrage an einen anderen Host wiederholen. Zweitens, da wir immer noch die Umleitungslogik implementieren müssen, warum sollte man die Anforderung nicht einbetten und erneut wiederholen, wenn ein E / A-Fehler vorliegt (natürlich nur bei der Leseanforderung):


    type request struct {
        c   *Cluster
        req Request
        cb  Future
        slot   uint16
        policy ReplicaPolicyEnum
        mayRetry bool
    }
    func (c *Cluster) SendWithPolicy(policy ReplicaPolicyEnum, req Request, cb Future) {
        slot := redisclusterutil.ReqSlot(req)
        policy = c.fixPolicy(slot, req, policy)
        conn, err := c.connForSlot(slot, policy, nil)
        if err != nil {
            cb.Resolve(err)
            return
        }
        r := &request{
            c:      c,
            req:    req,
            cb:     cb,
            slot:   slot,
            policy: policy,
            mayRetry: policy != MasterOnly || redis.ReplicaSafe(req.Cmd),
        }
        conn.Send(req, r, 0)
    }
    func (r *request) Resolve(res interface{}, _ uint64) {
        err := redis.AsErrorx(res)
        if err == nil {
            r.resolve(res)
            return
        }
        switch {
        case err.IsOfType(redis.ErrIO):
            if !r.mayRetry {
                // It is not safe to retry read-write operation
                r.resolve(err)
                return
            }
            fallthrough
        case err.HasTrait(redis.ErrTraitNotSent):
            // It is request were not sent at all, it is safe to retry both readonly and write requests.
            conn, err := r.c.connForSlot(r.slot, r.policy, r.seen)
            if err != nil {
                r.resolve(err)
                return
            }
            conn.Send(r.req, r)
            return
        case err.HasTrait(redis.ErrTraitClusterMove):
            addr := movedTo(err)
            ask := err.IsOfType(redis.ErrAsk)
            r.c.ensureConnForAddress(addr, func(conn *redisconn.Connection, cerr error) {
                if cerr != nil {
                    r.resolve(cerr)
                } else {
                    r.lastconn = conn
                    conn.SendAsk(r.req, r, ask)
                }
            })
            return
        default:
            // All other errors: just resolve.
            r.resolve(err)
        }
    }

    Dies ist auch ein vereinfachter Code. Es wurde eine Einschränkung in der Anzahl der Wiederholungen, das Speichern von Problemverbindungen usw. ausgelassen.


    Komfort


    Asynchrone Anfragen, Zukunft ist ein Superkul! Aber furchtbar unangenehm.


    Die Schnittstelle ist das Wichtigste. Sie können alles verkaufen, wenn er eine nyashny-Schnittstelle hat. Deshalb haben Redis und MongoDB an Popularität gewonnen.


    Daher müssen wir unsere asynchronen Anforderungen in synchrone Anfragen umwandeln.


    // Sync provides convenient synchronous interface over asynchronous Sender.
    type Sync struct {
        S Sender
    }
    // Do is convenient method to construct and send request.
    // Returns value that could be either result or error.
    func (s Sync) Do(cmd string, args ...interface{}) interface{} {
        return s.Send(Request{cmd, args})
    }
    // Send sends request to redis.
    // Returns value that could be either result or error.
    func (s Sync) Send(r Request) interface{} {
        var res syncRes
        res.Add(1)
        s.S.Send(r, &res)
        res.Wait()
        return res.r
    }
    type syncRes struct {
        r interface{}
        sync.WaitGroup
    }
    // Resolve implements Future.Resolve
    func (s *syncRes) Resolve(res interface{}) {
        s.r = res
        s.Done()
    }
    // Usage
    func get(s redis.Sender, key string) (interface{}, error) {
        res := redis.Sync{s}.Do("GET", key)
        if err := redis.AsError(res); err != nil {
            return nil, err
        }
        return res, nil
    }

    AsErrorsieht nicht nach einem Weg aus, um Fehler zu bekommen. Aber ich mag es, weil in meiner Darstellung der Ergebnisse - ist Result<T,Error>und AsError- ersatz Pattern - Matching.


    Nachteile


    Aber leider gibt es einen Teelöffel Teer in diesem Wohlbefinden.


    Das Redis-Protokoll beinhaltet keine Neuordnung von Anforderungen. Gleichzeitig hat es Sperrabfragen vom Typ BLPOP, BRPOP.


    Das ist ein Misserfolg.


    Wenn Sie wissen, dass eine solche Anforderung blockiert wird, werden alle darauf folgenden Anforderungen blockiert. Und nichts kann dagegen unternommen werden.


    Nach langer Diskussion wurde beschlossen, die Verwendung dieser Anfragen in der RedisPipe zu verbieten.


    Wenn Sie es wirklich brauchen, können Sie natürlich: den Parameter einstellen ScriptMode: true, und alles hängt von Ihrem Gewissen ab.


    Alternativen


    Tatsächlich gibt es eine Alternative, die ich nicht erwähnt habe, aber die gut informierte Leser dachten - der King of Cluster Twemproxy-Cache.


    Es tut, was Reds für Redis tut: Es verwandelt eine unhöfliche und seelenlose "Anfrage / Antwort" in das zarteste Pipelining.


    Twemproxy selbst wird jedoch unter der Tatsache leiden, dass er an dem System "Anfrage / Antwort" arbeiten muss. Diesmal Zweitens verwenden wir "Radieschen", einschließlich als "unzuverlässiger Speicher", und ändern manchmal die Größe des Clusters. Twemproxy erleichtert die Neuverteilung in keiner Weise und erfordert außerdem einen Neustart, wenn die Clusterkonfiguration geändert wird.


    Einfluss


    Ich hatte keine Zeit, einen Artikel zu schreiben, aber die Wellen von RedisPipe sind bereits weg. In Radix.v3 wurde ein Patch verabschiedet, der seinem Pool Pipelaying hinzufügte:


    Sheck RedisPipe und herauszufinden , ob seine Strategie aus der impliziten Pipelining / Dosierung eingearbeitet werden können
    automatische Befehle in Pool für Pipelining


    In der Geschwindigkeit sind sie etwas minderwertig (nach ihren Benchmarks zu urteilen; ich kann es jedoch nicht mit Sicherheit sagen). Ihr Vorteil ist jedoch, dass sie Blockierungsbefehle aus dem Pool an andere Verbindungen senden können.


    Fazit


    Bald im Jahr, da RedisPipe zur Effektivität unseres Services beiträgt.
    Und in Erwartung von „heißen Tagen“ ist die CPU auf den Redis-Servern eine der Ressourcen, deren Kapazität kein Problem darstellt.


    Benchmark- Repository


    Jetzt auch beliebt: