13
Jul
0

[C++][OpenMP] Parallel Quick Sort, source code

In this post I will give a code of a parallel quick sort.
This sort can be improve because we are in shared memory architecture. But, I build it to create a MPI version (source code also available on the blog). And as said “Introduction to Parallel Computing” (A.G. A.G. G.K. V.K.) I need a thread version before a MPI one.

Another (an little faster version) is available here. This second version use a completely different algorithm!!!

PS : I developed several quick sort (available on this blog), a sequential version, an openmp tasks version, a openmp not inplace version, an mpi version and a Qt concurent version.


(Sequential Sorts are available here)

The code

First wee need the Openmp custom barrier

#ifndef FOMPBARRIER_HPP
#define FOMPBARRIER_HPP

#include <omp.h>
#include <climits>

/** This function is a custom omp barrier
  * Because openmp give only a global barrier we need
  * to be ablo to peform a barrier operation between a group
  * of thread only.
  */

class FOmpBarrier {
private:
    int nbThreads;          //<The number of threads for this barrier
    int currentNbThread;    //<The current number of threads waiting
    bool sense;             //<Direct barrier feedback protection
    omp_lock_t mutex;       //<To have an atomic int

    FOmpBarrier(FOmpBarrier&){}
    FOmpBarrier& operator=(FOmpBarrier&){return *this;}

public:
    /** Constructor with the number of threads */
    FOmpBarrier(const int inNbThreads = INT_MAX)
        : nbThreads(inNbThreads), currentNbThread(0), sense(false) {
        omp_init_lock( &mutex );
    }

    /** Destructor, release the omp lock */
    ~FOmpBarrier(){
        omp_destroy_lock( &mutex );
    }

    /** Perform a barrier */
    void wait(){
        const bool mySense = sense;
        omp_set_lock( &mutex );
        const int nbThreadsArrived = (++currentNbThread);
        omp_unset_lock( &mutex );

        if(nbThreadsArrived == nbThreads) {
            currentNbThread = 0;
            sense = !sense;
            #pragma omp flush(sense)
        }
        else {
            volatile const bool* const ptSense = &sense;
            while( (*ptSense) == mySense){
            }
        }
    }

    /** Change the number of threads */
    void setNbThreads(const int inNbThread){
        omp_set_lock( &mutex );
        nbThreads = inNbThread;
        omp_unset_lock( &mutex );
    }
};

#endif // FOMPBARRIER_HPP

Then we can code the sort:

#include <cstdio>
#include <omp.h>
#include <cmath>
#include <ctime>
#include <cstdlib>
#include <cstring>

////////////////////////////////////////////////////////////
// Miscialenous functions
////////////////////////////////////////////////////////////

/** Swap to value */
template <class NumType>
inline void Swap(NumType& value, NumType& other){
    NumType temp = value;
    value = other;
    other = temp;
}

////////////////////////////////////////////////////////////
// Split information
////////////////////////////////////////////////////////////

long getLeft(const long inSize, const int inIdProc, const int inNbProc) {
    const double step = (double(inSize) / inNbProc);
    return long(ceil(step * inIdProc));
}

long getRight(const long inSize, const int inIdProc, const int inNbProc) {
    const double step = (double(inSize) / inNbProc);
    const long res = long(ceil(step * (inIdProc+1)));
    if(res > inSize) return inSize;
    else return res;
}

long getOtherRight(const long inSize, const int other, const int inNbProc) {
    const double step = (double(inSize) / inNbProc);
    const long res = long(ceil(step * (other+1)));
    if(res > inSize) return inSize;
    else return res;
}

