Asynchroner Datenaustausch mit einer Remote-Anwendung über SSH

    Guten Tag, Freunde und Kollegen. Mein Name ist immer noch Dmitry Smirnov, und ich bin immer noch zu meiner großen Freude der Entwickler des ISP-Systems. Ich habe vor einiger Zeit angefangen, an einem völlig neuen Projekt zu arbeiten, das mich sehr inspiriert hat, weil in unserem Fall das Fehlen von Legacy-Code und die Unterstützung von alten Compilern das Neue ist. Hallo, Boost, C ++ 17 und alle anderen Freuden der modernen Entwicklung.

    Zufällig waren alle meine bisherigen Projekte multithreading bzw. ich hatte sehr wenig Erfahrung mit asynchronen Lösungen. Dies war für mich in dieser Entwicklung am angenehmsten, neben modernen leistungsstarken Werkzeugen.

    Eine der letzten Aufgaben bestand darin, einen Wrapper über die Bibliothek libssh2 zu schreiben .in der Realität einer asynchronen Anwendung, die Boost.Asio verwendet und nicht mehr als zwei Threads erzeugen kann. Darüber und erzählen.



    Hinweis: Der Autor geht davon aus, dass der Leser mit den Grundlagen der asynchronen Entwicklung und boost :: asio vertraut ist.

    Aufgabe


    Im Allgemeinen war die Aufgabe wie folgt: Verbindung mit einem Remote-Server mithilfe von rsa-key oder Login und Passwort; Laden Sie ein Skript auf die Remote-Maschine herunter und führen Sie es aus. Lesen Sie seine Antworten und senden Sie ihm Befehle über dieselbe Verbindung. In diesem Fall wird der Fluss natürlich nicht blockiert (dies ist die Hälfte des gesamten möglichen Pools).

    Haftungsausschluss : Ich weiß, dass die Arbeit mit SSH in Poco implementiert ist, aber ich habe keinen Weg gefunden, ihn mit Asio zu heiraten, und etwas Eigenes zu schreiben, war interessanter :-).

    Initialisierung


    Um die Bibliothek zu initialisieren und zu minimieren, entschied ich mich für das übliche Singleton:

    Init ()
    classLibSSH2 {public:
      staticvoidInit(){
         static LibSSH2 instance;
      }
    private:
      explicitLibSSH2(){
         if (libssh2_init(0) != 0) {
            throwstd::runtime_error("libssh2 initialization failed");
         }
      }
      ~LibSSH2() {
         std::cout << "shutdown libssh2" << std::endl;
         libssh2_exit();
      }
    };



    In dieser Entscheidung gibt es natürlich auch Fallstricke, laut meinem Lieblingsreferenzbuch „Tausend und eine Möglichkeit, sich in C ++ in den Fuß zu schießen“. Wenn jemand einen Stream generiert, der vergessen wird, um mit dem Essen zu beginnen, und der Hauptfluss früher endet, können interessante Spezialeffekte auftreten. In diesem Fall werde ich diese Möglichkeit jedoch nicht berücksichtigen.

    Hauptentitäten


    Nach der Analyse des Beispiels wird klar, dass wir für unsere kleine Bibliothek drei einfache Entitäten benötigen: einen Socket, eine Session und eine Pipe. Da es nicht schlecht ist, synchrone Instrumente zu haben, werden wir Asio vorerst beiseite lassen.

    Beginnen wir mit einer einfachen Steckdose:

    Sockel
    classSocket {public:
      explicitSocket() : m_sock(socket(AF_INET, SOCK_STREAM, 0)){
         if (m_sock == -1) {
            throwstd::runtime_error("failed to create socket");
         }
      }
      ~Socket() { close(m_sock); }
    private:
      int m_sock = -1;
    }


    Nun die Sitzung:

    Sitzung
    classSession {public:
      explicitSession(constbool enable_compression) : m_session(libssh2_session_init()){
         if (m_session == nullptr) {
            throwstd::runtime_error("failed to create libssh2 session");
         }
         libssh2_session_set_blocking(m_session, 0);
         if (enable_compression) {
            libssh2_session_flag(m_session, LIBSSH2_FLAG_COMPRESS, 1);
         }
      }
      ~Session() {
         conststd::string desc = "Shutting down libssh2 session";
         libssh2_session_disconnect(m_session, desc.c_str());
         libssh2_session_free(m_session);
      }
    private:
      LIBSSH2_SESSION *m_session;
    }
    


    Da wir jetzt einen Socket und eine Session haben, wäre es schön, eine Wait-Funktion für den Socket in der Realität von libssh2 zu schreiben:

    Warten auf Sockel
    intWaitSocket()const{
      pollfd fds{};
      fds.fd = sock;
      fds.events = 0;
      if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_INBOUND) != 0) { 
         fds.events |= POLLIN;
      }
      if ((libssh2_session_block_directions(session) & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0) {
         fds.events |= POLLOUT;
      }
      return poll(&fds, 1, 10);
    }


    Tatsächlich unterscheidet sich dies praktisch nicht vom obigen Beispiel, außer dass es select anstelle von poll verwendet.

    Es gibt einen Kanal. In libssh2 gibt es mehrere Arten von Kanälen: Leerlauf, SCP, Direkt-TCP. Wir interessieren uns für den einfachsten Kanal:

    Channel
    classSimpleChannel {public:
      explicitSimpleChannel(session){
         while ((m_channel = libssh2_channel_open_session(session) == nullptr &&
               GetSessionLastError() == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         if (m_channel == nullptr) {
            throwstd::runtime_error("Critical error while opening simple channel");
         }
      }
      void SendEof() {
         while (libssh2_channel_send_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         while (libssh2_channel_wait_eof(m_channel) == LIBSSH2_ERROR_EAGAIN) {
           WaitSocket();
         }
      }
      ~SimpleChannel() {
         CloseChannel();
      }
    private:
      void CloseChannel() {
         int rc;
         while ((rc = libssh2_channel_close(m_channel)) == LIBSSH2_ERROR_EAGAIN) {
            WaitSocket();
         }
         libssh2_channel_free(m_channel);
      }
      LIBSSH2_CHANNEL *m_channel;
    };


    Nun, da alle grundlegenden Tools bereit sind, muss noch eine Verbindung zum Host hergestellt und die erforderlichen Manipulationen durchgeführt werden. Die asynchrone Aufzeichnung auf dem Kanal und die synchrone Synchronisation sind natürlich sehr unterschiedlich, der Verbindungsaufbau ist jedoch nicht der Fall.

    Deshalb schreiben wir die Basisklasse:

    Basisverbindung
    classBaseConnectionImpl {protected:
      explicitBaseConnectionImpl(const SshConnectData &connect_data)///< Это любая удобная структура, содержащая информацию о подключении
            : m_session(connect_data.enable_compression)
            , m_connect_data(connect_data){
         LibSSH2::Init();
         ConnectSocket();
         HandShake();
         ProcessKnownHosts();
         Auth();
      }
      ///Следующие три метода понадобятся нам чуть позжеboolCheckSocket(int type)const{
         pollfd fds{};
         fds.fd = m_sock;
         fds.events = type;
         return poll(&fds, 1, 0) == 1;
      }
      boolWantRead()const{
         return CheckSocket(POLLIN);
      }
      boolWantWrite()const{
         return CheckSocket(POLLOUT);
      }
      /*Я не был уверен, что реализация соединения, которая почти полностью взята из примера
     * будет кому-то интересна.
     */voidConnectSocket(){...}
      voidHandShake(){...}
      voidAuth(){...}
      classSocketm_sock;classSessionm_session;classSimpleChannel;
      SshConnectData m_connect_data;
    }; 


    Jetzt können wir die einfachste Klasse schreiben, um eine Verbindung zu einem Remote-Host herzustellen und einen Befehl darauf auszuführen:

    Synchrone Verbindung
    classConnection::Impl : public BaseConnectionImpl {
    public:
      explicitImpl(const SshConnectData &connect_data)
            : BaseConnectionImpl(connect_data){}
      template <typename Begin>
      voidWriteToChannel(LIBSSH2_CHANNEL *channel, Begin ptr, size_t size){
         do {
            int rc;
            while ((rc = libssh2_channel_write(channel, ptr, size)) == LIBSSH2_ERROR_EAGAIN) {
               WaitSocket();
            }
            if (rc < 0) {
               break;
            }
            size -= rc;
            ptr += rc;
         } while (size != 0);
      }
      voidExecuteCommand(conststd::string &command, conststd::string &in = ""){
         SimpleChannel channel(*this);
         int return_code = libssh2_channel_exec(channel, command.c_str());
         if (return_code != 0 && return_code != LIBSSH2_ERROR_EAGAIN) {
            throwstd::runtime_error("Critical error while executing ssh command");
         }
         if (!in.empty()) {
            WriteToChannel(channel, in.c_str(), in.size());
            channel.SendEof();
         }
         std::string response;
         for (;;) {
            int rc;
            do {
               std::array<char, 4096> buffer{};
               rc = libssh2_channel_read(channel, buffer.data(), buffer.size());
               if (rc > 0) {
                  boost::range::copy(boost::adaptors::slice(buffer, 0, rc), std::back_inserter(response));
               } elseif (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN) {
                  throwstd::runtime_error("libssh2_channel_read error (" + std::to_string(rc) + ")");
               }
            } while (rc > 0);
            if (rc == LIBSSH2_ERROR_EAGAIN) {
               WaitSocket();
            } else {
               break;
            }
         }
       }
    };


    Bis jetzt hatten wir alles, um die Beispiele von libssh2 in eine zivilere Form zu bringen. Nun, da wir alle einfachen Werkzeuge zum synchronen Schreiben von Daten in den Kanal haben, können wir zu Asio gehen.

    Ein Standard-Socket ist gut, aber nicht sehr praktisch, wenn Sie auf seine Bereitschaft warten müssen, asynchron zu lesen / schreiben, während Sie Ihr eigenes Ding tun. Hier kommt boost :: asio :: ip :: tcp :: socket, was eine wunderbare Methode hat:

    async_wait(wait_type, WaitHandler)

    Es ist wunderbar aus einem normalen Socket aufgebaut, für den wir bereits im Vorfeld eine Verbindung aufgebaut haben und boost :: asio :: io_context - den Ausführungskontext unserer Anwendung.

    Asynchroner Verbindungsdesigner
    classAsyncConnection::Impl : public BaseConnectionImpl,
     publicstd::enable_shared_from_this<AsyncConnection::Impl> {
    public:
    Impl(boost::asio::io_context &context, const SshConnectData &connect_data)
         : BaseConnectionImpl(connect_data)
         , m_tcp_socket(context, tcp::v4(), m_sock.GetSocket()) {
      m_tcp_socket.non_blocking(true);
    }
    };



    Jetzt müssen wir mit der Ausführung eines Befehls auf einem Remote-Host beginnen und, sobald Daten von ihm eingehen, einen Rückruf einleiten.

    voidAsyncRun(conststd::string &command, CallbackType &&callback){
      m_read_callback = std::move(callback);
      auto ec = libssh2_channel_exec(*m_channel, command.c_str());
      TryRead();
    }

    Indem wir den Befehl ausführen, übertragen wir die Kontrolle auf die TryRead () - Methode.

    voidTryRead(){
      if (m_read_in_progress) {
         return;
      }
      m_tcp_socket.async_wait(tcp::socket::wait_read, [this, self = shared_from_this()](auto ec) {
         if (WantRead()) {
            ReadHandler(ec);
         }
         if (m_complete) {
            return;
         }
         TryRead();
      });
    }
    

    Zunächst prüfen wir, ob der Lesevorgang bereits von einem früheren Aufruf gestartet wurde. Wenn nicht, erwarten wir die Lesebereitschaft der Steckdose. Als Wait-Handler wird ein regulärer Lambda mit dem Capture von shared_from_this () verwendet.

    Achten Sie auf den WantRead () - Aufruf. Async_wait hat, wie sich herausgestellt hat, auch seine Schwächen und kann bei einem Timeout einfach zurückkehren. Um in diesem Fall keine unnötigen Aktionen durchzuführen, habe ich beschlossen, die Socket-Durchgangsabfrage ohne Zeitüberschreitung zu überprüfen - ob der Socket jetzt wirklich lesen möchte. Wenn nicht, dann führen wir einfach TryRead () erneut aus und warten. Ansonsten fahren wir sofort mit dem Lesen und Übertragen von Daten zum Callback fort.

    voidReadHandler(const boost::system::error_code &error){
      if (error != boost::system::errc::success) {
         return;
      }
      m_read_in_progress = true;
      int ec = LIBSSH2_ERROR_EAGAIN;
      std::array<char, 4096> buffer {};
      while ((ec = libssh2_channel_read(*m_channel, buffer.data(), buffer.size())) > 0) {
         std::string tmp;
         boost::range::copy(boost::adaptors::slice(buffer, 0, ec), std::back_inserter(tmp));
         if (m_read_callback != nullptr) {
            m_read_callback(tmp);
         }
      }
       m_read_in_progress = false;
    }
    

    Somit startet eine unendliche asynchrone Leseschleife der laufenden Anwendung. Der nächste Schritt für uns ist das Senden von Anweisungen an die Anwendung:

    voidAsyncWrite(conststd::string &data, WriteCallbackType &&callback){
      m_input += data;
      m_write_callback = std::move(callback);
      TryWrite();
    }
    

    Die an die asynchrone Aufzeichnung und den Rückruf übertragenen Daten werden in der Verbindung gespeichert. Und führe den nächsten Zyklus aus, nur diesmal den Rekord:

    Schreibzyklus
    voidTryWrite(){
      if (m_input.empty() || m_write_in_progress) {
         return;
      }
      m_tcp_socket.async_wait(tcp::socket::wait_write, [this, self = shared_from_this()](auto ec) {
         if (WantWrite()) {
            WriteHandler(ec);
         } 
         if (m_complete) {
            return;
         }
         TryWrite();
      });
    }
    voidWriteHandler(const boost::system::error_code &error){
      if (error != boost::system::errc::success) {
         return;
      }
      m_write_in_progress = true;
      int ec = LIBSSH2_ERROR_EAGAIN;
      while (!m_input.empty()) {
         auto ptr = m_input.c_str();
         auto read_size = m_input.size();
         while ((ec = libssh2_channel_write(*m_channel, ptr, read_size)) > 0) {
            read_size -= ec;
            ptr += ec;
         }
         AssertResult(ec);
         m_input.erase(0, m_input.size() - read_size);
         if (ec == LIBSSH2_ERROR_EAGAIN) {
            break;
         }
      }
      if (m_input.empty() && m_write_callback != nullptr) {
         m_write_callback();
      }
      m_write_in_progress = false;
    }
    


    Daher schreiben wir Daten in den Kanal, bis alle erfolgreich übertragen wurden. Dann geben wir die Kontrolle an den Anrufer zurück, damit Sie eine neue Dateneinheit übertragen können. Sie können also nicht nur Anweisungen an eine bestimmte Anwendung auf dem Host senden, sondern auch Dateien beliebiger Größe in kleinen Portionen herunterladen, ohne den Thread zu blockieren, was wichtig ist.

    Mit Hilfe dieser Bibliothek konnte ich erfolgreich ein Skript auf einem Remote-Server ausführen, der Änderungen des Dateisystems überwacht, gleichzeitig seine Ausgabe liest und verschiedene Befehle sendet. Im Allgemeinen: eine sehr wertvolle Erfahrung bei der Anpassung der C-Stil-Bibliothek für ein modernes C ++ - Projekt mit Boost.

    Ich würde gerne die Tipps erfahrener Benutzer von Boost.Asio lesen, um mehr zu erfahren und Ihre Lösung zu verbessern :-).

    Jetzt auch beliebt: