void worker_thread()
{
    while(!done)
    {
        std::function<void()> task;  // users can't wait for the tasks, and the tasks can't return any values
        if(work_queue.try_pop(task))
        {
            task();
        }
        else
        {
            std::this_thread::yield();  // if there are no tasks on the queue, take a small break
        }
    }
}
public:
    thread_pool():
        done(false),joiner(threads)
    {
        unsigned const thread_count=std::thread::hardware_concurrency();
        try
        {
            for(unsigned i=0;i<thread_count;++i)
            {
                threads.push_back(
                    std::thread(&thread_pool::worker_thread,this));
            }
        }
        catch(...)
        {
            done=true;
            throw;
        }
    }
~thread_pool()
{
    done=true;  // set the done flag; the join_threads instance ensures that all the threads have completed before the pool is destroyed
}
Note that the order of declaration of the members is important: both the done flag and the work_queue must be declared before the threads vector, which must in turn be declared before the joiner. This ensures that the members are destroyed in the right order; you can't safely destroy the queue, for example, until all the threads have stopped.
Suitable scenario: the tasks are entirely independent, don't return any values, and don't perform any blocking operations. However, there is still a risk of deadlock if tasks depend on one another, and in simple cases like this you may actually be better off with std::async.
9.1.2 Waiting for tasks submitted to a thread pool
The goal here: allow you to wait for tasks to complete and then pass return values from the task to the waiting thread.
One more change is needed: because std::function<> requires that the stored function objects are copy-constructible, and std::packaged_task<> instances are not copyable, just movable, you can no longer use std::function<> for the queue entries. The solution: a custom type-erasure class with a function call operator that handles move-only types.
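A minimal sketch of such a move-only wrapper, along the lines of the function_wrapper used in the listings below (the exact details here are a sketch, not necessarily identical to the book's listing):

class function_wrapper
{
    struct impl_base
    {
        virtual void call()=0;
        virtual ~impl_base() {}
    };
    template<typename F>
    struct impl_type: impl_base  // holds the concrete callable; it only needs to be movable
    {
        F f;
        impl_type(F&& f_): f(std::move(f_)) {}
        void call() { f(); }
    };
    std::unique_ptr<impl_base> impl;  // type-erased storage
public:
    template<typename F>
    function_wrapper(F&& f):
        impl(new impl_type<F>(std::move(f)))
    {}
    function_wrapper() = default;
    function_wrapper(function_wrapper&& other):
        impl(std::move(other.impl))
    {}
    function_wrapper& operator=(function_wrapper&& other)
    {
        impl=std::move(other.impl);
        return *this;
    }
    function_wrapper(const function_wrapper&)=delete;
    function_wrapper& operator=(const function_wrapper&)=delete;
    void operator()() { impl->call(); }  // the function call operator the worker threads invoke
};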
class thread_pool
{
public:
    thread_safe_queue<function_wrapper> work_queue;  // the queue now stores function_wrapper objects rather than std::function<void()> objects; it must be a thread-safe queue because both submit() and the worker threads access it
    template<typename FunctionType>
    std::future<typename std::result_of<FunctionType()>::type>  // the return type is the result of invoking f, obtained via std::result_of
    submit(FunctionType f)  // the modified submit() returns a std::future<> for the task's result
    {
        typedef typename std::result_of<FunctionType()>::type result_type;
        std::packaged_task<result_type()> task(std::move(f));  // wrap the function f in a std::packaged_task<result_type()>
        std::future<result_type> res(task.get_future());
        work_queue.push(std::move(task));  // std::packaged_task<> isn't copyable, so move it onto the queue
        return res;
    }
    // rest as before
};
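A hypothetical usage sketch of this submit(): the returned std::future<> carries the task's result back to the caller.

thread_pool pool;
auto answer=pool.submit([]{ return 6*7; });  // the lambda runs on some pool thread
int const value=answer.get();                // blocks until the task has completed; value==42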
Listing 9.3 parallel_accumulate using a thread pool with waitable tasks
unsigned long const block_size=25;
unsigned long const num_blocks=(length+block_size-1)/block_size;  // work in terms of the number of blocks to use (num_blocks) rather than the number of threads
Iterator block_start=first;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
    Iterator block_end=block_start;
    std::advance(block_end,block_size);
    futures[i]=pool.submit([=]{  // bind the block's range into a zero-argument task for submit()
        return accumulate_block<Iterator,T>()(block_start,block_end);
    });
    block_start=block_end;
}
T last_result=accumulate_block<Iterator,T>()(block_start,last);
T result=init;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
    result+=futures[i].get();
}
result+=last_result;
return result;
}
Note the choice of block_size: in order to make the most of the scalability of your thread pool, you need to divide the work into the smallest blocks that are worth working on concurrently. When there are only a few threads in the pool, each thread will then process many blocks, but as the number of threads grows with the hardware, so does the number of blocks processed in parallel. The blocks still can't be too small, because there's an inherent overhead to submitting a task to a thread pool, having the worker thread run it, and passing the return value through a std::future<>, and for very small tasks that overhead isn't worth it.
Regarding exceptions: the thread pool takes care of exception safety too. Any exception thrown by a task gets propagated through the future returned from submit(), and if the function exits with an exception, the thread pool destructor abandons any not-yet-completed tasks and waits for the pool threads to finish.
9.1.3 Tasks that wait for other tasks
In scenarios like Quicksort, where one task must wait for another data chunk to be sorted, the limited number of pool threads can lead to deadlock: the threads might end up all waiting for tasks that haven't been scheduled because there are no free threads to run them.
The solution is to let the thread pool manage the dependency automatically: while a task is waiting for its chunks, it can process outstanding work from the queue itself instead of blocking:
Listing 9.4 An implementation of run_pending_task()
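The listing body isn't reproduced here; it's essentially the body of the worker_thread loop exposed as a public member function, so a waiting task can run queued work itself (a sketch):

void thread_pool::run_pending_task()
{
    function_wrapper task;
    if(work_queue.try_pop(task))
    {
        task();  // run someone else's task while waiting for our own chunks
    }
    else
    {
        std::this_thread::yield();
    }
}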
The next problem is contention: with a single shared queue, every submit() and every pop touches the same queue, and the resulting cache ping-pong hurts performance even with a lock-free queue. The approach to fixing this: use a separate work queue per thread. Each thread then posts new items to its own queue and takes work from the global work queue only if there's no work on its own individual queue.
Listing 9.6 A thread pool with thread-local work queues
typedef std::queue<function_wrapper> local_queue_type;  // notice that the local queue can be a plain std::queue<>
static thread_local std::unique_ptr<local_queue_type>
    local_work_queue;  // a unique_ptr because you don't want other threads that aren't part of your thread pool to have one
void worker_thread()
{
    local_work_queue.reset(new local_queue_type);
    while(!done)
    {
        run_pending_task();
    }
}
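submit() and run_pending_task() then check the thread-local queue first and fall back to the shared pool queue; a sketch, keeping the work_queue name from the earlier listings for the shared queue:

template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
    typedef typename std::result_of<FunctionType()>::type result_type;
    std::packaged_task<result_type()> task(std::move(f));
    std::future<result_type> res(task.get_future());
    if(local_work_queue)                        // on a pool thread: push to the thread-local queue
    {
        local_work_queue->push(std::move(task));
    }
    else                                        // on an outside thread: push to the shared pool queue
    {
        work_queue.push(std::move(task));
    }
    return res;
}
void run_pending_task()
{
    function_wrapper task;
    if(local_work_queue && !local_work_queue->empty())  // own queue first
    {
        task=std::move(local_work_queue->front());
        local_work_queue->pop();
        task();
    }
    else if(work_queue.try_pop(task))                   // otherwise the shared pool queue
    {
        task();
    }
    else
    {
        std::this_thread::yield();
    }
}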
The downside of this design: only the top-level chunk ever reaches the pool queue. With the Quicksort example, only the topmost chunk would make it to the pool queue, because the remaining chunks would end up on the local queue of the worker thread that processed that one, leaving the other worker threads with nothing to do. This defeats the purpose of using a thread pool.
9.1.5 Work stealing
In order to allow a thread with no work to do to take work from another thread with a full queue, the queue must be accessible to the thread doing the stealing from run_pending_task().
The more elaborate approach: it's possible to write a lock-free queue that allows the owner thread to push and pop at one end while other threads steal entries from the other end.
The simpler approach, used here, is a queue protected by a mutex: work stealing is hopefully a rare event, so there should be little contention on the mutex.
This means that this “queue” is a last-in-first-out stack from its own thread's point of view: the task pushed most recently is the first one popped. There are two reasons why LIFO is desirable for the owning thread:
Cache locality: this can help improve performance from a cache perspective, because the data related to that task is more likely to still be in the cache than the data related to a task pushed on the queue previously.
Also, it maps nicely to algorithms such as Quicksort: by processing the most recent item first, you ensure that the chunk needed for the current call to complete is processed before the chunks needed for the other branches, reducing the number of active tasks and the total stack usage.
try_steal() takes items from the opposite end of the queue to try_pop() in order to minimize contention.
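A sketch of such a mutex-protected queue: push() and try_pop() work on the front (giving the owning thread the LIFO behaviour just described), while try_steal() takes from the back:

class work_stealing_queue
{
private:
    typedef function_wrapper data_type;
    std::deque<data_type> the_queue;   // a plain std::deque protected by a mutex
    mutable std::mutex the_mutex;
public:
    work_stealing_queue() {}
    work_stealing_queue(const work_stealing_queue&)=delete;
    work_stealing_queue& operator=(const work_stealing_queue&)=delete;
    void push(data_type data)          // the owner thread pushes at the front
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        the_queue.push_front(std::move(data));
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        return the_queue.empty();
    }
    bool try_pop(data_type& res)       // the owner thread also pops from the front: LIFO for itself
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        res=std::move(the_queue.front());
        the_queue.pop_front();
        return true;
    }
    bool try_steal(data_type& res)     // other threads steal from the back to minimize contention
    {
        std::lock_guard<std::mutex> lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }
        res=std::move(the_queue.back());
        the_queue.pop_back();
        return true;
    }
};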
static thread_local work_stealing_queue* local_work_queue;
static thread_local unsigned my_index;
void worker_thread(unsigned my_index_)
{
    my_index=my_index_;
    local_work_queue=queues[my_index].get();
    while(!done)
    {
        run_pending_task();  // first tries to take a task from its own thread's queue
    }
}
bool pop_task_from_other_thread_queue(task_type& task)
{
    for(unsigned i=0;i<queues.size();++i)
    {
        // to avoid every thread trying to steal from the first thread in the list,
        // each thread starts at the next thread in the list, by offsetting the index of the queue to check by its own index
        unsigned const index=(my_index+i+1)%queues.size();
        if(queues[index]->try_steal(task))
        {
            return true;
        }
    }
    return false;
}
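run_pending_task() then tries the three sources in order; pop_task_from_local_queue() and pop_task_from_pool_queue() are assumed to be thin wrappers over the local queue's try_pop() and the shared pool queue's try_pop() (a sketch):

void run_pending_task()
{
    task_type task;  // task_type is the function_wrapper typedef
    if(pop_task_from_local_queue(task) ||       // own queue first
       pop_task_from_pool_queue(task) ||        // then the shared pool queue
       pop_task_from_other_thread_queue(task))  // finally try to steal from another thread
    {
        task();
    }
    else
    {
        std::this_thread::yield();
    }
}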
There are still ways this could be improved, for example dynamically resizing the thread pool to ensure optimal CPU usage even when threads are blocked waiting for something such as I/O or a mutex lock.
class interruptible_thread
{
    std::thread internal_thread;
    interrupt_flag* flag;  // note the resource management here: this points to the new thread's thread-local interrupt flag
public:
    template<typename FunctionType>
    interruptible_thread(FunctionType f)
    {
        std::promise<interrupt_flag*> p;
        internal_thread=std::thread([f,&p]{
            p.set_value(&this_thread_interrupt_flag);
            f();
        });
        flag=p.get_future().get();
    }
    void interrupt()
    {
        if(flag)
        {
            flag->set();  // if you have a valid pointer to an interrupt flag, you have a thread to interrupt, so you can set the flag
        }
    }
};
9.2.2 Detecting that a thread has been interrupted
You can call the interruption_point() function at a point where it's safe to be interrupted; it throws a thread_interrupted exception if the flag is set:
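A sketch, assuming interrupt_flag exposes an is_set() query and thread_interrupted is an ordinary exception type:

void interruption_point()
{
    if(this_thread_interrupt_flag.is_set())
    {
        throw thread_interrupted();  // unwinds out of the interruptible code at this safe point
    }
}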
But the best places to interrupt are during a blocking wait: some of the best opportunities for interrupting a thread are where it's blocked waiting for something, because at that point the thread isn't running and so can't call interruption_point().
9.2.3 Interrupting a condition variable wait
Build a new function specifically for interruptible waiting: interruptible_wait(), which you can then overload for the various things you might want to wait for, and for each overload work out how to interrupt the waiting.
The interrupt_flag structure would need to be able to store a pointer to a condition variable so that it can be notified in a call to set().
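A sketch of what that interrupt_flag might look like (the set_clear_mutex protecting the pointer matches the fuller version shown later in Listing 9.12):

class interrupt_flag
{
    std::atomic<bool> flag;
    std::condition_variable* thread_cond;  // the condition variable this thread is currently waiting on, if any
    std::mutex set_clear_mutex;            // protects thread_cond against concurrent set()/clear calls
public:
    interrupt_flag(): flag(false),thread_cond(0) {}
    void set()
    {
        flag.store(true,std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if(thread_cond)
        {
            thread_cond->notify_all();     // wake the waiting thread so it can notice the interruption
        }
    }
    bool is_set() const
    {
        return flag.load(std::memory_order_relaxed);
    }
    void set_condition_variable(std::condition_variable& cv)
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond=&cv;
    }
    void clear_condition_variable()
    {
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        thread_cond=0;
    }
};
thread_local interrupt_flag this_thread_interrupt_flag;  // one flag per thread, as used by interruptible_thread above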
Listing 9.10 A broken version of interruptible_wait for std::condition_variable
void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);  // associates the condition variable with the interrupt_flag for the current thread
    cv.wait(lk);                                            // waits on the condition variable
    this_thread_interrupt_flag.clear_condition_variable();  // clears the association with the condition variable
    interruption_point();
}
This code is broken: there are two problems with it.
First, std::condition_variable::wait() can throw an exception, so you might exit the function without removing the association of the interrupt flag with the condition variable. The fix is a guard structure that removes the association in its destructor.
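A minimal sketch of such a guard:

struct clear_cv_on_destruct
{
    ~clear_cv_on_destruct()
    {
        this_thread_interrupt_flag.clear_condition_variable();  // runs even if wait() exits via an exception
    }
};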
Second, there's a race condition: the interruption (and its notify) can arrive before the thread is actually waiting. If the thread is interrupted after the initial call to interruption_point(), but before the call to wait(), then it doesn't matter whether the condition variable has been associated with the interrupt flag, because the thread isn't waiting and so can't be woken by a notify on the condition variable. You need to ensure that the thread can't be notified between the last check for interruption and the call to wait(). There are two candidate approaches:
The first is to use the mutex held by lk to protect the check as well, which requires passing it in on the call to set_condition_variable(). Unfortunately, this means passing a reference to a mutex whose lifetime you don't know to another thread (the thread doing the interrupting) for that thread to lock (in the call to interrupt()), without knowing whether that thread has locked the mutex already when it makes the call. This has the potential for deadlock and the potential to access a mutex after it has already been destroyed, so it's a nonstarter.
The second, used here: replace wait() with wait_for() and a short timeout, so an interrupted thread has to wait at most that long before it sees the interruption (subject to the tick granularity of the clock).
Listing 9.11 Using a timeout in interruptible_wait for std::condition_variable
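A sketch of that timeout-based version, using the clear_cv_on_destruct guard from above:

void interruptible_wait(std::condition_variable& cv,
                        std::unique_lock<std::mutex>& lk)
{
    interruption_point();
    this_thread_interrupt_flag.set_condition_variable(cv);
    clear_cv_on_destruct guard;                    // clears the association even if wait_for() throws
    interruption_point();
    cv.wait_for(lk,std::chrono::milliseconds(1));  // wake at least once per millisecond to re-check the flag
    interruption_point();
}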
9.2.4 Interrupting a wait on std::condition_variable_any
Whereas std::condition_variable works only with std::unique_lock<std::mutex>, std::condition_variable_any can operate on any lock type that meets the BasicLockable requirements, which turns out to make this case easier to handle.
Listing 9.12 interruptible_wait for std::condition_variable_any
public:
    interrupt_flag():
        thread_cond(0),thread_cond_any(0)
    {}
    void set()
    {
        flag.store(true,std::memory_order_relaxed);
        std::lock_guard<std::mutex> lk(set_clear_mutex);
        if(thread_cond)
        {
            thread_cond->notify_all();
        }
        else if(thread_cond_any)
        {
            thread_cond_any->notify_all();
        }
    }
    template<typename Lockable>
    void wait(std::condition_variable_any& cv,Lockable& lk)
    {
        struct custom_lock
        {
            interrupt_flag* self;
            Lockable& lk;
            custom_lock(interrupt_flag* self_,
                        std::condition_variable_any& cond,
                        Lockable& lk_):
                self(self_),lk(lk_)
            {
                self->set_clear_mutex.lock();
                self->thread_cond_any=&cond;
            }
            void unlock()
            {
                lk.unlock();
                self->set_clear_mutex.unlock();
                // this allows threads that are trying to interrupt you to acquire the lock on set_clear_mutex and
                // check the thread_cond_any pointer once you're inside the wait() call but not before;
                // this is exactly what you were after (but couldn't manage) with std::condition_variable
            }
            void lock()
            {
                std::lock(self->set_clear_mutex,lk);
            }
            ~custom_lock()
            {
                self->thread_cond_any=0;
                self->set_clear_mutex.unlock();
            }
        };
        custom_lock cl(this,cv,lk);
        interruption_point();
        cv.wait(cl);
        interruption_point();
    }
    // rest as before
};
template<typename Lockable>
void interruptible_wait(std::condition_variable_any& cv,Lockable& lk)
{
    this_thread_interrupt_flag.wait(cv,lk);
}
9.2.5 Interrupting other blocking calls
What about other blocking waits: mutex locks, waiting for futures, and the like? In general there's no way to interrupt such a wait, short of fulfilling the condition being waited for, without access to the internals of the mutex or future. The fallback is to poll with a short timeout, as with the condition variables.
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
    while(!this_thread_interrupt_flag.is_set())  // loop within the interruptible_wait() function
    {
        if(uf.wait_for(std::chrono::milliseconds(1))==std::future_status::ready)  // std::future::wait_for takes only a duration
            break;
    }
    interruption_point();
}
9.2.6 Handling interruptions
Because thread_interrupted is an exception, all the usual exception-safety precautions must also be taken when calling code that can be interrupted, in order to ensure that resources aren’t leaked, and your data structures are left in a coherent state.
You can't let the thread_interrupted exception escape the thread: if you let exceptions propagate out of the thread function passed to the std::thread constructor, std::terminate() will be called, and the whole program will be terminated. Rather than requiring every user of interruptible_thread to remember a catch block, the solution is to put that catch block inside the wrapper you use for initializing the interrupt_flag.
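Concretely, the lambda in the interruptible_thread constructor shown earlier would gain that catch block (a sketch):

internal_thread=std::thread([f,&p]{
    p.set_value(&this_thread_interrupt_flag);
    try
    {
        f();
    }
    catch(thread_interrupted const&)  // swallow the interruption so it can't escape and trigger std::terminate()
    {}
});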
std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id)  // the work done by each background thread
{
    while(true)
    {
        interruption_point();
        fs_change fsc=get_fs_changes(disk_id);
        if(fsc.has_changes())
        {
            update_index(fsc);
        }
    }
}
void start_background_processing()
{
    background_threads.push_back(
        interruptible_thread(background_thread,disk_1));
    background_threads.push_back(
        interruptible_thread(background_thread,disk_2));
}
int main()
{
    start_background_processing();
    process_gui_until_exit();  // when the GUI exits, shut down all the background threads
    std::unique_lock<std::mutex> lk(config_mutex);
    for(unsigned i=0;i<background_threads.size();++i)
    {
        background_threads[i].interrupt();  // interrupt all the threads first...
    }
    for(unsigned i=0;i<background_threads.size();++i)
    {
        background_threads[i].join();  // ...then wait for each of them to finish
    }
}
Why do you interrupt all the threads before waiting for any? Why not interrupt each and then wait for it before moving on to the next?
The answer is concurrency. Threads will likely not finish immediately when they're interrupted, because they have to proceed to the next interruption point and then run any destructor calls and exception-handling code necessary before they exit. If you joined with each thread immediately after interrupting it, you would cause the interrupting thread to wait, even though it still has useful work it could do: interrupting the other threads. Only when you have no more work to do (all the threads have been interrupted) do you wait.