[C++][Algorithm] A lock-free data structure for the lock-or-delegate problem

This class solves the lock-or-delegate problem.
Consider the following case study: imagine you would like to protect a resource
shared between your threads (it could be a file access, or any critical module).
The usual way to do that is with a mutex:

// All threads arrive there
mutex.lock();
// Do my thing alone
// Let someone else do its job and leave the mutex
mutex.unlock();

In some cases this approach works well or is even the most appropriate.
However, it has some drawbacks: the threads wait in the lock even though
they may not need the result of the critical section right away; maybe they
have other things to do in the meantime.
To address this we can imagine using async and futures.

std::future<void> result = std::async([&](){
    // All async threads arrive there
    mutex.lock();
    // Do my thing alone
    // Let someone else do its job and leave the mutex
    mutex.unlock();
});
// Do something else...
// And when I need it just ask the future
result.get();

This approach can be fine, but you may create extra threads (more than the number of CPUs)
and slow down the work because of memory contention, context switches
and cache degradation.
So when the threads do not need the result, for example when they write to a file,
we would like the following behavior:
ask for the mutex;
if I get it, I do my work and any work that has not been done because of me;
else I just put my work in a list and let the one who holds the mutex do it.
This is what lock-or-delegate is about.
It could be implemented using two mutexes, or perhaps one mutex plus one semaphore.
I first propose an easy implementation based on two mutexes,
then one based on lock-free operations, to avoid mutexes, which can be heavy.
In the best case, calling the lock-free LockOrDelegate costs (mainly) 2 lock-free instructions.
But keep in mind that the thread which gets the lock may end up doing the work of all the others.
My implementation has been tested and appears to work fine.
There are certainly many possible implementations, but the one I propose is very light
and takes only a few lines, which makes it very easy to understand.
(Someone may have already proposed the same implementation as mine; if so, my apologies.)

The mutex-based approach

The main class:

//////////////////////////////////////////////////////
/// Lock or delegate work class
/// Author : Berenger (berenger.eu)
///
/// This class is the one based on mutexes.
/// Only one thread at a time can access the resource.
/// It is easier to understand compared to the lock-free one.
//////////////////////////////////////////////////////

#include <atomic>
#include <cassert>
#include <mutex>
#include <list>
#include <functional>

class LockOrDelegate {
protected:
    // The list of tasks to process
    std::list<std::function<void(void)> > toProceed;
    // To protect the list
    std::mutex listMutex;
    // To know if a thread is the worker (the one that should process the list)
    std::mutex workerMutex;

    // Process all the pending tasks
    void proceedAll(){
        // Protect the access to the list
        listMutex.lock();
        while(toProceed.size()){
            std::function<void(void)> fonc = toProceed.front();
            toProceed.pop_front();
            listMutex.unlock();

            // Proceed
            fonc();

            // Protect the access to the list
            listMutex.lock();
        }
        // Release the worker mutex before the list mutex! Otherwise a task
        // pushed after our emptiness check could stay in the list with no
        // worker to process it.
        workerMutex.unlock();
        listMutex.unlock();
    }

public:
    //< Apply the function (and all the pending ones) or delegate and return directly
    bool doItOrDelegate(std::function<void(void)> fonc){
        // We could remove this test, but it avoids putting the task in the
        // list when that is not needed
        if(workerMutex.try_lock()){
            // If I am the worker
            fonc();
            proceedAll();
            // Done in proceedAll workerMutex.unlock();
            return true;
        }

        // Insert into the list
        listMutex.lock();
        toProceed.push_back(fonc);
        listMutex.unlock();

        // Maybe the worker finished without seeing my task
        if(workerMutex.try_lock()){
            proceedAll();
            // Done in proceedAll  workerMutex.unlock();
            return true;
        }

        return false;
    }

};

The test function:

#include <mutex>
#include <vector>
#include <cassert>
#include <iostream>
#include <unistd.h>
#include <thread>

