System.IO.Pipelines: Hochleistungs-E / A in .NET

Ursprünglicher Autor: David Fowler
  • Übersetzung
System.IO.Pipelines ist eine neue Bibliothek, die die Codeorganisation in .NET vereinfacht. Es ist schwierig, hohe Leistung und Genauigkeit zu gewährleisten, wenn Sie mit komplexem Code umgehen müssen. Die Aufgabe von System.IO.Pipelines besteht darin, den Code zu vereinfachen. Mehr unter dem Schnitt!



Die Bibliothek entstand aus den Bemühungen des .NET Core-Entwicklungsteams, das Kestrel zu einem der schnellsten Webserver der Branche machen sollte . Es wurde ursprünglich als Teil der Kestrel-Implementierung konzipiert, entwickelte sich jedoch zu einer wiederverwendbaren API, die in Version 2.1 als erstklassige BCL-API (System.IO.Pipelines) verfügbar ist.

Welche Probleme löst sie?


Um Daten aus einem Stream oder Socket richtig analysieren zu können, müssen Sie eine große Menge Standardcode schreiben. Gleichzeitig gibt es viele Fallstricke, die sowohl den Code selbst als auch dessen Unterstützung komplizieren.

Welche Schwierigkeiten treten heute auf?


Beginnen wir mit einer einfachen Aufgabe. Wir müssen einen TCP-Server schreiben, der durch Zeilen getrennte Nachrichten (\ n) vom Client empfängt.

TCP Server mit NetworkStream


AUSBAU: Wie bei jeder Aufgabe, die eine hohe Leistung erfordert, sollte jeder Fall anhand der Merkmale Ihrer Anwendung betrachtet werden. Es ist möglich, dass Ausgabenressourcen für die Verwendung verschiedener Ansätze, die weiter diskutiert werden, nicht sinnvoll sind, wenn der Umfang der Netzwerkanwendung nicht sehr groß ist.

Normaler .NET-Code vor der Verwendung von Pipelines sieht folgendermaßen aus:

async Task ProcessLinesAsync(NetworkStream stream)
	{
	    var buffer = newbyte[1024];
	    await stream.ReadAsync(buffer, 0, buffer.Length);
	    // Process a single line from the buffer
	    ProcessLine(buffer);
	}

Siehe sample1.cs auf GitHub.

Dieser Code wird wahrscheinlich mit lokalen Tests funktionieren. Es gibt jedoch einige Fehler:

  • Nach einem einzigen Aufruf von ReadAsync wird möglicherweise nicht die gesamte Nachricht empfangen (bis zum Ende der Zeile).
  • Die Ausgabe der stream.ReadAsync () -Methode wird ignoriert - die Datenmenge, die tatsächlich in den Puffer übertragen wird.
  • Der Code behandelt nicht das Empfangen mehrerer Zeilen in einem einzelnen ReadAsync-Aufruf.

Dies sind die häufigsten Fehler beim Lesen von Streaming-Daten. Um dies zu vermeiden, müssen Sie einige Änderungen vornehmen:

  • Sie müssen die eingehenden Daten puffern, bis eine neue Zeichenfolge gefunden wird.
  • Es ist notwendig, alle in den Puffer zurückgegebenen Zeilen zu analysieren.

async Task ProcessLinesAsync(NetworkStream stream)
	{
	    var buffer = newbyte[1024];
	    var bytesBuffered = 0;
	    var bytesConsumed = 0;
	    while (true)
	    {
	        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
	        if (bytesRead == 0)
	        {
	            // EOFbreak;
	        }
	        // Keep track of the amount of buffered bytes
	        bytesBuffered += bytesRead;
	        var linePosition = -1;
	        do
	        {
	            // Look for a EOL in the buffered data
	            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
	            if (linePosition >= 0)
	            {
	                // Calculate the length of the line based on the offsetvar lineLength = linePosition - bytesConsumed;
	                // Process the line
	                ProcessLine(buffer, bytesConsumed, lineLength);
	                // Move the bytesConsumed to skip past the line we consumed (including \n)
	                bytesConsumed += lineLength + 1;
	            }
	        }
	        while (linePosition >= 0);
	    }
	}

Siehe sample2.cs auf GitHub

Ich wiederhole: Dies könnte mit lokalen Tests funktionieren, aber manchmal gibt es Zeilen, die länger als 1 KB (1024 Bytes) sind. Es ist notwendig, den Eingabepuffer zu vergrößern, bis eine neue Zeile gefunden wird.

Außerdem sammeln wir bei der Verarbeitung langer Zeichenfolgen Puffer in einem Array. Wir können diesen Prozess mit ArrayPool verbessern, um eine Neuzuweisung von Puffern während der Analyse langer Zeilen, die vom Client kommen, zu vermeiden.

async Task ProcessLinesAsync(NetworkStream stream)
	{
	    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
	    var bytesBuffered = 0;
	    var bytesConsumed = 0;
	    while (true)
	    {
	        // Calculate the amount of bytes remaining in the buffervar bytesRemaining = buffer.Length - bytesBuffered;
	        if (bytesRemaining == 0)
	        {
	            // Double the buffer size and copy the previously buffered data into the new buffervar newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
	            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
	            // Return the old buffer to the pool
	            ArrayPool<byte>.Shared.Return(buffer);
	            buffer = newBuffer;
	            bytesRemaining = buffer.Length - bytesBuffered;
	        }
	        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
	        if (bytesRead == 0)
	        {
	            // EOFbreak;
	        }
	        // Keep track of the amount of buffered bytes
	        bytesBuffered += bytesRead;
	        do
	        {
	            // Look for a EOL in the buffered data
	            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed, bytesBuffered - bytesConsumed);
	            if (linePosition >= 0)
	            {
	                // Calculate the length of the line based on the offsetvar lineLength = linePosition - bytesConsumed;
	                // Process the line
	                ProcessLine(buffer, bytesConsumed, lineLength);
	                // Move the bytesConsumed to skip past the line we consumed (including \n)
	                bytesConsumed += lineLength + 1;
	            }
	        }
	        while (linePosition >= 0);
	    }
	}

Siehe sample3.cs auf GitHub.

Der Code funktioniert, aber jetzt hat sich die Puffergröße geändert, was zu vielen Kopien führt. Es wird auch mehr Speicher verwendet, da die Logik den Puffer nach der Verarbeitung von Zeichenfolgen nicht verringert. Um dies zu vermeiden, können Sie eine Liste mit Puffern speichern und die Puffergröße nicht jedes Mal ändern, wenn Zeilen länger als 1 KB ankommen.

Außerdem erhöhen wir die Puffergröße nicht um 1 KB, bis sie vollständig leer ist. Dies bedeutet, dass wir immer kleinere Puffer an ReadAsync senden, wodurch die Anzahl der Aufrufe an das Betriebssystem steigt.

Wir werden versuchen, dies zu beseitigen und einen neuen Puffer zuzuweisen, sobald die Größe des vorhandenen Puffers weniger als 512 Bytes beträgt:

publicclassBufferSegment
	{
	    publicbyte[] Buffer { get; set; }
	    publicint Count { get; set; }
	    publicint Remaining => Buffer.Length - Count;
	}
	async Task ProcessLinesAsync(NetworkStream stream)
	{
	    constint minimumBufferSize = 512;
	    var segments = new List<BufferSegment>();
	    var bytesConsumed = 0;
	    var bytesConsumedBufferIndex = 0;
	    var segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
	    segments.Add(segment);
	    while (true)
	    {
	        // Calculate the amount of bytes remaining in the bufferif (segment.Remaining < minimumBufferSize)
	        {
	            // Allocate a new segment
	            segment = new BufferSegment { Buffer = ArrayPool<byte>.Shared.Rent(1024) };
	            segments.Add(segment);
	        }
	        var bytesRead = await stream.ReadAsync(segment.Buffer, segment.Count, segment.Remaining);
	        if (bytesRead == 0)
	        {
	            break;
	        }
	        // Keep track of the amount of buffered bytes
	        segment.Count += bytesRead;
	        while (true)
	        {
	            // Look for a EOL in the list of segmentsvar (segmentIndex, segmentOffset) = IndexOf(segments, (byte)'\n', bytesConsumedBufferIndex, bytesConsumed);
	            if (segmentIndex >= 0)
	            {
	                // Process the line
	                ProcessLine(segments, segmentIndex, segmentOffset);
	                bytesConsumedBufferIndex = segmentOffset;
	                bytesConsumed = segmentOffset + 1;
	            }
	            else
	            {
	                break;
	            }
	        }
	        // Drop fully consumed segments from the list so we don't look at them againfor (var i = bytesConsumedBufferIndex; i >= 0; --i)
	        {
	            var consumedSegment = segments[i];
	            // Return all segments unless this is the current segmentif (consumedSegment != segment)
	            {
	                ArrayPool<byte>.Shared.Return(consumedSegment.Buffer);
	                segments.RemoveAt(i);
	            }
	        }
	    }
	}
	(int segmentIndex, int segmentOffest) IndexOf(List<BufferSegment> segments, bytevalue, int startBufferIndex, int startSegmentOffset)
	{
	    var first = true;
	    for (var i = startBufferIndex; i < segments.Count; ++i)
	    {
	        var segment = segments[i];
	        // Start from the correct offsetvar offset = first ? startSegmentOffset : 0;
	        var index = Array.IndexOf(segment.Buffer, value, offset, segment.Count - offset);
	        if (index >= 0)
	        {
	            // Return the buffer index and the index within that segment where EOL was foundreturn (i, index);
	        }
	        first = false;
	    }
	    return (-1, -1);
	}