int getProc(const int position, const long inSize, const int inNbProc) {
    const double step = (double(inSize) / inNbProc);
    return int(position/step);
}
    ////////////////////////////////////////////////////////////
    // Quick sort
    ////////////////////////////////////////////////////////////

    /* use in the sequential qs */
    template <class SortType, class PivotType>
    static FSize QsPartition(SortType array[], FSize left, FSize right){
        const FSize part = right;
        Swap(array[part],array[(right + left ) / 2]);
        --right;

        while(true){
            while(PivotType(array[left]) < PivotType(array[part])){
                ++left;
            }
            while(right >= left && PivotType(array[part]) <= PivotType(array[right])){
                --right;
            }
            if(right < left) break;

            Swap(array[left],array[right]);
            ++left;
            --right;
        }

        Swap(array[part],array[left]);

        return left;
    }

    /* a local iteration of qs */
    template <class SortType, class PivotType>
    static void QsLocal(SortType array[], const PivotType& pivot,
               FSize myLeft, FSize myRight,
               FSize& prefix, FSize& sufix){

        FSize leftIter = myLeft;
        FSize rightIter = myRight;

        while(true){
            while(PivotType(array[leftIter]) <= pivot && leftIter < rightIter){
                ++leftIter;
            }
            while(leftIter <= rightIter && pivot < PivotType(array[rightIter])){
                --rightIter;
            }
            if(rightIter < leftIter) break;

            Swap(array[leftIter],array[rightIter]);
            ++leftIter;
            --rightIter;
        }

        prefix = leftIter - myLeft;
        sufix = myRight - myLeft - prefix + 1;
    }

   /* a sequential qs */
    template <class SortType, class PivotType>
    static void QsSequential(SortType array[], const FSize left, const FSize right){
        if(left < right){
            const FSize part = QsPartition<SortType,PivotType>(array, left, right);
            QsSequential<SortType,PivotType>(array,part + 1,right);
            QsSequential<SortType,PivotType>(array,left,part - 1);
        }
    }

    /* the openmp qs */
    template <class SortType, class PivotType>
    static void QsOmp(SortType array[], const FSize size){
        FTRACE( FTrace::FFunction functionTrace(__FUNCTION__, "Quicksort" , __FILE__ , __LINE__) );
        struct Fix{
            FSize pre;
            FSize suf;
        };

        const int NbOfThreads = omp_get_max_threads();

        Fix fixes[NbOfThreads + 1];
        Fix allFixesSum[NbOfThreads + 1][NbOfThreads];

        memset(fixes,0,sizeof(Fix) * NbOfThreads);
        memset(allFixesSum,0,sizeof(Fix) * (NbOfThreads + 1) * NbOfThreads);

        SortType*const temporaryArray = reinterpret_cast<SortType*>(new char[sizeof(SortType) * size]);

        FOmpBarrier barriers[NbOfThreads];

        #pragma omp parallel
        {
            const int myThreadId = omp_get_thread_num();

            FSize myLeft = getLeft(size, myThreadId, omp_get_num_threads());
            FSize myRight = getRight(size, myThreadId, omp_get_num_threads()) - 1;

            FSize startIndex = 0;
            FSize endIndex = size - 1;

            int firstProc = 0;
            int lastProc = omp_get_num_threads() - 1;

            while( firstProc != lastProc && (endIndex - startIndex + 1) != 0){
                Fix* const fixesSum = &allFixesSum[0][firstProc];
                const FSize nbElements = endIndex - startIndex + 1;

                if(myThreadId == firstProc){
                    barriers[firstProc].setNbThreads( lastProc - firstProc + 1);
                }

                // sort QsLocal part of the array
                const PivotType pivot = (PivotType(array[startIndex]) + PivotType(array[endIndex]) )/2;
                barriers[firstProc].wait();

                QsLocal(array, pivot, myLeft, myRight, fixes[myThreadId].pre, fixes[myThreadId].suf);

                // wait others that work on this part
                #pragma omp flush(array)
                barriers[firstProc].wait();

                // merge result
                if(myThreadId == firstProc){
                    fixesSum[firstProc].pre = 0;
                    fixesSum[firstProc].suf = 0;
                    for(int idxProc = firstProc ; idxProc <= lastProc ; ++idxProc){
                        fixesSum[idxProc + 1].pre = fixesSum[idxProc].pre + fixes[idxProc].pre;
                        fixesSum[idxProc + 1].suf = fixesSum[idxProc].suf + fixes[idxProc].suf;
                    }
                    #pragma omp flush(fixesSum)
                }
                // prepare copy
                if(myThreadId == firstProc + 1){
                    FMemUtils::memcpy(&temporaryArray[startIndex], &array[startIndex], sizeof(SortType) * nbElements );
                    #pragma omp flush(temporaryArray)
                }

                barriers[firstProc].wait();

                // copy my result where it belong (< pivot)
                FMemUtils::memcpy(&array[startIndex + fixesSum[myThreadId].pre], &temporaryArray[myLeft], sizeof(SortType) * fixes[myThreadId].pre);

                // copy my result where it belong (> pivot)
                const FSize sufoffset = fixesSum[lastProc + 1].pre + startIndex;
                FMemUtils::memcpy(&array[sufoffset + fixesSum[myThreadId].suf], &temporaryArray[myLeft + fixes[myThreadId].pre ], sizeof(SortType) * fixes[myThreadId].suf);

                barriers[firstProc].wait();

                // find my next QsLocal part
                int splitProc = getProc(sufoffset - startIndex, nbElements, lastProc - firstProc + 1) + firstProc;
                if(splitProc == lastProc){
                    --splitProc;
                }

                if( myThreadId <= splitProc ){
                    endIndex = sufoffset - 1;
                    lastProc = splitProc;
                }
                else{
                    startIndex = sufoffset;
                    firstProc = splitProc + 1;
                }

                myLeft = getLeft(endIndex - startIndex + 1, myThreadId - firstProc, lastProc - firstProc + 1) + startIndex;
                myRight = getRight(endIndex - startIndex + 1, myThreadId - firstProc, lastProc - firstProc + 1) + startIndex - 1;
            }

            QsSequential<SortType,PivotType>(array,myLeft,myRight);
        }

        delete[] reinterpret_cast<char*>(temporaryArray);
    }

Using the code

////////////////////////////////////////////////////////////
// Main
////////////////////////////////////////////////////////////

bool isSorted(long long array[], const long size){
    for(int idx = 1; idx < size ; ++idx){
        if(array[idx-1] > array[idx]){
            return false;
        }
    }
    return true;
}

void print(long long array[], const int size){
    for(int idx = 0 ;idx < size; ++idx){
        printf("%lld\t",array[idx]);
    }
    printf("\n");
}

int main(int, char**){
    const long Size = 40000000;
    long long* const array = new long long[Size];

    srand(0);

    for(long idx = 0 ; idx < Size ; ++idx){
        array[idx] = rand();
    }

    QsOmp(array, Size);

    if(isSorted(array,Size)){
        printf("Is sorted\n");
    }
    else{
        printf("Error array is not sorted!\n");
        return -1;
    }

    delete [] array;
    //print(array,Size);
    return 0;
}

License

License is as all the blog LGPLa href=”http://www.amazon.com/Introduction-Parallel-Computing-Ananth-Grama/dp/0201648652/ref=sr_1_1?ie=UTF8

Enjoyed reading this post?
Subscribe to the RSS feed and have all new posts delivered straight to you.

Comments are closed.

Celadon theme by the Themes Boutique