// g++ -O2 delegatelock.cpp -std=c++11 -o delegatelock.exe -lpthread
void test(const int nbThreads, const int Size, unsigned int sleepBetween, unsigned int sleepInside){
    std::cout << "[INFO] Size      = " << Size << "\n";
    std::cout << "[INFO] nbThreads = " << nbThreads << "\n";
    std::cout << "[INFO] sleepBetween = " << sleepBetween << "\n";
    std::cout << "[INFO] sleepInside  = " << sleepInside << "\n";

    std::vector<std::thread> threads(nbThreads);
    LockOrDelegate dl;
    std::mutex assertionMutex;

    int totalActions = 0;

    for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
        threads[idxThread] = std::thread([&](){
            int nbTimesOwn = 0;
            for(int i = 0 ; i < Size ; ++i){
                usleep(sleepBetween);
                const bool IOwn = dl.doItOrDelegate([&](){
                    // Only one thread will be here at a time!
                    assert(assertionMutex.try_lock());
                    totalActions += 1;
                    usleep(sleepInside);
                    assertionMutex.unlock();
                });

                if(IOwn){
                    nbTimesOwn += 1;
                }
            }
            std::cout << "I Owned " << nbTimesOwn << "\n";
        });
    }

    for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
        threads[idxThread].join();
    }

    assert(totalActions == nbThreads*Size);
    std::cout << "Done.\n";
}

int main(){
    const int nbThreads   = std::thread::hardware_concurrency();

    test(nbThreads, 500, 400, 0);
    test(nbThreads, 50000, 40, 0);
    test(nbThreads, 500, 400, 30);

    return 0;
}

The lock-free approach

The main class:

//////////////////////////////////////////////////////
/// Lock or delegate work class
/// Author : Berenger (berenger.eu)
///
/// This class is a lock-free way to share a resource.
/// Only one thread at a time can access the resource.
/// But if the resource is already in use, it will delegate the
/// work to the current "mutex" owner.
//////////////////////////////////////////////////////

#include <atomic>
#include <cassert>

/**
 * The two main ways to use the class are:
 * 1)
 *     bool doItOrDelegate(DataClass data){
 *        if(toProceed.getTicketAndInsert(data)){
 *            // I am the owner, I process everything
 *            do{
 *                DataClass dataToProceed = toProceed.popData();
 *                // Do it to dataToProceed
 *            }while(toProceed.checkNext());
 *            // Tell if I was the owner
 *            return true;
 *        }
 *        return false;
 *    }
 *
 * 2)
 *     bool doItOrDelegate(DataClass data){
 *        const int ticket = toProceed.getTicket();
 *        if(ticket == 0){
 *            // I am the owner, I process my data without putting it in the list
 *            // Do it to data
 *            // And check if we need to work on the others
 *            while(toProceed.checkNext()){
 *                DataClass dataToProceed = toProceed.popData();
 *                // Do it to dataToProceed
 *            }
 *            // Tell if I was the owner
 *            return true;
 *        }
 *        // I am not the owner, someone else will work on it
 *        toProceed.insert(data);
 *        return false;
 *    }
 */
template<typename DataClass>
class Cumulator{
private:
    /** Node of our lock-free list */
    struct LockFreeNode{
        //< Data
        DataClass data;
        //< Next node
        LockFreeNode* next;
        //< Init from data (next is set to null)
        LockFreeNode(const DataClass & inData)
            : data(inData), next(nullptr){
        }
    };

    //< Lock-free list head
    std::atomic<LockFreeNode*> listHead;
    //< Current number of pending tasks
    std::atomic<int> nbCumulate;
    
public:
    /** Create an empty cumulator */
    Cumulator() : listHead(nullptr), nbCumulate(0){
    }

    /**
     * Get the current ticket (if zero is returned, the caller is the owner).
     * After this call nbCumulate has been incremented by one.
     * insert must be called afterwards!
     */
    int getTicket(){
        // Get my index
        int insertionPosition = nbCumulate.load();
        while(!nbCumulate.compare_exchange_weak(insertionPosition,insertionPosition+1));
        return insertionPosition;
    }

    /**
     * Insert into the list (should be called after getTicket)
     */
    void insert(const DataClass& inData){
        LockFreeNode* const newNode = new LockFreeNode(inData);
        newNode->next = listHead.load();
        // if listHead == newNode->next then listHead = newNode
        // else newNode->next = listHead
        while(!listHead.compare_exchange_weak(newNode->next,newNode));
        // *newNode cannot be used any longer (it may already have been popped)
    }

    /**
     * Insert the data and return true if the current caller becomes the owner of the resource.
     * This function gets a ticket and inserts into the list.
     */
    bool getTicketAndInsert(const DataClass& inData){
        // Get the ticket (0 if we own)
        const int insertionPosition = getTicket();
        // Insert the data in any case
        insert(inData);
        // Inform the caller: we are the owner only if we drew ticket 0
        return insertionPosition == 0;
    }

