Monday, May 30, 2016

Task System (Thread Pool)

Overview

    This article tries to implement a Task System using C++11. A TaskSystem creates its threads at initialization and processes tasks in parallel (see the following figure). Each task can be defined and customized by the user.


Motivation

    After reading "C++ Concurrency In Action", I wanted to do some exercises, so that is what this article is about: designing and implementing a Task System. Furthermore, I have read an article that discusses the difficulty of implementing Task-Based Parallelism in C++. So this is what I attempt to do here: give an alternative approach to a Task System in C++11.

Introduction

   As mentioned by Anthony Williams in his "C++ Concurrency In Action", there are several circumstances in which you might want to use multiple threads:

1. Each thread performs different work: For example, you might use one thread for the GUI and another for managing the internal data, so that the GUI can respond to the user quickly. This might be the best situation in which to use threads.

2. Each thread does the same job but processes different data: You might divide the data into several chunks and submit each chunk to a different thread. In this case, how you split the data and how many threads you create (usually the same number as your processors) strongly affect the performance.

Task System

   The second case is what this article aims at. In that case, you might want to create as many threads as you have processors at the beginning (since creating a thread takes time), and then submit jobs (tasks) to those threads. A thread then processes your task and returns its result in some way. This is what I call the Task System here (it is called a thread pool in "C++ Concurrency In Action").
   However, Bartosz Milewski has pointed out in an article that since std::mutex and the thread_local keyword are Thread-Based rather than Task-Based, implementing such a Task System is potentially difficult.

   In this article, I try to implement a simple Task System under two limitations:

1. A task will not migrate from one thread to another.

2. Once a task has been taken by a thread and is being processed, it will not be interrupted or 'Context Switched' until the task finishes.

And its usage looks like this (all of the code in this article should be compiled as C++11):
        TaskSystem<ResultType> taskSystem;
        taskSystem.Start();

        SomeTask task;
        std::future<ResultType> resultHolder = task.GetResult();
        taskSystem.Submit( std::make_shared<SomeTask>(std::move(task)) );

        ResultType result = resultHolder.get();   // blocks until the task has finished

    Here, the TaskSystem is created in the first line, and calling TaskSystem::Start() creates the threads (which will run user-defined tasks once you submit them). Then I create a Task (which should inherit from TaskInterface) and get a std::future from TaskInterface::GetResult(). Note that std::future acts like a bridge between the current thread and the worker thread: you can hold on to it, and it returns the result when you call std::future::get(). At that point, the current thread waits until the worker thread finishes the task. Also note that the task is wrapped in a std::shared_ptr before it is submitted, so that the Task is deleted properly once nothing refers to it any more.
    The TaskInterface that every Task should inherit from is shown as follows:
        template<typename Returntype>
        class TaskInterface
        {
                public:
                        virtual void Run(void) = 0;

                        virtual std::future<Returntype> GetResult(void) = 0;

                        virtual void Interrupt(void) = 0;

                        virtual ~TaskInterface() {};
        };

    The user defines what a Task does by overriding TaskInterface::Run(). The return value is delivered through the std::future obtained by calling TaskInterface::GetResult(). If you want to stop all tasks that have been submitted to the TaskSystem, you can call TaskSystem::Stop(). In that case, TaskInterface::Interrupt() is called for every task that is still waiting in the TaskSystem's queue. Finally, the destructor is declared virtual so that a derived Task is destroyed properly when it is deleted through a pointer to TaskInterface.

    Here is the implementation of TaskSystem:
        template<typename TaskReturnType>
        class TaskSystem final
        {
                typedef std::shared_ptr< TaskInterface<TaskReturnType> > TaskPtr;

                public:
                        void Submit(const TaskPtr& taskPtr_);

                        void Start(size_t numberForProcessors = std::thread::hardware_concurrency() );

                        void Stop(void);

                        TaskSystem() : taskQueue(), listOfThreads(), shouldStop(false) {}
                        ~TaskSystem();
                private:
                        ThreadSafeQueue< TaskPtr > taskQueue;
                        std::vector<std::thread> listOfThreads;
                        std::atomic<bool> shouldStop;

                        // private functions
                        void WorkerThread(void);
        };

        // Implementation of TaskSystem
        template<typename TaskReturnType>
        void TaskSystem<TaskReturnType>::Submit(const TaskPtr& task_)
        {
                this->taskQueue.Push( task_ );
        }

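        template<typename TaskReturnType>
        void TaskSystem<TaskReturnType>::Start(size_t numberForProcessors_)
        {
                // hardware_concurrency() may return 0 when the value is not computable;
                // start at least one worker so that submitted tasks can still run.
                if (numberForProcessors_ == 0)
                        numberForProcessors_ = 1;

                try
                {
                        for(size_t i=0; i < numberForProcessors_; ++i)
                        {
                                this->listOfThreads.push_back(
                                                                std::thread(&TaskSystem::WorkerThread,
                                                                            this)
                                                             );
                        }
                }
                catch(...)
                {
                        // Thread creation failed: tell the workers already started to exit.
                        this->shouldStop.store(true);
                        throw;
                }
        }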

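        template<typename TaskReturnType>
        void TaskSystem<TaskReturnType>::WorkerThread(void)
        {
                // Keep taking tasks until Stop() or the destructor sets shouldStop.
                while(not this->shouldStop.load())
                {
                        try
                        {
                                // TryPop() throws QueueEmptyException when there is nothing to do.
                                TaskPtr taskPtr = *(this->taskQueue.TryPop());
                                // Run() must not throw: any other exception escaping here
                                // ends the thread function and calls std::terminate().
                                taskPtr->Run();
                        }
                        catch(const QueueEmptyException&)
                        {
                                // Nothing queued: give up the time slice and poll again.
                                std::this_thread::yield();
                        }
                }
        }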

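        template<typename TaskReturnType>
        void TaskSystem<TaskReturnType>::Stop(void)
        {
                // Workers finish the task they are currently running, then leave their loops.
                this->shouldStop.store(true);

                // Tasks still waiting in the queue will never run: interrupt each of them
                // so that its owner is notified through its std::future.
                while(not this->taskQueue.IsEmpty() )
                {
                        try
                        {
                                TaskPtr taskPtr = *(this->taskQueue.TryPop());
                                taskPtr->Interrupt();
                        }
                        catch(...)
                        {
                                // e.g. another thread emptied the queue between IsEmpty() and TryPop()
                                std::this_thread::yield();
                        }
                }
        }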

        template<typename TaskReturnType>
        TaskSystem<TaskReturnType>::~TaskSystem()
        {
                // Ask the workers to leave their loops, then wait for each of them.
                this->shouldStop.store(true);

                for(size_t i=0; i < this->listOfThreads.size(); ++i)
                {
                        if ( this->listOfThreads[i].joinable() )
                                this->listOfThreads[i].join();
                }
        }


As one can see, when TaskSystem::Start() is called, the TaskSystem spawns threads, by default as many as std::thread::hardware_concurrency() reports. Each thread runs TaskSystem::WorkerThread(), which repeatedly tries to pop a task from a ThreadSafeQueue (see "C++ Concurrency In Action") and run it.


Dividing data

   There are two ways in which data can be divided:

1. Dividing data into chunks, each of which is processed by one thread. For example, suppose you have a list of data that needs to be summed. If you have four processors, you can divide the data into four groups, define a Task that holds one group, and override TaskInterface::Run() to sum up that group. In this case, dividing the data is easy, and it is the ideal way to process data in parallel.


2. Dividing data recursively: Divide-and-Conquer algorithms belong to this category. In this case, a TaskSystem (or thread pool) often gets into trouble because there is no Task-Based mutex and no way to 'Context Switch' between tasks. This is discussed in the next section.


Example: Parallel Merge Sort

The pseudocode of parallel merge sort is shown as follows:
ListType ParallelMergeSort(ListType inputList)
{
    (leftHalfList, rightHalfList) = inputList.splitInHalf();
    sortedLeft  = spawn ParallelMergeSort(leftHalfList);
    sortedRight = spawn ParallelMergeSort(rightHalfList);

    sync;    // wait for both spawned calls to finish
    ListType result = Merge(sortedLeft, sortedRight);
    return result;
}
    ParallelMergeSort() first splits the input list, then spawns two tasks to process the two halves, waits for their results, and finally Merge()s the two sorted halves.
    However, this may deadlock if no thread is available to run the two spawned tasks: the parent task is waiting for its two child tasks, but the child tasks have no available thread to process their data. Since tasks cannot be 'Context Switched', there is no way to take the parent task off its thread and run the child tasks instead, so the program deadlocks.
    In the following, I try another approach to parallel merge sort, using these strategies:
1. Use a single thread to do the recursive calls.
2. Submit the Merge part to the TaskSystem so that the merges run in parallel.
The code is shown below:
        template<typename ListType>
        future<ListType> ParallelMergeSort(ListType&& inputList_, TaskSystem<ListType>& taskSystem)
        {
                size_t numberOfElements = inputList_.size();

                if (numberOfElements > 2)
                {
                        typename ListType::iterator middlePoint = inputList_.begin();
                        std::advance(middlePoint, numberOfElements/2);

                        ListType leftList;
                        leftList.splice(leftList.begin(),
                                        inputList_, inputList_.begin(), middlePoint);
                        ListType rightList;
                        rightList.splice(rightList.begin(),
                                         inputList_, inputList_.begin(), inputList_.end());

                        // Recurse on the calling thread; each call returns a future.
                        future<ListType> leftSortedResult = ParallelMergeSort( std::move(leftList), taskSystem);
                        future<ListType> rightSortedResult = ParallelMergeSort( std::move(rightList), taskSystem);

                        // Only the merge of the two sorted halves is handed to the TaskSystem.
                        MergeTask<ListType> mergeTask( std::move(leftSortedResult), std::move(rightSortedResult) );
                        future<ListType> finalResult = mergeTask.GetResult();
                        taskSystem.Submit( std::make_shared< MergeTask<ListType> >(std::move(mergeTask)) );

                        return finalResult;
                }
                else if (numberOfElements == 2)
                {
                        ListType result;
                        result.splice( result.begin(), inputList_, inputList_.begin() );
                        if ( (*result.begin()) <= (*inputList_.begin()) )
                        {
                                result.splice(result.end(), inputList_, inputList_.begin());
                        }
                        else
                        {
                                result.splice(result.begin(), inputList_, inputList_.begin());
                        }

                        std::promise<ListType> resultHolder;
                        resultHolder.set_value( std::move(result) );

                        return resultHolder.get_future();
                }
                else
                {
                        std::promise<ListType> resultHolder;
                        resultHolder.set_value( std::move(inputList_) );

                        return resultHolder.get_future();
                }

        }


   This function takes two arguments: the first requires the caller to move a std::list object into it (the '&&' is an rvalue reference, so the list is moved rather than copied, which saves the time of copying it), and the second is a reference to the TaskSystem to which the merge tasks are submitted. The return value is a std::future that will contain the sorted result.
    Inside the function, if the list has more than two elements, it splits the list in half and calls ParallelMergeSort() recursively until each list has at most two elements. When a list has exactly two elements, it sorts them directly and returns the result wrapped in a std::future. After the left and right recursive calls return, the function creates a MergeTask and submits it to the TaskSystem; the MergeTasks are then executed by the worker threads in parallel. Note that when I declare the ListType::iterator middlePoint, I add "typename" before the declaration since "iterator" depends on the template parameter "ListType" (see Effective C++ 3rd, Item 42).

   The MergeTask is implemented as follows (in the source file it has to be declared before ParallelMergeSort(), which refers to it):
        template<typename ListType>
        class MergeTask : public TaskInterface<ListType>
        {
                public:
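                        virtual void Run() override
                        {
                                try
                                {
                                        ListType result;

                                        // Wait until both halves have been sorted.
                                        ListType leftList = this->leftListHolder.get();
                                        ListType rightList = this->rightListHolder.get();

                                        // Move the smaller front node until one of the lists runs out.
                                        while( (not leftList.empty()) and (not rightList.empty()) )
                                        {
                                                if( *(leftList.begin()) <= *(rightList.begin()) )
                                                {
                                                        result.splice(result.end(), leftList, leftList.begin() );
                                                }
                                                else
                                                {
                                                        result.splice(result.end(), rightList, rightList.begin() );
                                                }
                                        }

                                        // Append whatever remains (at most one of the lists is non-empty).
                                        result.splice(result.end(), leftList);
                                        result.splice(result.end(), rightList);

                                        this->resultHolder.set_value( std::move(result) );
                                }
                                catch(...)
                                {
                                        // e.g. a child task was interrupted: forward the exception to
                                        // our own future instead of letting it escape the worker thread.
                                        this->resultHolder.set_exception( std::current_exception() );
                                }
                        }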

                        virtual future<ListType> GetResult() override
                        {
                                return this->resultHolder.get_future();
                        }

                        virtual void Interrupt() override
                        {
                                TaskInterruptException interruptException;
                                this->resultHolder.set_exception( std::make_exception_ptr(interruptException) );
                        }

                        // Constructors
                        MergeTask(future<ListType>&& leftListHolder_, future<ListType>&& rightListHolder_)
                                : resultHolder(),
                                  leftListHolder( std::move(leftListHolder_) ),
                                  rightListHolder( std::move(rightListHolder_) )
                        {
                        }
                        MergeTask(MergeTask&& other_)
                                : resultHolder( std::move(other_.resultHolder) ),
                                  leftListHolder( std::move(other_.leftListHolder) ),
                                  rightListHolder( std::move(other_.rightListHolder) )
                        {
                        }

                        MergeTask(const MergeTask&) = delete;

                        virtual ~MergeTask() {};

                private:
                        std::promise<ListType> resultHolder;
                        future<ListType> leftListHolder;
                        future<ListType> rightListHolder;
        }; // End of class

    In MergeTask, the whole merge job is defined in Run(). The task is initialized with two std::futures that represent the left-half list and the right-half list, respectively. When the task is about to merge, it synchronizes with the two half lists by calling std::future::get(). The result can be obtained by calling MergeTask::GetResult(); the caller gets a std::future that can be synchronized later.
    This class also uses std::promise, which can be imagined as a summoning circle between the caller thread and the calculation thread. Once the calculation thread finishes its calculation, it calls std::promise::set_value(), and the result is transported to the std::future (obtained by calling std::promise::get_future()) in the caller thread.

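   The overall procedure to run the parallel merge sort is shown below:
int main()
{
        typedef std::list<int> ListType;   // ParallelMergeSort() relies on std::list::splice()

        ListType listToBeSorted = {5, 3, 9, 1, 7, 2, 8};   // example input

        TaskSystem<ListType> taskSystem;
        taskSystem.Start();

        future<ListType> resultHolder = ParallelMergeSort( std::move(listToBeSorted), taskSystem );
        ListType result = resultHolder.get();   // waits for the final MergeTask

        return 0;
}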

Result

    With the above approach, calling ParallelMergeSort() does not deadlock: the recursion runs on the calling thread, and every MergeTask is submitted after the tasks that produce its two inputs, so (assuming ThreadSafeQueue is first-in-first-out) a worker waiting on a child's future is always waiting for a task that some worker has already taken. However, this ParallelMergeSort() is even slower than the serial version. This may be due to the nature of Divide-and-Conquer algorithms: a parent task (the task that spawns other tasks) has to wait for its child tasks to finish before it can start working. Moreover, the results of the child tasks have to be moved to the parent task, which also hurts performance (i.e. Cache-Ping-Pong).

Conclusion

    This article attempts to implement a TaskSystem under some limitations. In the second half of this article, I gave an example that uses the TaskSystem to perform a parallel merge sort. Although the TaskSystem does not seem to improve performance with this approach, it may perform better for algorithms that do not divide data recursively.
 
2016.06.01
Joshua P. R. Pan