Overview
This article implements a Task System in C++11. A TaskSystem creates its threads up front (when it is started) and processes tasks in parallel (see the following figure). Each task can be defined and customized by the user.
Motivation
After reading "C++ Concurrency In Action", I wanted some practice, and that is what this article is about: designing and implementing a Task System. I have also seen an article that discusses the difficulty of implementing Task-Based Parallelism in C++, so this article attempts to offer an alternative approach to a Task System in C++11.
Introduction
As Anthony Williams mentions in "C++ Concurrency In Action", there are several circumstances in which you might want to use multiple threads:
1. Each thread performs different work: for example, you might use one thread for the GUI and another for managing the underlying data, so that the GUI can respond to the user quickly. This is perhaps the best situation in which to use threads.
2. Each thread does the same job but processes different data: you might divide the data into several chunks and submit each chunk to a different thread. In this case, how you split the data and how many threads you create (usually as many as you have processors) strongly affect performance.
Task System
The second case is what this article aims for. In that case, you might want to create as many threads as you have processors at the beginning (since creating a thread takes time) and then submit jobs (tasks) to those threads. A thread processes your task and returns its result in some way. This is what I call the Task System here (it is called a thread pool in "C++ Concurrency In Action").
However, Bartosz Milewski has pointed out in his article that, since std::mutex and the thread_local keyword are Thread-Based rather than Task-Based, implementing such a Task System is potentially difficult.
In this article, I try to implement a simple Task System under two limitations:
1. A task does not migrate from one thread to another.
2. Once a task has been taken by a thread and is being processed, it is not interrupted or 'Context Switched' until the task ends.
Its usage looks as follows (all code in this article should be compiled as C++11):
TaskSystem<ResultType> taskSystem;
taskSystem.Start();
SomeTask task;  // SomeTask derives from TaskInterface<ResultType>
std::future<ResultType> resultHolder = task.GetResult();
taskSystem.Submit( std::make_shared<SomeTask>(std::move(task)) );
ResultType result = resultHolder.get();
Here, the TaskSystem is created on the first line, and calling TaskSystem::Start() creates the threads (which run the user-defined tasks once you submit them). Then I create a task (which must inherit from TaskInterface) and get a std::future from TaskInterface::GetResult(). Note that the std::future acts as a bridge between the current thread and the worker thread: you hold on to it, and it returns the result when you call std::future::get(). At that point, the current thread waits until the worker thread finishes the task. Also note that the task is wrapped in a std::shared_ptr before it is submitted so that the task is deleted properly.
The TaskInterface that every task should inherit from is shown below:
template<typename Returntype>
class TaskInterface
{
public:
    virtual void Run(void) = 0;
    virtual std::future<Returntype> GetResult(void) = 0;
    virtual void Interrupt(void) = 0;
    virtual ~TaskInterface() {}
};
The user defines what the task does by overriding TaskInterface::Run(). The return value is delivered through a std::future, which can be obtained by calling TaskInterface::GetResult(). If you want to stop all tasks that have been submitted to the TaskSystem, you can call TaskSystem::Stop(). In that case, TaskInterface::Interrupt() is called on every task that is still in the waiting queue of the TaskSystem. Finally, the destructor is declared virtual so that objects of derived classes are destroyed properly when they are deleted through a pointer to the base class.
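As a rough illustration of what a user-defined task might look like, here is a minimal sketch. SumTask is a hypothetical name, and the definition of TaskInterruptException is an assumption (the article uses that name later but does not show its definition). The interface is repeated only so that the sketch compiles on its own.

```cpp
#include <exception>
#include <future>
#include <numeric>
#include <stdexcept>
#include <utility>
#include <vector>

// Same interface as in the article, repeated so the sketch is self-contained.
template<typename ReturnType>
class TaskInterface
{
public:
    virtual void Run() = 0;
    virtual std::future<ReturnType> GetResult() = 0;
    virtual void Interrupt() = 0;
    virtual ~TaskInterface() {}
};

// Assumed definition: the article names this exception but does not show it.
struct TaskInterruptException : std::runtime_error
{
    TaskInterruptException() : std::runtime_error("task interrupted") {}
};

// Hypothetical task that sums a chunk of integers.
class SumTask : public TaskInterface<long>
{
public:
    explicit SumTask(std::vector<int> data_) : data(std::move(data_)), resultHolder() {}

    // Called by a worker thread: compute the sum and publish it.
    virtual void Run() override
    {
        resultHolder.set_value(std::accumulate(data.begin(), data.end(), 0L));
    }

    // May be called only once, because std::promise::get_future() is single-use.
    virtual std::future<long> GetResult() override
    {
        return resultHolder.get_future();
    }

    // Called instead of Run() when the task is dropped from the waiting queue.
    virtual void Interrupt() override
    {
        resultHolder.set_exception(std::make_exception_ptr(TaskInterruptException()));
    }

private:
    std::vector<int> data;
    std::promise<long> resultHolder;
};
```

With this design, a caller that holds the future of an interrupted task sees TaskInterruptException thrown from std::future::get() rather than waiting forever.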
Here is the implementation of TaskSystem:
template<typename TaskReturnType>
class TaskSystem final
{
    typedef std::shared_ptr< TaskInterface<TaskReturnType> > TaskPtr;
public:
    void Submit(const TaskPtr& taskPtr_);
    void Start(size_t numberForProcessors = std::thread::hardware_concurrency() );
    void Stop(void);
    TaskSystem() : taskQueue(), listOfThreads(), shouldStop(false) {}
    ~TaskSystem();
private:
    ThreadSafeQueue< TaskPtr > taskQueue;
    std::vector<std::thread> listOfThreads;
    std::atomic<bool> shouldStop;
    // private functions
    void WorkerThread(void);
};
// Implementation of TaskSystem
template<typename TaskReturnType>
void TaskSystem<TaskReturnType>::Submit(const TaskPtr& task_)
{
    this->taskQueue.Push( task_ );
}
template<typename TaskReturnType>
void TaskSystem<TaskReturnType>::Start(size_t numberForProcessors_)
{
    try
    {
        for(size_t i=0; i < numberForProcessors_; ++i)
        {
            this->listOfThreads.push_back(
                std::thread(&TaskSystem::WorkerThread, this)
            );
        }
    }
    catch(...)
    {
        // Tell the threads that did start to exit; the destructor joins them.
        this->shouldStop.store(true);
        throw;
    }
}
template<typename TaskReturnType>
void TaskSystem<TaskReturnType>::WorkerThread(void)
{
    while(not this->shouldStop.load())
    {
        try
        {
            // TryPop() throws QueueEmptyException when there is nothing to run.
            TaskPtr taskPtr = *(this->taskQueue.TryPop());
            taskPtr->Run();
        }
        catch(const QueueEmptyException&)
        {
            std::this_thread::yield();
        }
    }
}
template<typename TaskReturnType>
void TaskSystem<TaskReturnType>::Stop(void)
{
    this->shouldStop.store(true);
    // Interrupt every task that is still waiting in the queue.
    while(not this->taskQueue.IsEmpty() )
    {
        try
        {
            TaskPtr taskPtr = *(this->taskQueue.TryPop());
            taskPtr->Interrupt();
        }
        catch(...)
        {
            std::this_thread::yield();
        }
    }
}
template<typename TaskReturnType>
TaskSystem<TaskReturnType>::~TaskSystem()
{
    this->shouldStop.store(true);
    for(size_t i=0; i < this->listOfThreads.size(); ++i)
    {
        if ( this->listOfThreads[i].joinable() )
            this->listOfThreads[i].join();
    }
}
When TaskSystem::Start() is called, the TaskSystem spawns threads, by default as many as there are hardware processors. Each thread runs TaskSystem::WorkerThread(), which tries to pop a task from the ThreadSafeQueue (see "C++ Concurrency In Action") and runs that task.
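The ThreadSafeQueue itself is not shown in this article. As a minimal sketch of what it could look like, assuming only the three member functions used above (Push, TryPop and IsEmpty) and an exception type named QueueEmptyException, a mutex-protected std::queue is enough. This is an illustration inferred from the calls above, not the version from the book.

```cpp
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>

// Assumed exception type, named after the one caught in WorkerThread().
struct QueueEmptyException : std::runtime_error
{
    QueueEmptyException() : std::runtime_error("queue is empty") {}
};

// Minimal mutex-protected FIFO queue; member names mirror the calls above.
template<typename T>
class ThreadSafeQueue
{
public:
    void Push(T value_)
    {
        std::lock_guard<std::mutex> lock(guard);
        items.push(std::move(value_));
    }

    // Returns the oldest element, or throws QueueEmptyException if there is none.
    std::shared_ptr<T> TryPop()
    {
        std::lock_guard<std::mutex> lock(guard);
        if (items.empty())
        {
            throw QueueEmptyException();
        }
        std::shared_ptr<T> front = std::make_shared<T>(std::move(items.front()));
        items.pop();
        return front;
    }

    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(guard);
        return items.empty();
    }

private:
    mutable std::mutex guard;   // mutable so that IsEmpty() can lock it
    std::queue<T> items;
};
```

Because the queue is first-in, first-out, tasks are handed to the worker threads in the order in which they were submitted.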
Dividing data
There are two ways in which the data can be divided:
1. Dividing the data into chunks, each of which is processed by one thread. For example, suppose you have a list of data that needs to be summed and you have four processors. You can divide the data into four groups, define a task that contains one group of data, and override TaskInterface::Run() to sum that group. In this case, dividing the data is easy, and it is the ideal way to process data in parallel.
2. Dividing the data recursively: algorithms that use Divide-and-Conquer belong to this category. In this case, the TaskSystem (or thread pool) often gets into trouble because there is no Task-Based mutex and no ability to perform a 'Context Switch' between tasks. This is discussed in the next section.
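To make the first kind of division concrete, here is a minimal sketch that sums a vector by giving each thread one contiguous chunk. It uses plain std::thread objects rather than the TaskSystem so that it stays self-contained; ParallelSum and its parameters are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Sums data_ by giving each of chunkCount_ threads one contiguous chunk.
long ParallelSum(const std::vector<int>& data_, std::size_t chunkCount_)
{
    if (chunkCount_ == 0) chunkCount_ = 1;   // hardware_concurrency() may report 0
    std::vector<long> partialSums(chunkCount_, 0L);
    std::vector<std::thread> threads;
    const std::size_t chunkSize = (data_.size() + chunkCount_ - 1) / chunkCount_;

    for (std::size_t i = 0; i < chunkCount_; ++i)
    {
        const std::size_t first = std::min(i * chunkSize, data_.size());
        const std::size_t last = std::min(first + chunkSize, data_.size());
        // Each thread writes only to its own slot, so no lock is needed.
        threads.push_back(std::thread([&data_, &partialSums, i, first, last]()
        {
            partialSums[i] = std::accumulate(data_.begin() + first,
                                             data_.begin() + last, 0L);
        }));
    }
    for (std::size_t i = 0; i < threads.size(); ++i)
    {
        threads[i].join();
    }
    // Combine the per-chunk results on the calling thread.
    return std::accumulate(partialSums.begin(), partialSums.end(), 0L);
}
```

A call such as ParallelSum(data, std::thread::hardware_concurrency()) uses one chunk per processor. No chunk depends on another, which is why this kind of division avoids the problems of the recursive case.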
Example: Parallel Merge Sort
The pseudocode of parallel merge sort is shown as follows:
ListType ParallelMergeSort(ListType inputList)
{
    if (inputList.size() <= 1) return inputList;
    (leftHalfList, rightHalfList) = inputList.splitInHalf();
    leftHalfList = spawn ParallelMergeSort(leftHalfList);
    rightHalfList = spawn ParallelMergeSort(rightHalfList);
    sync;
    ListType result = Merge(leftHalfList, rightHalfList);
    return result;
}
ParallelMergeSort() first splits the input list, then spawns two threads to process the two halves, waits for their results, and finally merges the two halves with Merge().
However, this can deadlock if no thread is available to run the two spawned tasks: the parent task is waiting for the two child tasks, but the child tasks have no thread to run on. And since there is no 'Context Switch' for tasks, there is no way to suspend the parent task and run the two child tasks instead. The result is a deadlock.
In the following, I take another approach to parallel merge sort, using these strategies:
1. Use a single thread to do the recursive calls.
2. Submit the Merge part to the TaskSystem to run in parallel.
The code is shown below:
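The starvation described above can be reproduced in miniature. The sketch below is not the TaskSystem from this article: OneThreadPool and ParentStarvesChild are hypothetical names, and the pool has a single worker so that the effect is deterministic. The parent waits with a timeout instead of calling get(), purely so that the demonstration terminates.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical single-worker pool, just large enough to show the starvation.
class OneThreadPool
{
public:
    OneThreadPool() : done(false), worker(&OneThreadPool::Loop, this) {}
    ~OneThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            done = true;
        }
        wakeUp.notify_one();
        worker.join();   // the worker drains any remaining jobs before exiting
    }
    void Submit(std::function<void()> job_)
    {
        {
            std::lock_guard<std::mutex> lock(guard);
            jobs.push(std::move(job_));
        }
        wakeUp.notify_one();
    }
private:
    void Loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(guard);
                wakeUp.wait(lock, [this]() { return done || !jobs.empty(); });
                if (jobs.empty()) return;   // done, and nothing left to run
                job = std::move(jobs.front());
                jobs.pop();
            }
            job();   // runs to completion; there is no way to suspend it
        }
    }
    std::mutex guard;
    std::condition_variable wakeUp;
    std::queue<std::function<void()>> jobs;
    bool done;
    std::thread worker;   // declared last so the other members exist before it starts
};

// Returns true if the parent task timed out while waiting for its child.
bool ParentStarvesChild()
{
    std::promise<bool> parentResult;
    std::future<bool> parentDone = parentResult.get_future();
    OneThreadPool pool;
    pool.Submit([&pool, &parentResult]()
    {
        std::shared_ptr<std::promise<int>> childResult =
            std::make_shared<std::promise<int>>();
        std::future<int> childDone = childResult->get_future();
        pool.Submit([childResult]() { childResult->set_value(42); });
        // The only worker is busy running this parent, so the child cannot start.
        // A plain childDone.get() here would block forever.
        const bool timedOut = childDone.wait_for(std::chrono::milliseconds(100))
                              == std::future_status::timeout;
        parentResult.set_value(timedOut);
    });
    return parentDone.get();
}
```

With more worker threads the same situation arises as soon as every worker is occupied by a parent that is waiting for children still sitting in the queue.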
// Call with an rvalue (std::move) so that ListType deduces to the list type itself.
template<typename ListType>
std::future<ListType> ParallelMergeSort(ListType&& inputList_, TaskSystem<ListType>& taskSystem)
{
    size_t numberOfElements = inputList_.size();
    if (numberOfElements > 2)
    {
        // Split the list in half by relinking nodes (no copies).
        typename ListType::iterator middlePoint = inputList_.begin();
        std::advance(middlePoint, numberOfElements/2);
        ListType leftList;
        leftList.splice(leftList.begin(),
                        inputList_, inputList_.begin(), middlePoint);
        ListType rightList;
        rightList.splice(rightList.begin(),
                         inputList_, inputList_.begin(), inputList_.end());
        std::future<ListType> leftSortedResult = ParallelMergeSort( std::move(leftList), taskSystem);
        std::future<ListType> rightSortedResult = ParallelMergeSort( std::move(rightList), taskSystem);
        MergeTask<ListType> mergeTask( std::move(leftSortedResult), std::move(rightSortedResult) );
        std::future<ListType> finalResult = mergeTask.GetResult();
        taskSystem.Submit( std::make_shared< MergeTask<ListType> >(std::move(mergeTask)) );
        return finalResult;
    }
    else if (numberOfElements == 2)
    {
        // Two elements: put them in order directly.
        ListType result;
        result.splice( result.begin(), inputList_, inputList_.begin() );
        if ( (*result.begin()) <= (*inputList_.begin()) )
        {
            result.splice(result.end(), inputList_, inputList_.begin());
        }
        else
        {
            result.splice(result.begin(), inputList_, inputList_.begin());
        }
        std::promise<ListType> resultHolder;
        resultHolder.set_value( std::move(result) );
        return resultHolder.get_future();
    }
    else
    {
        // Zero or one element: already sorted.
        std::promise<ListType> resultHolder;
        resultHolder.set_value( std::move(inputList_) );
        return resultHolder.get_future();
    }
}
This function takes two arguments. The first requires the caller to move a std::list object into it (the '&&' enables Move semantics, which saves the cost of copying the object); the second is a reference to the TaskSystem to which the merge tasks are submitted. The return value is a std::future that holds the sorted result.
Inside the function, if the list has more than two elements, it splits the list in half and calls ParallelMergeSort() recursively until each list has two elements or fewer. When a list has exactly two elements, it sorts them directly and returns the result wrapped in a std::future. After the left and right recursive calls return, the function creates a MergeTask and submits it to the TaskSystem. The MergeTask then merges the two lists on a worker thread. Note that when I declare ListType::iterator middlePoint, I add "typename" before the declaration, since "iterator" depends on the template parameter "ListType" (see Effective C++ 3rd, Item 42).
The MergeTask is implemented as follows:
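The splitting step can be looked at in isolation. The following sketch assumes a std::list and a hypothetical helper named TakeFirstHalf. It shows both the splice-based split, which relinks list nodes instead of copying elements, and the "typename" keyword that the dependent iterator type needs.

```cpp
#include <iterator>
#include <list>

// Moves the first half of source_ into a new list; source_ keeps the second half.
// std::list::splice relinks nodes, so no element is copied.
template<typename T>
std::list<T> TakeFirstHalf(std::list<T>& source_)
{
    // "typename" is required because iterator depends on the template parameter T.
    typename std::list<T>::iterator middlePoint = source_.begin();
    std::advance(middlePoint, source_.size() / 2);
    std::list<T> firstHalf;
    firstHalf.splice(firstHalf.begin(), source_, source_.begin(), middlePoint);
    return firstHalf;
}
```

For a list with an odd number of elements, the extra element stays in the second half, which matches the numberOfElements/2 split used in ParallelMergeSort().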
template<typename ListType>
class MergeTask : public TaskInterface<ListType>
{
public:
    virtual void Run() override
    {
        ListType result;
        // Block until both halves have been sorted.
        ListType leftList = this->leftListHolder.get();
        ListType rightList = this->rightListHolder.get();
        // Move the smaller front element until one list runs out.
        while( (not leftList.empty()) && (not rightList.empty()) )
        {
            if( *(leftList.begin()) <= *(rightList.begin()) )
            {
                result.splice(result.end(), leftList, leftList.begin() );
            }
            else
            {
                result.splice(result.end(), rightList, rightList.begin() );
            }
        }
        // At most one of the two lists still holds elements; append them.
        result.splice(result.end(), leftList);
        result.splice(result.end(), rightList);
        this->resultHolder.set_value( std::move(result) );
    }
    virtual std::future<ListType> GetResult() override
    {
        return this->resultHolder.get_future();
    }
    virtual void Interrupt() override
    {
        TaskInterruptException interruptException;
        this->resultHolder.set_exception( std::make_exception_ptr(interruptException) );
    }
    // Constructors
    MergeTask(std::future<ListType>&& leftListHolder_, std::future<ListType>&& rightListHolder_)
        : resultHolder(),
          leftListHolder( std::move(leftListHolder_) ),
          rightListHolder( std::move(rightListHolder_) )
    {
    }
    MergeTask(MergeTask&& other_)
        : resultHolder( std::move(other_.resultHolder) ),
          leftListHolder( std::move(other_.leftListHolder) ),
          rightListHolder( std::move(other_.rightListHolder) )
    {
    }
    MergeTask(const MergeTask&) = delete;
    virtual ~MergeTask() {}
private:
    std::promise<ListType> resultHolder;
    std::future<ListType> leftListHolder;
    std::future<ListType> rightListHolder;
}; // End of class
In MergeTask, the whole merge job is defined in Run(). The task is initialized with two std::future objects that represent the left half list and the right half list, respectively. When the task is about to merge, it synchronizes on the two half lists by calling std::future::get(). The result of the calculation can be obtained by calling MergeTask::GetResult(), which gives the caller a std::future that can be synchronized on later.
This class also uses std::promise, which can be imagined as a channel between the caller thread and the calculation thread. Once the calculation thread finishes its calculation, it calls std::promise::set_value(), and the result is transported to the std::future (obtained by calling std::promise::get_future()) in the caller thread.
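The promise/future pairing can be seen on its own in a minimal sketch, independent of the TaskSystem. SquareOnWorkerThread is a hypothetical name: the promise travels to the calculation thread, while the matching future stays with the caller.

```cpp
#include <future>
#include <thread>
#include <utility>

// Computes value_ * value_ on a separate thread and hands the result back
// through a promise.
int SquareOnWorkerThread(int value_)
{
    std::promise<int> resultHolder;
    std::future<int> result = resultHolder.get_future();

    // The promise is moved into the worker; the future stays with the caller.
    std::thread worker([value_](std::promise<int> promise_)
    {
        promise_.set_value(value_ * value_);
    }, std::move(resultHolder));

    const int squared = result.get();   // blocks until set_value() has been called
    worker.join();
    return squared;
}
```

MergeTask follows the same pattern, except that the promise is a member of the task and set_value() is called at the end of Run().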
The overall procedure for running the parallel merge sort is shown below:
int main()
{
    // ListType is the list type being sorted (for example std::list<int>);
    // listToBeSorted is a ListType that has been filled elsewhere.
    TaskSystem<ListType> taskSystem;
    taskSystem.Start();
    std::future<ListType> resultHolder = ParallelMergeSort( std::move(listToBeSorted), taskSystem );
    ListType result = resultHolder.get();
    return 0;
}
Result
With the above approach, calling ParallelMergeSort() does not deadlock. However, this ParallelMergeSort() is even slower than the serial version. This may be due to the nature of Divide-and-Conquer algorithms: the parent task (the task that spawns other tasks) must wait for its child tasks to finish before it can start working. Moreover, the result of each child task must be moved to its parent task, which also hurts performance badly (i.e. Cache-Ping-Pong).
Conclusion
This article attempts to implement a TaskSystem under some limitations. In the second half of the article, I give an example that uses the TaskSystem to perform a parallel merge sort. Although the TaskSystem does not seem to improve performance with this approach, it may perform better with algorithms that do not divide the data recursively.
2016.06.01
Joshua P. R. Pan