4
Sep
0

[C++] A tcp/ip server using OpenMP (with Linux socket)

In this post I share the code of a small program I wrote a week ago: a TCP/IP server for Linux sockets built on OpenMP.
The server uses a thread pool and OpenMP tasks. I also wrote a minimalist client that uses select() to read from both stdin and the socket without blocking.

Comments are in the code.

The server

#ifndef OMPSERVER_H
#define OMPSERVER_H

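#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>

#include <vector>

#include <omp.h>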

/** @author Berenger
  * @brief A basic tcp/ip server using openmp
  *
  * This class is a simple server that uses openmp
  * as a thread engine. Client sockets are stored
  * in a vector. The vector is allocated at the
  * beginning, maximum clients == vector size.
  *
  * All in the hpp => Read & understand faster
  *
  * This code is just for fun, it is a beta that comes without
  * any warranty, you are using this code at your own risk...
  * Also it has not been tested for hours...
  *
  * Has worked with : gcc (Ubuntu/Linaro 4.5.2-8ubuntu4) 4.5.2
  * If openmp is saying:
  * "libgomp: Thread creation failed: Resource temporarily unavailable",
  * then reduce the number of threads in the pool.
  */

class OmpServer
{
private:
    ///////////////////////////////////////////////
    // Static data
    ///////////////////////////////////////////////

    const static int MaximumClients     = 50;   //< Maximum nb clients, also == nb threads in the pool
    const static int NbQueuedClients    = 4;    //< Queued clients in the listen function
    const static int BufferSize         = 256;  //< Message buffer size

    /** Generic method to test network C return value
      * @return true if the value is not negative
      */
    static bool IsValidReturn(const int inReturnedValue){
        return inReturnedValue >= 0;
    }

    /** Send bytes on a socket without raising SIGPIPE when the peer
      * is gone (where MSG_NOSIGNAL is available), so that a dead client
      * is reported as a failed send instead of killing the process.
      * @return true if send did not report an error
      */
    static bool SendBytes(const int inSocket, const char* const inData, const size_t inLength){
#ifdef MSG_NOSIGNAL
        const int sendFlags = MSG_NOSIGNAL;
#else
        const int sendFlags = 0;
#endif
        return ::send( inSocket, inData, inLength, sendFlags ) >= 0;
    }

    /** Add O_NONBLOCK to the flags of a descriptor
      * @return true if both fcntl calls succeeded
      */
    static bool SetNonBlocking(const int inDescriptor){
        const int flags = fcntl(inDescriptor, F_GETFL, 0);
        return IsValidReturn(flags)
            && IsValidReturn( fcntl(inDescriptor, F_SETFL, flags | O_NONBLOCK) );
    }

    ///////////////////////////////////////////////
    // Attributs
    ///////////////////////////////////////////////

    sockaddr_in         address;    //< Internet address the server is bound to
    int                 sock;       //< Server socket (-1 when closed)
    std::vector<int>    clients;    //< Clients sockets, the first nbClients entries are in use
    int                 nbClients;  //< Current Nb Clients
    // NOTE(review): hasToStop is written by stopRun() and read by the accept
    // loop and by every client task without any synchronization. If stopRun()
    // is called from another thread or from a signal handler this is formally
    // a data race -- consider an atomic flag if the toolchain allows it.
    bool                hasToStop;  //< To stop the server

    ///////////////////////////////////////////////
    // Private network functions methods
    ///////////////////////////////////////////////

    /** Bind socket to internet address (any local interface, given port) */
    bool bind(const int inPort){
        address.sin_family       = AF_INET;
        address.sin_addr.s_addr  = htonl( INADDR_ANY );
        address.sin_port         = htons ( inPort );

        return IsValidReturn( ::bind( sock, reinterpret_cast<struct sockaddr*>(&address), sizeof(address)) );
    }

    /** The server has to listen to the socket */
    bool listen(){
        return IsValidReturn( ::listen( sock, NbQueuedClients ) );
    }

    /** Accept a new client, this function is non-blocking
      * @return the new client socket, or -1 if there is none
      */
    int accept(){
        // The peer address goes into a local so that the server's own
        // bound address is not overwritten by each incoming client
        sockaddr_in peerAddress;
        socklen_t   sizeOfAddress = sizeof(peerAddress);
        // Because of the nonblocking mode, if accept returns a
        // negative number we cannot know if there is an error OR no new client
        const int newClientSock = ::accept ( sock, reinterpret_cast<sockaddr*>(&peerAddress), &sizeOfAddress );
        return (newClientSock < 0 ? -1 : newClientSock);
    }

    /** Process a client (manage communication)
      * Registers the client, greets it, then forwards everything
      * it sends to all registered clients until it disconnects
      * or the server is asked to stop.
      */
    void processClient(const int inClientSocket){
        // Try to add the new client
        if( !addClient(inClientSocket) ){
            static const char fullMessage[] = "Server is full!\n";
            SendBytes( inClientSocket, fullMessage, sizeof(fullMessage) - 1 );
            ::close(inClientSocket);
            return;
        }

        // Try to send a message (sizeof - 1: do not send the trailing '\0')
        static const char welcomeMessage[] = "Welcome\n";
        if( !SendBytes( inClientSocket, welcomeMessage, sizeof(welcomeMessage) - 1 ) ){
            removeClient(inClientSocket);
            return;
        }

        // Wait message and broadcast
        char recvBuffer[BufferSize];

        SetNonBlocking(inClientSocket);

        while( true ){
            fd_set  readSet;
            FD_ZERO(&readSet);

            FD_SET(inClientSocket, &readSet);

            struct timeval timeout = {1, 0};
            const int nbReady = select( inClientSocket + 1, &readSet, NULL, NULL, &timeout);

            // We have to perform the test after the select (may have spent 1s in the select)
            if( hasToStop ){
                break;
            }

            // On error the content of readSet cannot be trusted: drop the client
            if( nbReady < 0 ){
                removeClient(inClientSocket);
                break;
            }

            if( nbReady > 0 && FD_ISSET(inClientSocket, &readSet) ){
                const int lenghtRead = read(inClientSocket, recvBuffer, BufferSize);
                // 0 => the peer closed the connection, negative => read error
                if( lenghtRead <= 0 ){
                    removeClient(inClientSocket);
                    break;
                }
                broadcast( recvBuffer , lenghtRead);
            }
        }
    }

    ///////////////////////////////////////////////
    // Clients socket management
    ///////////////////////////////////////////////

    /** Remove a client from the vector and close its socket (CriticalClient)
      * Does nothing if the socket is not registered.
      */
    void removeClient(const int inClientSocket){
        #pragma omp critical(CriticalClient)
        {
            int socketPosition = -1;
            for(int idxClient = 0 ; idxClient < nbClients ; ++idxClient){
                if( clients[idxClient] == inClientSocket ){
                    socketPosition = idxClient;
                    break;
                }
            }
            if( socketPosition != -1 ){
                ::close(inClientSocket);
                --nbClients;
                // we switch only the last client and the client to remove
                // because the vector is not ordered
                // if nbClient was 1 => clients[0] = clients[0]
                // if socketPosition is nbClient - 1 => clients[socketPosition] = clients[socketPosition]
                clients[socketPosition] = clients[nbClients];
            }
        }
    }

    /** Add a client in the vector if possible (CriticalClient)
      * @return false if the server already holds MaximumClients clients
      */
    bool addClient(const int inClientSocket){
        bool cliendAdded = false;
        #pragma omp critical(CriticalClient)
        {
            if(nbClients != MaximumClients){
                clients[nbClients++] = inClientSocket;
                cliendAdded = true;
            }
        }
        return cliendAdded;
    }

    /** Send a message to all clients (CriticalClient)
      * A client we cannot write to is shut down but NOT closed here:
      * the task that owns this socket is still waiting on it, it will
      * see the end of stream and remove/close the client itself. Closing
      * the descriptor from this thread would let the number be reused
      * while the owner still uses it.
      */
    void broadcast(const char*const inMessage, const int inLenght){
        if( inLenght <= 0 ){
            return;
        }
        #pragma omp critical(CriticalClient)
        {
            for(int idxClient = 0 ; idxClient < nbClients ; ++idxClient){
                // if we failed we disconnect the client
                if( !SendBytes( clients[idxClient], inMessage, inLenght ) ){
                    ::shutdown( clients[idxClient], SHUT_RDWR );
                }
            }
        }
    }

    /** Forbid copy (declared, never defined) */
    OmpServer(const OmpServer&);
    OmpServer& operator=(const OmpServer&);

public:
    ///////////////////////////////////////////////
    // Public methods
    ///////////////////////////////////////////////

    /** The constructor inits the socket, but not only.
      * It also installs the socket with the bind and
      * starts the listen.
      * So, as soon the server is created, the clients
      * can be queued even if no call to run has been
      * performed.
      * If any step fails the socket is closed and
      * isValid() returns false.
      */
    OmpServer(const int inPort = 59898)
        : sock(-1), clients(MaximumClients,-1), nbClients(0), hasToStop(true) {
        memset(&address, 0, sizeof(address));

        sock = socket(AF_INET, SOCK_STREAM, 0);
        if( !IsValidReturn(sock) ){
            sock = -1;
            return;
        }

        // Set option to true
        const int optval = 1;
        // Each step is attempted only if the previous one succeeded
        const bool isReady =
               IsValidReturn( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<const char*>(&optval), sizeof(optval)) )
            && bind(inPort)
            && listen()
            && SetNonBlocking(sock);

        if( !isReady ){
            close();
        }
    }

    /** Destructor closes the server socket
      * The clients have already been closed (at end of run)
      */
    virtual ~OmpServer(){
        close();
    }

    /** Is valid if sock != -1 */
    bool isValid() const {
        return sock != -1;
    }

    /** Simply set the loop boolean to true */
    void stopRun(){
        hasToStop = true;
    }

    /** This function closes the socket
      * @return true if a socket was open and has been closed without error
      */
    bool close(){
        if(isValid()){
            const bool noErrorCheck = IsValidReturn(::close(sock));
            sock = -1;
            return noErrorCheck;
        }
        return false;
    }

    /** The run starts to accept the clients and execute
      * one thread per client.
      * When hasToStop is set to true (stopRun called),
      * the master thread stops to accept clients and waits all
      * others threads.
      * Then the clients sockets are closed (we choose here to perform
      * this at the end, but it can be done into the task...)
      *
      * Please remark that all threads are created at the beginning.
      * This is because the task in openmp are tied (even with untied).
      * In a better world, we will use the default number of thread,
      * create untied task, and let the thread switching from one task to another
      * but tasks switch very rarely so we need to create a pool of threads and
      * use one thread per task.
      *
      * @return false if called from a parallel region or if the server
      *         is not valid, true once the server has been stopped
      */
    bool run(){
        if( omp_in_parallel() || !isValid() ){
            return false;
        }

        hasToStop = false;

        // You may be able to remove num_threads(MaximumClients) with intel (not sure)
        #pragma omp parallel num_threads(MaximumClients)
        {
            #pragma omp single nowait
            {
                while( !hasToStop ){
                    const int newSocket = accept();
                    if( newSocket != -1 ){
                        #pragma omp task untied
                        {
                            processClient(newSocket);
                        }
                    }
                    else{
                        // No pending client: do not spin at full speed
                        usleep(200);
                    }
                }
                #pragma omp taskwait
            }
        }

        // Close sockets with default threads number this time
        #pragma omp parallel for
        for(int idxClient = 0 ; idxClient < nbClients ; ++idxClient ){
            ::close(clients[idxClient]);
        }
        nbClients = 0;

        return true;
    }
};

#endif // OMPSERVER_H

The Main

We use signal handling to stop the server (control + c).

#include "ompserver.h"

#include <signal.h>
#include <stdlib.h>

#include <iostream>

/** @author Berenger
  * @brief Basic main to test the OmpServer.
  */

// Server that the handler below must stop (0 when there is none)
static OmpServer* ServerToClose = 0;

// Ctr + c => ask the registered server, if any, to stop
void stopServerSignal(int){
    if( !ServerToClose ){
        return;
    }
    ServerToClose->stopRun();
}

// Simplest Main as possible:
// installs the Ctr + C handler, then creates and runs the server.
// Returns 0 on a normal stop, a non-zero code if the handler cannot be
// installed, if the server socket cannot be set up, or if run() refuses to start.
int main(){
    // Install handler
    if( signal(SIGINT , stopServerSignal) == SIG_ERR ){
        std::cout << "Error cannot install signal handler..." << std::endl;
        return 56;
    }

    // Create the server and make sure its socket is usable
    // before telling the user that it is running
    OmpServer server;
    if( !server.isValid() ){
        std::cout << "Error cannot start the server..." << std::endl;
        return 57;
    }
    std::cout << "Presse Ctr + C to stop..." << std::endl;

    ServerToClose = &server;

    const bool hasRun = server.run();

    // The handler must not reach the server once it is about to be destroyed
    ServerToClose = 0;

    // Remove the ugly ^C
    std::cout << "\r";
    return hasRun ? 0 : 58;
}

The Client

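#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>

#include <cstring>
#include <iostream>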

/** @author Berenger
  * @brief A basic tcp/ip client using unix socket
  *
  * This is a simple main which creates a socket
  * with a hard-coded port and ip. Then, it runs
  * a loop until the user writes "quit", stdin is
  * closed, or the server closes the connection.
  * The socket is in a nonblocking state; select is
  * used to wait on both stdin and the socket.
  * Stdin is sent to the server, socket is printed to
  * stdout.
  *
  * @return 0 on a normal exit, 1 if the socket cannot
  *         be created or connected
  */

int main(){
    // Const Declaration
    const int Stdin = STDIN_FILENO; //0
    const size_t BufferSize = 256;
    const int DefaultPort = 59898;
    const char* const DefaultAddress = "127.0.0.1";

    // Where available, ask send() to report a closed connection as an
    // error instead of raising SIGPIPE (which would kill the client
    // before it can print the disconnection message)
#ifdef MSG_NOSIGNAL
    const int SendFlags = MSG_NOSIGNAL;
#else
    const int SendFlags = 0;
#endif

    // Our socket
    const int sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if( sock < 0 ){
        std::cout << "[Error] Cannot create the socket" << std::endl;
        return 1;
    }

    // The Sockaddr
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family       = AF_INET;
    server.sin_addr.s_addr  = inet_addr(DefaultAddress);
    server.sin_port         = htons(DefaultPort);

    if( connect(sock, reinterpret_cast<struct sockaddr*>(&server), sizeof(server)) < 0){
        std::cout << "[Error] Cannot connect to server" << std::endl;
        close(sock);
        return 1;
    }

    // Set the socket to non blocking state
    const int flag = fcntl(sock, F_GETFL, 0);
    fcntl(sock, F_SETFL, flag | O_NONBLOCK);

    // We are connected
    std::cout << "[Status] Connected, tape \"quit\" to exit" << std::endl;

    char sendBuffer[BufferSize + 1];
    char recvBuffer[BufferSize + 1];    // + 1 for the '\0' appended before printing

    bool stop = false;

    while( !stop ){
        // Set the FD to our socket & stdin
        fd_set  readSet;
        FD_ZERO(&readSet);

        FD_SET(Stdin, &readSet);
        FD_SET(sock, &readSet);

        // First parameter is the highest descriptor + 1
        const int highestFd = (sock > Stdin ? sock : Stdin);
        if( select( highestFd + 1, &readSet, NULL, NULL, NULL) < 0 ){
            // readSet cannot be trusted after a failure
            std::cout << "[Error] select failed" << std::endl;
            break;
        }

        // Can we read from the socket?
        if( FD_ISSET(sock, &readSet) ){
            const int lenghtRead = read(sock, recvBuffer, BufferSize);
            if( lenghtRead <= 0 ){
                std::cout << "[Disconnected from server]" << std::endl;
                // we do not want to break the loop because
                // we may have to clean the stdin before
                stop = true;
            }
            else{
                recvBuffer[lenghtRead] = '\0';
                std::cout << "[Data] " << recvBuffer << std::flush;
            }
        }

        // Can we read from stdin?
        if( FD_ISSET(Stdin, &readSet) ){
            const int lenghtRead = read(Stdin, sendBuffer, BufferSize);
            if( lenghtRead <= 0 ){
                // End of input (or read error): nothing more to send.
                // Without this test select would report stdin readable forever.
                stop = true;
            }
            else if( lenghtRead == 5 && memcmp("quit", sendBuffer, 4) == 0 ){
                stop = true;
            }
            else if( send(sock, sendBuffer, lenghtRead, SendFlags) < 0){
                std::cout << "[Disconnected from server]" << std::endl;
                stop = true;
            }
        }
    }

    close(sock);

    return 0;
}

References

http://gnosis.cx/publish/programming/sockets.html
http://rhoden.id.au/doc/sockets2.html
http://www.evanjones.ca/software/threading.html
The Design of OpenMP Tasks

Enjoyed reading this post?
Subscribe to the RSS feed and have all new posts delivered straight to you.

Comments are closed.

Celadon theme by the Themes Boutique