Siehe sample4.cs auf GitHub

Der Code wird dadurch viel komplizierter. Während der Suche nach einem Trennzeichen überwachen wir die gefüllten Puffer. Verwenden Sie dazu die Liste, in der die gepufferten Daten angezeigt werden, wenn Sie nach einem neuen Zeilenbegrenzer suchen. Als Ergebnis nehmen ProcessLine und IndexOf eine Liste anstelle von Byte [], Offset und Count. Die Parsing-Logik beginnt mit der Verarbeitung eines oder mehrerer Segmente des Puffers.

Nun verarbeitet der Server Teilmeldungen und verwendet den kombinierten Speicher, um den gesamten Speicherbedarf zu reduzieren. Es müssen jedoch einige Änderungen vorgenommen werden:

  1. Von ArrayPoolbyte verwenden wir nur Byte [] - die standardmäßig verwalteten Arrays. Mit anderen Worten, wenn die ReadAsync- oder WriteAsync-Funktion ausgeführt wird, hängt die Lebensdauer der Puffer von der Zeit ab, zu der eine asynchrone Operation ausgeführt wird (um mit der eigenen E / A-API des Betriebssystems zu interagieren). Da sich festgehaltener Speicher nicht verschieben kann, wirkt sich dies auf die Leistung des Garbage Collectors aus und kann zu einer Fragmentierung des Arrays führen. Möglicherweise müssen Sie die Poolimplementierung ändern, je nachdem, wie lange der asynchrone Vorgang auf die Ausführung wartet.
  2. Der Durchsatz kann verbessert werden, indem die Verknüpfung zwischen Leselogik und Verarbeitung aufgehoben wird. Wir erhalten den Effekt der Stapelverarbeitung, und jetzt kann die Parsing-Logik große Datenmengen lesen und große Pufferblöcke verarbeiten, anstatt einzelne Zeilen zu analysieren. Dadurch wird der Code noch komplizierter:

    • Es ist notwendig, zwei Zyklen zu erstellen, die unabhängig voneinander arbeiten. Der erste liest Daten aus dem Socket und der zweite analysiert die Puffer.
    • Wir brauchen eine Möglichkeit, der Analyselogik mitzuteilen, dass die Daten verfügbar sind.
    • Sie müssen auch feststellen, was passiert, wenn die Schleife zu schnell Daten aus dem Socket liest. Wir brauchen einen Weg, um den Lesezyklus zu regulieren, wenn die Syntaxanalyse-Logik nicht mithält. Dies wird allgemein als "Flusssteuerung" oder "Flusswiderstand" bezeichnet.
    • Wir müssen sicherstellen, dass die Daten sicher übertragen werden. Jetzt wird der Puffersatz sowohl vom Lesezyklus als auch vom Syntaxanalysezyklus verwendet. Sie arbeiten unabhängig voneinander auf verschiedenen Threads.
    • Die Speicherverwaltungslogik ist auch an zwei verschiedenen Codefragmenten beteiligt: ​​Ausleihen von Daten aus dem Pufferpool, der Daten aus dem Socket liest, und Zurückkehren aus dem Pufferpool, der Logik des Parsens.
    • Sie müssen äußerst vorsichtig mit dem Zurückgeben von Puffern sein, nachdem Sie die Parsing-Logik ausgeführt haben. Ansonsten besteht die Möglichkeit, dass wir den Puffer zurückgeben, in den die Socket-Leselogik noch geschrieben wird.

Die Schwierigkeit beginnt abzurollen (und dies ist nicht in allen Fällen der Fall!). Um ein Hochleistungsnetzwerk zu erstellen, müssen Sie sehr komplexen Code schreiben.

Das Ziel von System.IO.Pipelines ist es, dieses Verfahren zu vereinfachen.

TCP Server und System.IO.Pipelines


Mal sehen, wie System.IO.Pipelines funktioniert:

async Task ProcessLinesAsync(Socket socket)
	{
	    var pipe = new Pipe();
	    Task writing = FillPipeAsync(socket, pipe.Writer);
	    Task reading = ReadPipeAsync(pipe.Reader);
	    return Task.WhenAll(reading, writing);
	}
	async Task FillPipeAsync(Socket socket, PipeWriter writer)
	{
	    constint minimumBufferSize = 512;
	    while (true)
	    {
	        // Allocate at least 512 bytes from the PipeWriter
	        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
	        try 
	        {
	            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
	            if (bytesRead == 0)
	            {
	                break;
	            }
	            // Tell the PipeWriter how much was read from the Socket
	            writer.Advance(bytesRead);
	        }
	        catch (Exception ex)
	        {
	            LogError(ex);
	            break;
	        }
	        // Make the data available to the PipeReader
	        FlushResult result = await writer.FlushAsync();
	        if (result.IsCompleted)
	        {
	            break;
	        }
	    }
	    // Tell the PipeReader that there's no more data coming
	    writer.Complete();
	}
	async Task ReadPipeAsync(PipeReader reader)
	{
	    while (true)
	    {
	        ReadResult result = await reader.ReadAsync();
	        ReadOnlySequence<byte> buffer = result.Buffer;
	        SequencePosition? position = null;
	        do 
	        {
	            // Look for a EOL in the buffer
	            position = buffer.PositionOf((byte)'\n');
	            if (position != null)
	            {
	                // Process the line
	                ProcessLine(buffer.Slice(0, position.Value));
	                // Skip the line + the \n character (basically position)
	                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
	            }
	        }
	        while (position != null);
	        // Tell the PipeReader how much of the buffer we have consumed
	        reader.AdvanceTo(buffer.Start, buffer.End);
	        // Stop reading if there's no more data comingif (result.IsCompleted)
	        {
	            break;
	        }
	    }
	    // Mark the PipeReader as complete
	    reader.Complete();
	}

Siehe sample5.cs auf GitHub

In der Pipeline-Version unseres Zeilenlesers gibt es zwei Zyklen:

  • FillPipeAsync liest aus dem Socket und schreibt in PipeWriter.
  • ReadPipeAsync liest aus PipeReader und analysiert eingehende Zeilen.

Im Gegensatz zu den ersten Beispielen gibt es keine speziell zugewiesenen Puffer. Dies ist eine der Hauptfunktionen von System.IO.Pipelines. Alle Pufferverwaltungsaufgaben werden an PipeReader / PipeWriter-Implementierungen übergeben.

Das Verfahren wird vereinfacht: Wir verwenden Code nur für die Geschäftslogik, anstatt ein komplexes Puffermanagement zu implementieren.

Die erste Schleife ruft zuerst PipeWriter.GetMemory (int) auf, um eine bestimmte Menge Speicherplatz vom Hauptrecorder zu erhalten. Dann wird PipeWriter.Advance (int) aufgerufen, der PipeWriter mitteilt, wie viele Daten tatsächlich in den Puffer geschrieben werden. Darauf folgt ein Aufruf von PipeWriter.FlushAsync (), damit PipeReader auf die Daten zugreifen kann.

Die zweite Schleife verbraucht Puffer, die von PipeWriter aufgezeichnet wurden, aber anfangs von einem Socket kamen. Wenn eine Anforderung an PipeReader.ReadAsync () zurückgegeben wird, erhalten wir ein ReadResult, das zwei wichtige Meldungen enthält: Daten, die in der Form ReadOnlySequence gelesen werden, sowie der logische Datentyp IsCompleted, der dem Leser mitteilt, ob der Recorder den Auftrag abgeschlossen hat (EOF). Wenn das Trennzeichen für das Zeilenende (EOL) gefunden wird und die Zeichenfolge analysiert wird, teilen wir den Puffer in Teile auf, um das bereits verarbeitete Fragment zu überspringen. Danach wird PipeReader.AdvanceTo aufgerufen, und es teilt PipeReader mit, wie viel Daten verbraucht wurden.

Am Ende jedes Zyklus ist die Arbeit sowohl des Lesers als auch des Recorders abgeschlossen. Folglich gibt der Hauptkanal den gesamten zugewiesenen Speicher frei.

System.IO.Pipelines


Teillesung


Neben der Speicherverwaltung erfüllt System.IO.Pipelines eine weitere wichtige Funktion: Es betrachtet die Daten im Kanal, verbraucht sie jedoch nicht.

PipeReader hat zwei Haupt-APIs: ReadAsync und AdvanceTo. ReadAsync empfängt Daten vom Kanal. AdvanceTo informiert PipeReader darüber, dass diese Puffer nicht mehr vom Lesegerät benötigt werden, sodass Sie sie entfernen können (beispielsweise zum Hauptpufferpool zurückkehren).

Nachfolgend finden Sie ein Beispiel für ein HTTP-Analysegerät, das Daten aus den Kanaldatenpuffer liest, bis eine geeignete Startlinie angezeigt wird.



ReadOnlySequenceT


Die Kanalimplementierung speichert eine Liste zugeordneter Puffer, die zwischen PipeWriter und PipeReader übertragen werden. PipeReader.ReadAsync offenbart ReadOnlySequence, einen neuen BCL-Typ, der aus einem oder mehreren ReadOnlyMemory <T> -Segmenten besteht. Es ist vergleichbar mit Span oder Memory, was uns die Möglichkeit gibt, Arrays und Strings zu betrachten.



Innerhalb des Kanals gibt es Zeiger, die zeigen, wo sich der Leser und der Rekorder in der allgemeinen Gruppe der ausgewählten Daten befinden, und sie auch aktualisieren, während sie geschrieben werden und die Daten lesen. SequencePosition ist ein einzelner Punkt in der verknüpften Pufferliste und wird verwendet, um ReadOnlySequence <T> effizient zu trennen.

Da ReadOnlySequence <T> ein Segment und mehr unterstützt, ist die Standardoperation von Hochleistungslogik die Trennung von schnellen und langsamen Pfaden basierend auf der Anzahl der Segmente.

Als Beispiel stellen wir eine Funktion vor, die eine ASCII-ReadOnlySequence in eine Zeichenfolge konvertiert:

stringGetAsciiString(ReadOnlySequence<byte> buffer)
	{
	    if (buffer.IsSingleSegment)
	    {
	        return Encoding.ASCII.GetString(buffer.First.Span);
	    }
	    returnstring.Create((int)buffer.Length, buffer, (span, sequence) =>
	    {
	        foreach (var segment in sequence)
	        {
	            Encoding.ASCII.GetChars(segment.Span, span);
	            span = span.Slice(segment.Length);
	        }
	    });
	}

Siehe sample6.cs auf github

Durchflusswiderstand und Flusskontrolle


Idealerweise arbeiten Lesen und Analyse zusammen: Der Lesestrom verbraucht Daten aus dem Netzwerk und speichert sie in Puffern, während der Analysestrom die entsprechenden Datenstrukturen erstellt. Die Analyse dauert normalerweise länger als das Kopieren von Datenblöcken aus einem Netzwerk. Folglich kann der Lesestrom den Analysestrom leicht überlasten. Daher muss der Lesestrom entweder langsamer werden oder mehr Speicher verbrauchen, um Daten für den Analysestrom zu speichern. Um eine optimale Leistung zu gewährleisten, ist ein Gleichgewicht zwischen der Pausenfrequenz und der Zuweisung großer Speichermengen erforderlich.

Um dieses Problem zu lösen, verfügt die Pipeline über zwei Flusssteuerungsfunktionen: PauseWriterThreshold und ResumeWriterThreshold. PauseWriterThreshold legt fest, wie viele Daten vor dem Anhalten von PipeWriter.FlushAsync zwischengespeichert werden müssen. ResumeWriterThreshold legt fest, wie viel Speicher der Leser verbrauchen kann, bevor er den Recorder wieder aufnimmt.



PipeWriter.FlushAsync ist "blockiert", wenn die Datenmenge im Pipeline-Stream die in PauseWriterThreshold festgelegte Grenze überschreitet, und "unlocked", wenn die unter ResumeWriterThreshold festgelegte Grenze unterschritten wird. Um zu verhindern, dass die Verbrauchsgrenze überschritten wird, werden nur zwei Werte verwendet.