    /**
     * This function should be called by the owner to get the next task.
     */
    DataClass popData(){
        assert(nbCumulate.load() > 0);
        
        // Request the next node; it may not have been pushed yet
        // (the ticket is taken before the matching insert), so spin until
        // the head is non-null and the compare-exchange succeeds
        LockFreeNode* removedNode = nullptr;
        while((removedNode = listHead.load()) == nullptr || !listHead.compare_exchange_weak(removedNode,removedNode->next));
        assert(removedNode);
        // Get Data
        DataClass data = removedNode->data;
        // Delete node
        delete removedNode;

        return data;
    }
    
    /**
     * Once the data has been processed and the resource released,
     * the owner should call this function to know if there is something more to do.
     */
    bool checkNext(){
        assert(nbCumulate.load() > 0);
        int removedPosition = nbCumulate.load();
        while(!nbCumulate.compare_exchange_weak(removedPosition,removedPosition-1));
        return removedPosition-1 != 0;
    }
};

An interface:

#include <functional>

/**
 * This class uses a Cumulator correctly (with the two possible solutions)
 * and accepts functions (no parameters, void return value) as the work type.
 */
class LockOrDelegate {
protected:
    Cumulator<std::function<void(void)> > toProceed;

public:
    //< Apply the function (and all the pending ones) or delegate and return directly
    bool doItOrDelegate(std::function<void(void)> fonc){
        if(toProceed.getTicketAndInsert(fonc)){
            do{
                std::function<void(void)> foncToProceed = toProceed.popData();
                foncToProceed();
            }while(toProceed.checkNext());
            return true;
        }
        return false;
    }

    //< Apply the function (and all the pending ones) or delegate and return directly
    //< It performs one allocation fewer than the previous version because,
    //< when we are the owner, we do not put our data in the list
    bool doItOrDelegateOpt(std::function<void(void)> data){
        const int ticket = toProceed.getTicket();
        if(ticket == 0){
            data();
            while(toProceed.checkNext()){
                std::function<void(void)> foncToProceed = toProceed.popData();
                foncToProceed();
            }
            return true;
        }
        // We do not own so put it in the list
        toProceed.insert(data);
        return false;
    }
};

The test functions:

#include <mutex>
#include <vector>
#include <cassert>
#include <iostream>
#include <unistd.h>
#include <thread>

// I compile this code with:
// g++ -O2 delegatelock.cpp -std=c++11 -o delegatelock.exe -lpthread
// As a test I create several threads and ask each of them
// to call, Size times, a function which asserts that nobody else is
// inside the function at the same time as us and
// increments a counter.
// To test different behaviors we can increase the duration of the sleeps
// or remove the sleep inside the function.

void test(const int nbThreads, const int Size, unsigned int sleepBetween, unsigned int sleepInside){
    std::cout << "[INFO] Size      = " << Size << "\n";
    std::cout << "[INFO] nbThreads = " << nbThreads << "\n";
    std::cout << "[INFO] sleepBetween = " << sleepBetween << "\n";
    std::cout << "[INFO] sleepInside  = " << sleepInside << "\n";

    std::vector<std::thread> threads(nbThreads);
    LockOrDelegate dl;
    std::mutex assertionMutex;

    int totalActions = 0;

    for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
        threads[idxThread] = std::thread([&](){
            int nbTimesOwn = 0;
            for(int i = 0 ; i < Size ; ++i){
                usleep(sleepBetween);
                const bool IOwn = dl.doItOrDelegate([&](){
                //const bool IOwn = dl.doItOrDelegateOpt([&](){
                    // Only one thread will be here at a time!
                    assert(assertionMutex.try_lock());
                    totalActions += 1;
                    usleep(sleepInside);
                    assertionMutex.unlock();
                });

                if(IOwn){
                    nbTimesOwn += 1;
                }
            }
            std::cout << "I Owned " << nbTimesOwn << "\n";
        });
    }

    for(int idxThread = 0 ; idxThread < nbThreads ; ++idxThread){
        threads[idxThread].join();
    }

    assert(totalActions == nbThreads*Size);
    std::cout << "Done.\n";
}

int main(){
    const int nbThreads   = std::thread::hardware_concurrency();

    test(nbThreads, 500, 400, 0);
    test(nbThreads, 50000, 40, 0);
    test(nbThreads, 500, 400, 30);

    return 0;
}

Of course, it would be possible to use promise/future in this implementation, but they rely on mutexes, which makes them inappropriate here.