E / A-Planung


Wenn Sie async / await verwenden, werden nachfolgende Vorgänge normalerweise entweder in Pool-Threads oder im aktuellen SynchronizationContext aufgerufen.

Bei der Implementierung von E / A ist es sehr wichtig, genau zu kontrollieren, wo sie ausgeführt werden, um den Prozessor-Cache effizienter zu nutzen. Dies ist wichtig für Hochleistungsanwendungen wie Webserver. System.IO.Pipelines verwendet PipeScheduler, um zu bestimmen, wo asynchrone Callbacks platziert werden sollen. Auf diese Weise können Sie sehr genau steuern, welche Threads für E / A verwendet werden sollen.

Ein praktisches Beispiel ist der Kestrel Libuv-Transport, bei dem E / A-Rückrufe über dedizierte Kanäle der Ereignisschleife ausgeführt werden.

Das PipeReader-Muster bietet noch weitere Vorteile.


  • Einige Basissysteme unterstützen das "Warten ohne Pufferung": Der Puffer muss erst dann zugewiesen werden, wenn im Basissystem verfügbare Daten angezeigt werden. In Linux mit epoll können Sie also keinen Puffer zum Lesen bereitstellen, bis die Daten vorbereitet sind. Dies vermeidet eine Situation, in der viele Threads auf Daten warten, und Sie müssen sofort eine große Menge an Speicher reservieren.
  • Die Standard-Pipeline vereinfacht die Aufzeichnung von Netzwerkcode-Unit-Tests: Die Analyselogik ist vom Netzwerkcode getrennt, und die Unit-Tests führen diese Logik nur in Puffern im Speicher aus und verbrauchen sie nicht direkt aus dem Netzwerk. Es ermöglicht auch das Testen komplexer Muster mit gesendeten Teildaten. ASP.NET Core verwendet es, um verschiedene Aspekte des Kestrel-HTTP-Parsers zu testen.
  • Systeme, die benutzerdefinierten Code für die Verwendung der Haupt-OS-Puffer zulassen (z. B. die registrierte Windows-E / A-API), sind anfänglich für die Verwendung von Pipelines geeignet, da die PipeReader-Implementierung immer Puffer bereitstellt.

Andere verwandte Typen


Wir haben auch eine Reihe neuer, einfacher BCL-Typen zu System.IO.Pipelines hinzugefügt:

  • MemoryPoolT , IMemoryOwnerT , MemoryManagerT . .NET Core 1.0 wurde um ArrayPoolT erweitert , und .NET Core 2.1 verfügt jetzt über eine allgemeinere abstrakte Darstellung für den Pool, der mit beliebigen MemoryTs funktioniert. Wir erhalten einen Erweiterungspunkt, der erweiterte Verteilungsstrategien sowie die Steuerung des Pufferpuffers ermöglicht (z. B. vordefinierte Puffer anstelle von ausschließlich verwalteten Arrays).
  • Der IBufferWriterT ist ein Empfänger zum Aufnehmen synchroner gepufferter Daten (implementiert von PipeWriter).
  • IValueTaskSource - ValueTaskT existiert seit der Veröffentlichung von .NET Core 1.1, hat jedoch in .NET Core 2.1 äußerst effiziente Tools erworben, die unterbrechungsfreie asynchrone Vorgänge ohne Verteilung ermöglichen. Weitere Informationen finden Sie hier .

Wie benutze ich Förderer?


API-Paket sind NuGet System.IO.Pipelines .

Ein Beispiel für eine .NET Server 2.1-Serveranwendung, die Pipelines für die Verarbeitung von Inline-Nachrichten verwendet (aus dem obigen Beispiel), finden Sie hier . Es kann mit dem Dotnet-Lauf (oder Visual Studio) ausgeführt werden. In diesem Beispiel wird eine Datenübertragung vom Socket an Port 8087 erwartet, und die empfangenen Nachrichten werden in die Konsole geschrieben. Um eine Verbindung zu Port 8087 herzustellen, können Sie einen Client wie netcat oder putty verwenden. Senden Sie eine SMS und sehen Sie, wie es funktioniert.

Momentan arbeitet die Pipeline in Kestrel und SignalR, und wir hoffen, dass sie in Zukunft in vielen Netzwerkbibliotheken und Komponenten der .NET-Community weiter verbreitet sein wird.

Jetzt auch beliebt: