C++ Concurrency in Action, Chapter 9: study notes, part 7

[TOC]
Study notes, part 7, for C++ Concurrency in Action, covering Chapter 9, Advanced thread management.
Topics covered:

  • Thread pools
  • Handling dependencies between pool tasks
  • Work stealing for pool threads
  • Interrupting threads

Chapter 9 Advanced thread management

9.1 Thread pools

It’s impractical to have a separate thread for every task that can potentially be done in parallel with other tasks.

9.1.1 The simplest possible thread pool

At its simplest, a thread pool is a fixed number of worker threads that process work.

Listing 9.1 Simple thread pool

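#include <thread>
#include <vector>
#include <atomic>
#include <functional>
// thread_safe_queue<T> is the lock-based queue from an earlier chapter (not shown here).
class join_threads // joins every thread in the vector on destruction
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::thread>& threads_): threads(threads_) {}
~join_threads()
{
for(auto& t: threads)
if(t.joinable()) t.join();
}
};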

class thread_pool
{
std::atomic_bool done;
thread_safe_queue<std::function<void()> > work_queue;
std::vector<std::thread> threads;
join_threads joiner;

void worker_thread()
{
while(!done)
{
std::function<void()> task;//users can’t wait for the tasks, and they can’t return any values
if(work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield(); // If there are no tasks on the queue, the function calls std::this_thread::yield() to take a small break
}
}
}
public:
thread_pool():
done(false),joiner(threads)
{
unsigned const thread_count=std::thread::hardware_concurrency();
try
{
for(unsigned i=0;i<thread_count;++i)
{
threads.push_back(
std::thread(&thread_pool::worker_thread,this));
}
}
catch(...)
{
done=true;
throw;
}
}

~thread_pool()
{
done=true; // set the done flag, and the join_threads instance will ensure that all the threads have completed before the pool is destroyed.
}

template<typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f));
}
};

Note that the order of declaration of the members is important: both the done flag and the work_queue must be declared before the threads vector, which must in turn be declared before the joiner. This ensures that the members are destroyed in the right order: members are destroyed in reverse order of declaration, so the joiner joins the threads before the queue and the flag they use are destroyed.
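
The destruction-order rule can be checked with a small sketch. The types noisy and pool_like below are hypothetical stand-ins (they are not part of the book's code); pool_like only mirrors the member order of Listing 9.1 and records which member is destroyed first.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Records its name in a shared log when destroyed.
struct noisy
{
    std::string name;
    std::vector<std::string>& log;
    noisy(std::string name_, std::vector<std::string>& log_)
        : name(std::move(name_)), log(log_) {}
    ~noisy() { log.push_back(name); }
};

// Hypothetical stand-in for thread_pool: same member order as Listing 9.1.
struct pool_like
{
    noisy done;
    noisy work_queue;
    noisy threads;
    noisy joiner;
    explicit pool_like(std::vector<std::string>& log)
        : done("done", log), work_queue("work_queue", log),
          threads("threads", log), joiner("joiner", log) {}
};

// Returns the names of the members in the order they were destroyed.
std::vector<std::string> destruction_order()
{
    std::vector<std::string> log;
    {
        pool_like p(log);
    } // p is destroyed here: members go in reverse order of declaration
    return log;
}

// Small self-check: joiner goes first, done goes last.
void check_destruction_order()
{
    std::vector<std::string> order = destruction_order();
    assert(order.front() == "joiner");
    assert(order.back() == "done");
}
```

Because joiner is destroyed first, its destructor can join the worker threads while the threads vector, the queue, and the done flag are all still alive.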

Suitable scenario: tasks that are entirely independent and don’t return any values or perform any blocking operations. Even so, there is a risk of deadlock, and in some cases you would be better served by std::async.

9.1.2 Waiting for tasks submitted to a thread pool

The goal here is to let callers wait for a task to complete and to pass the task's return value back to the waiting thread.

One more thing has to change: std::function<> requires the stored function objects to be copy-constructible, but std::packaged_task<> instances are movable, not copyable, so std::function<> can no longer be used for the queue entries. The fix is a custom type-erasure class with a function call operator.
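
As a minimal sketch of that idea, the class below erases the type of a move-only callable behind a virtual call. The name move_only_task is an assumption made for this example, and the class is a simplified variant of the idea, not the book's function_wrapper (it takes the callable by value to keep the constructor simple).

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <utility>

// Move-only, type-erased wrapper for any callable invocable as f().
class move_only_task
{
    struct impl_base
    {
        virtual void call() = 0;
        virtual ~impl_base() {}
    };
    template<typename F>
    struct impl_type : impl_base
    {
        F f;
        explicit impl_type(F f_) : f(std::move(f_)) {}
        void call() override { f(); }
    };
    std::unique_ptr<impl_base> impl;
public:
    move_only_task() = default;
    template<typename F>
    move_only_task(F f) : impl(new impl_type<F>(std::move(f))) {}
    move_only_task(move_only_task&&) = default;
    move_only_task& operator=(move_only_task&&) = default;
    void operator()() { impl->call(); }
};

// Wraps a std::packaged_task (movable, not copyable) and runs it through the wrapper.
int run_packaged_task_through_wrapper()
{
    std::packaged_task<int()> task([] { return 6 * 7; });
    std::future<int> result = task.get_future();
    move_only_task wrapped(std::move(task)); // moved in, never copied
    wrapped();                               // runs the task and fulfils the future
    return result.get();
}

void check_wrapper()
{
    assert(run_packaged_task_through_wrapper() == 42);
}
```

The wrapper only ever moves the stored callable, which is exactly the property a queue of std::packaged_task objects needs.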

Listing 9.2 A thread pool with waitable tasks

#include <deque>
#include <future>
#include <memory>
#include <functional>
#include <type_traits>
#include <utility>

class function_wrapper
{
struct impl_base {
virtual void call()=0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template<typename F>
struct impl_type: impl_base
{
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template<typename F>
function_wrapper(F&& f):
impl(new impl_type<F>(std::move(f)))
{}

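void operator()() { impl->call(); } // lets the pool invoke a queued task as task()

function_wrapper() = default; // needed so a worker can declare an empty task before try_pop()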

function_wrapper(function_wrapper&& other):
impl(std::move(other.impl))
{}

function_wrapper& operator=(function_wrapper&& other)
{
impl=std::move(other.impl);
return *this;
}

function_wrapper(const function_wrapper&)=delete;
function_wrapper(function_wrapper&)=delete;
function_wrapper& operator=(const function_wrapper&)=delete;
};

class thread_pool
{
thread_safe_queue<function_wrapper> work_queue; //The queue now stores function_wrapper objects rather than std::function<void()> objects
public:
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> //return type of the supplied function f ==> std::result_of (removed in C++20; std::invoke_result replaces it)
submit(FunctionType f)// the modified submit() function returns a std::future<>
{
typedef typename std::result_of<FunctionType()>::type result_type;

std::packaged_task<result_type()> task(std::move(f));//wrap the function f in a std::packaged_task<result_type()>
std::future<result_type> res(task.get_future());
work_queue.push(std::move(task)); // std::packaged_task<> isn’t copyable, so it is moved into the queue
return res;
}
// rest as before
};

Listing 9.3 parallel_accumulate using a thread pool with waitable tasks

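#include <vector>
#include <future>
#include <thread>
#include <iterator>
#include <numeric>

// accumulate_block is the helper used by the book's earlier parallel_accumulate listings.
template<typename Iterator,typename T>
struct accumulate_block
{
T operator()(Iterator first,Iterator last)
{
return std::accumulate(first,last,T());
}
};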

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);

if(!length)
return init;

unsigned long const block_size=25;
//working in terms of the number of blocks to use (num_blocks) rather than the number of threads.
unsigned long const num_blocks=(length+block_size-1)/block_size;

std::vector<std::future<T> > futures(num_blocks-1);
thread_pool pool;

Iterator block_start=first;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
futures[i]=pool.submit([=]{ // each task captures its own block range by value
return accumulate_block<Iterator,T>()(block_start,block_end);
});
block_start=block_end;
}
T last_result=accumulate_block<Iterator,T>()(block_start,last);
T result=init;
for(unsigned long i=0;i<(num_blocks-1);++i)
{
result+=futures[i].get();
}
result += last_result;
return result;
}

A note on choosing block_size: in order to make the most use of the scalability of your thread pool, you need to divide the work into the smallest blocks that it’s worth working with concurrently. When there are only a few threads in the pool, each thread will simply process many blocks.
The reason not to go smaller still: there is an inherent overhead to submitting a task to a thread pool, having the worker thread run it, and passing the return value through a std::future<>, and for small tasks it’s not worth the payoff.
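
The block arithmetic used in Listing 9.3 is a rounded-up division. The helper name block_count below is an assumption for illustration; the formula itself is the one from the listing.

```cpp
#include <cassert>

// Number of blocks needed to cover `length` elements, rounding up so that a
// short final block still counts as a block.
unsigned long block_count(unsigned long length, unsigned long block_size)
{
    return (length + block_size - 1) / block_size;
}

void check_block_count()
{
    assert(block_count(100, 25) == 4); // exact multiple: four full blocks
    assert(block_count(101, 25) == 5); // one extra element needs a fifth block
}
```

With 101 elements and a block size of 25, four blocks go to the pool and the calling thread handles the final, shorter block itself.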

On exceptions: the thread pool takes care of exception safety too. Any exception thrown by the task gets propagated through the future returned from submit(), and if the function exits with an exception, the thread pool destructor abandons any not-yet-completed tasks and waits for the pool threads to finish.
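
The propagation through the future comes from std::packaged_task itself and can be seen without any pool. The sketch below runs the task on the calling thread for simplicity; the function name is hypothetical.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>

// Runs a throwing task through std::packaged_task and returns the message of
// the exception seen by the code that calls get() on the future.
std::string exception_seen_by_waiter()
{
    std::packaged_task<int()> task([]() -> int {
        throw std::runtime_error("task failed");
    });
    std::future<int> result = task.get_future();
    task(); // the exception is stored in the shared state, not thrown here
    try
    {
        result.get(); // rethrows the stored exception
    }
    catch (std::runtime_error const& e)
    {
        return e.what();
    }
    return "no exception";
}

void check_exception_propagation()
{
    assert(exception_seen_by_waiter() == "task failed");
}
```

In a pool, the call to task() happens on a worker thread, but the waiter observes the exception in the same way when it calls get().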

9.1.3 Tasks that wait for other tasks

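In algorithms such as Quicksort, a task has to wait for other tasks to finish sorting its data chunks. With a limited number of threads this dependency can cause deadlock: the threads might end up all waiting for tasks that haven’t been scheduled because there are no free threads.

The fix is to let the thread pool handle the dependency: while a thread is waiting, it processes outstanding tasks itself through run_pending_task():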

Listing 9.4 An implementation of run_pending_task()

void thread_pool::run_pending_task()
{
function_wrapper task;
if(work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}

Listing 9.5 A thread-pool–based implementation of Quicksort

#include <list>
#include <algorithm>
#include <vector>
#include <chrono>
#include <functional>
#include <future>

template<typename T>
struct sorter
{
thread_pool pool;

std::list<T> do_sort(std::list<T>& chunk_data)
{
if(chunk_data.empty())
{
return chunk_data;
}

std::list<T> result;
result.splice(result.begin(),chunk_data,chunk_data.begin());
T const& partition_val=*result.begin();

typename std::list<T>::iterator divide_point=
std::partition(
chunk_data.begin(),chunk_data.end(),
[&](T const& val){return val<partition_val;});

std::list<T> new_lower_chunk;
new_lower_chunk.splice(
new_lower_chunk.end(),
chunk_data,chunk_data.begin(),
divide_point);

std::future<std::list<T> > new_lower=
pool.submit(
std::bind(
&sorter::do_sort,this,
std::move(new_lower_chunk)));

std::list<T> new_higher(do_sort(chunk_data));

result.splice(result.end(),new_higher);
// std::future has no is_ready(); poll with a zero timeout instead
while(new_lower.wait_for(std::chrono::seconds(0))==std::future_status::timeout)
{
pool.run_pending_task();
}

result.splice(result.begin(),new_lower.get());
return result;
}
};


template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
sorter<T> s;

return s.do_sort(input);
}
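
Polling a future without blocking, as the sorter does between calls to run_pending_task(), can be sketched with a zero timeout. The helper name is_ready is an assumption made here; std::future has no member of that name.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// True if the result is already available; never blocks.
template<typename T>
bool is_ready(std::future<T> const& f)
{
    return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Checks readiness before and after the promise is fulfilled.
bool demo_is_ready()
{
    std::promise<int> p;
    std::future<int> f = p.get_future();
    bool before = is_ready(f); // nothing stored yet
    p.set_value(1);
    bool after = is_ready(f);  // value is now available
    return !before && after;
}

void check_is_ready()
{
    assert(demo_is_ready());
}
```

Note that a future from std::async with the deferred policy reports std::future_status::deferred, so this helper would keep returning false for it.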

9.1.4 Avoiding contention on the work queue

Every submit() and every pop modifies the same queue, which causes contention. Even a lock-free queue can still suffer from cache ping-pong.

The fix is to use a separate work queue per thread. Each thread then posts new items to its own queue and takes work from the global work queue only if there’s no work on its own individual queue.

Listing 9.6 A thread pool with thread-local work queues

class thread_pool
{
thread_safe_queue<function_wrapper> pool_work_queue;

typedef std::queue<function_wrapper> local_queue_type;//notice that the local queue can be a plain std::queue<>
static thread_local std::unique_ptr<local_queue_type> //don’t want other threads that aren't part of your thread pool to have one;
local_work_queue;

void worker_thread()
{
local_work_queue.reset(new local_queue_type);

while(!done)
{
run_pending_task();
}
}

public:
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type>
submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;

std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if(local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}

void run_pending_task()
{
function_wrapper task;
if(local_work_queue && !local_work_queue->empty())
{
task=std::move(local_work_queue->front());
local_work_queue->pop();
task();
}
else if(pool_work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
// rest as before
};

The weakness of this design is that work can end up unevenly distributed: with the Quicksort example, only the topmost chunk would make it to the pool queue, because the remaining chunks would end up on the local queue of the worker thread that processed that one. This defeats the purpose of using a thread pool.

9.1.5 Work stealing

In order to allow a thread with no work to do to take work from another thread with a full queue, the queue must be accessible to the thread doing the stealing from run_pending_task().

The more complex option: it’s possible to write a lock-free queue that allows the owner thread to push and pop at one end while other threads can steal entries from the other.

The simple option uses a mutex: we hope work stealing is a rare event, so there should be little contention on the mutex.

Listing 9.7 Lock-based queue for work stealing

#include <deque>
#include <mutex>
#include <memory>
class function_wrapper
{
struct impl_base {
virtual void call()=0;
virtual ~impl_base() {}
};
std::unique_ptr<impl_base> impl;
template<typename F>
struct impl_type: impl_base
{
F f;
impl_type(F&& f_): f(std::move(f_)) {}
void call() { f(); }
};
public:
template<typename F>
function_wrapper(F&& f):
impl(new impl_type<F>(std::move(f)))
{}

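void operator()() { impl->call(); } // lets the pool invoke a queued task as task()

function_wrapper() = default; // needed so a worker can declare an empty task before try_pop()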

function_wrapper(function_wrapper&& other):
impl(std::move(other.impl))
{}

function_wrapper& operator=(function_wrapper&& other)
{
impl=std::move(other.impl);
return *this;
}

function_wrapper(const function_wrapper&)=delete;
function_wrapper(function_wrapper&)=delete;
function_wrapper& operator=(const function_wrapper&)=delete;
};

class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type> the_queue;
mutable std::mutex the_mutex;

public:
work_stealing_queue()
{}

work_stealing_queue(const work_stealing_queue& other)=delete;
work_stealing_queue& operator=(
const work_stealing_queue& other)=delete;

void push(data_type data)
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}

bool empty() const
{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}

bool try_pop(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}

res=std::move(the_queue.front());
the_queue.pop_front();
return true;
}

bool try_steal(data_type& res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if(the_queue.empty())
{
return false;
}

res=std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};

This means that this “queue” is a last-in-first-out stack for its own thread. The reasons for LIFO order on the owning thread:

  • Locality: this can help improve performance from a cache perspective, because the data related to that task is more likely to still be in the cache than the data related to a task pushed on the queue previously.
  • It also maps nicely to algorithms such as Quicksort. By processing the most recent item first, you ensure that the chunk needed for the current call to complete is processed before the chunks needed for the other branches, reducing the number of active tasks and the total stack usage.

try_steal() takes items from the opposite end of the queue to try_pop() in order to minimize contention.
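
The two access patterns can be traced on a plain std::deque. This single-threaded sketch shows only the ordering (no mutex, no real threads); the names pop_order and demo_ends are assumptions for illustration.

```cpp
#include <cassert>
#include <deque>
#include <vector>

struct pop_order
{
    std::vector<int> owner; // what the owning thread would run, in order
    std::vector<int> thief; // what a stealing thread would run, in order
};

// Pushes tasks 1..4 at the front, then alternates owner pops and steals.
pop_order demo_ends()
{
    std::deque<int> q;
    for (int task = 1; task <= 4; ++task)
        q.push_front(task); // like push(): newest task sits at the front

    pop_order out;
    out.owner.push_back(q.front()); q.pop_front(); // like try_pop(): newest first
    out.thief.push_back(q.back());  q.pop_back();  // like try_steal(): oldest first
    out.owner.push_back(q.front()); q.pop_front();
    out.thief.push_back(q.back());  q.pop_back();
    return out;
}

void check_demo_ends()
{
    pop_order r = demo_ends();
    assert((r.owner == std::vector<int>{4, 3}));
    assert((r.thief == std::vector<int>{1, 2}));
}
```

The owner works through the most recently pushed tasks while a thief takes the oldest ones, so the two only meet when the queue is nearly empty.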

Listing 9.8 A thread pool that uses work stealing

#include <vector>
#include <atomic>
#include <thread>
#include <memory>
#include <future>

class thread_pool
{
typedef function_wrapper task_type;

std::atomic_bool done;
thread_safe_queue<task_type> pool_work_queue;
std::vector<std::unique_ptr<work_stealing_queue> > queues;
std::vector<std::thread> threads;
join_threads joiner;

static thread_local work_stealing_queue* local_work_queue;
static thread_local unsigned my_index;

void worker_thread(unsigned my_index_)
{
my_index=my_index_;
local_work_queue=queues[my_index].get();
while(!done)
{
run_pending_task();//try to take a task from its thread’s own queue
}
}

bool pop_task_from_local_queue(task_type& task)
{
return local_work_queue && local_work_queue->try_pop(task);
}

bool pop_task_from_pool_queue(task_type& task)
{
return pool_work_queue.try_pop(task);
}

bool pop_task_from_other_thread_queue(task_type& task)
{
for(unsigned i=0;i<queues.size();++i)
{
//In order to avoid every thread trying to steal from the first thread in the list,
//each thread starts at the next thread in the list by offsetting the index of the queue to check by its own index .
unsigned const index=(my_index+i+1)%queues.size();
if(queues[index]->try_steal(task))
{
return true;
}
}

return false;
}

public:
thread_pool():
done(false),joiner(threads) // listed in declaration order, which is the order members are initialized in
{
unsigned const thread_count=std::thread::hardware_concurrency();

try
{
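for(unsigned i=0;i<thread_count;++i)
{
queues.push_back(std::unique_ptr<work_stealing_queue>(
new work_stealing_queue));
}
// create every queue before starting any thread, so no worker reads the queues vector while it is still growing
for(unsigned i=0;i<thread_count;++i)
{
threads.push_back(
std::thread(&thread_pool::worker_thread,this,i));
}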
}
catch(...)
{
done=true;
throw;
}
}

~thread_pool()
{
done=true;
}

template<typename ResultType>
using task_handle=std::future<ResultType>; // std::unique_future was a pre-standard draft name for std::future

template<typename FunctionType>
task_handle<typename std::result_of<FunctionType()>::type> submit(
FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;

std::packaged_task<result_type()> task(f);
task_handle<result_type> res(task.get_future());
if(local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}

void run_pending_task()
{
task_type task;
if(pop_task_from_local_queue(task) ||
pop_task_from_pool_queue(task) ||
pop_task_from_other_thread_queue(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
};

There is still room for improvement, for example dynamically resizing the thread pool to ensure that there’s optimal CPU usage even when threads are blocked waiting for something such as I/O or a mutex lock.
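
The index offset used when stealing in Listing 9.8 is easy to trace by hand. The helper name steal_order is hypothetical; the formula is the one from pop_task_from_other_thread_queue().

```cpp
#include <cassert>
#include <vector>

// Order in which a thread with index my_index probes the queues when stealing.
std::vector<unsigned> steal_order(unsigned my_index, unsigned queue_count)
{
    std::vector<unsigned> order;
    for (unsigned i = 0; i < queue_count; ++i)
        order.push_back((my_index + i + 1) % queue_count);
    return order;
}

void check_steal_order()
{
    assert((steal_order(1, 4) == std::vector<unsigned>{2, 3, 0, 1}));
    assert((steal_order(3, 4) == std::vector<unsigned>{0, 1, 2, 3}));
}
```

Each thread starts with its right-hand neighbour, so the thieves spread out across the queues. The last index probed is the thread's own queue, which has just been found empty by the local pop.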

9.2 Interrupting threads

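There is a proposal to add an interruption mechanism to the standard: A Cooperatively Interruptible Joining Thread, Rev 3, Nicolai Josuttis, Herb Sutter, Anthony Williams, http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0660r3.pdf. A later revision of this proposal led to std::jthread and std::stop_token in C++20.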

9.2.1 Launching and interrupting another thread

Listing 9.9 Basic implementation of interruptible_thread

#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <queue>

template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};

class interrupt_flag
{
public:
void set();
bool is_set() const;
};
thread_local interrupt_flag this_thread_interrupt_flag;

class interruptible_thread
{
std::thread internal_thread;
interrupt_flag* flag;// note the lifetime here: this points at the new thread's thread_local flag
public:
template<typename FunctionType>
interruptible_thread(FunctionType f)
{
std::promise<interrupt_flag*> p;
internal_thread=std::thread([f,&p]{
p.set_value(&this_thread_interrupt_flag);
f();
});
flag=p.get_future().get();
}
void interrupt()
{
if(flag)
{
flag->set(); // if you have a valid pointer to an interrupt flag, you have a thread to interrupt, so you can set the flag
}
}
};

9.2.2 Detecting that a thread has been interrupted

You can call an interruption_point() function at a point where it’s safe to be interrupted, and it throws a thread_interrupted exception if the flag is set:

void interruption_point()
{
if(this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}

Then place a call to this function wherever it is acceptable for the thread to be interrupted, so the flag is checked there:

void foo()
{
while(!done)
{
interruption_point();
process_next_item();
}
}
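
Putting the pieces together, a self-contained sketch of the cooperative scheme might look as follows. The class bodies are minimal assumptions (the notes only declare interrupt_flag at this point), and a short sleep stands in for process_next_item().

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

struct thread_interrupted {};

class interrupt_flag
{
    std::atomic<bool> flag{false};
public:
    void set() { flag.store(true, std::memory_order_relaxed); }
    bool is_set() const { return flag.load(std::memory_order_relaxed); }
};

thread_local interrupt_flag this_thread_interrupt_flag;

void interruption_point()
{
    if (this_thread_interrupt_flag.is_set())
        throw thread_interrupted();
}

// Starts a worker that loops over interruption_point(), interrupts it, and
// reports whether the worker left its loop via thread_interrupted.
bool interrupt_worker_demo()
{
    std::promise<interrupt_flag*> flag_promise;
    std::atomic<bool> was_interrupted{false};
    std::thread worker([&] {
        flag_promise.set_value(&this_thread_interrupt_flag);
        try
        {
            for (;;)
            {
                interruption_point(); // a safe place to stop
                std::this_thread::sleep_for(std::chrono::milliseconds(1)); // stand-in for real work
            }
        }
        catch (thread_interrupted const&)
        {
            was_interrupted = true;
        }
    });
    interrupt_flag* flag = flag_promise.get_future().get();
    flag->set();   // request interruption
    worker.join(); // returns once the worker reaches its next interruption point
    return was_interrupted;
}

void check_interrupt_worker()
{
    assert(interrupt_worker_demo());
}
```

The worker only notices the request when it next calls interruption_point(), which is why the interval between such calls bounds how quickly an interrupt takes effect.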

However, the best interruption points are often blocking waits: some of the best places for interrupting a thread are where it’s blocked waiting for something, which means that the thread isn’t running in order to call interruption_point().

9.2.3 Interrupting a condition variable wait

The approach is to build a new interruption-aware function dedicated to waiting.

This new function, interruptible_wait(), can then be overloaded for the various things you might want to wait for, and you can work out how to interrupt the waiting.

The interrupt_flag structure would need to be able to store a pointer to a condition variable so that it can be notified in a call to set().

Listing 9.10 A broken version of interruptible_wait for std::condition_variable

void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv); //associates the condition variable with interrupt_flag for the current thread
cv.wait(lk); //waits on the condition variable
this_thread_interrupt_flag.clear_condition_variable(); //clears the association with the condition variable
interruption_point();
}

This code is broken: there are two problems with it.

  1. std::condition_variable::wait() can throw an exception, so you might exit the function without removing the association of the interrupt flag with the condition variable. The fix is a structure that removes the association in its destructor (clear_cv_on_destruct in Listing 9.11).
  2. A race condition: the thread can be interrupted before it actually starts waiting, in which case the notification is lost. If the thread is interrupted after the initial call to interruption_point(), but before the call to wait(), then it doesn’t matter whether the condition variable has been associated with the interrupt flag, because the thread isn’t waiting and so can’t be woken by a notify on the condition variable. You need to ensure that the thread can’t be notified between the last check for interruption and the call to wait().
    Possible fixes:
    • Protect the check with the lock lk as well. This means passing a reference to a mutex whose lifetime you don’t know to another thread (the thread doing the interrupting) for that thread to lock (in the call to interrupt()), without knowing whether that thread has locked the mutex already when it makes the call. This has the potential for deadlock and the potential to access a mutex after it has already been destroyed, so it’s a nonstarter.
    • Use wait_for() with a timeout instead of wait(). The waiting thread then has to wait at most for the timeout before it sees the interruption (subject to the tick granularity of the clock).

Listing 9.11 Using a timeout in interruptible_wait for std::condition_variable

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable *thread_cond;
std::mutex set_clear_mutex;

public:
interrupt_flag() : thread_cond(0)
{
}
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond)
{
thread_cond->notify_all();
}
}
bool is_set() const
{
return flag.load(std::memory_order_relaxed);
}
void set_condition_variable(std::condition_variable &cv)
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = &cv;
}
void clear_condition_variable()
{
std::lock_guard<std::mutex> lk(set_clear_mutex);
thread_cond = 0;
}
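struct clear_cv_on_destruct; // defined below, once this_thread_interrupt_flag has been declared
};

thread_local interrupt_flag this_thread_interrupt_flag;

struct interrupt_flag::clear_cv_on_destruct
{
~clear_cv_on_destruct()
{
this_thread_interrupt_flag.clear_condition_variable();
}
};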

void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
interruption_point();
cv.wait_for(lk,std::chrono::milliseconds(1));
interruption_point();
}

If you have the predicate that’s being waited for, the 1 ms timeout can be hidden inside the predicate loop:

template<typename Predicate>
void interruptible_wait(std::condition_variable& cv,
std::unique_lock<std::mutex>& lk,
Predicate pred)
{
interruption_point();
this_thread_interrupt_flag.set_condition_variable(cv);
interrupt_flag::clear_cv_on_destruct guard;
while(!this_thread_interrupt_flag.is_set() && !pred())
{
cv.wait_for(lk,std::chrono::milliseconds(1));
}
interruption_point();
}

9.2.4 Interrupting a wait on std::condition_variable_any

Whereas std::condition_variable works only on std::unique_lock<std::mutex>, condition_variable_any can operate on any lock that meets the BasicLockable requirements.
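
As a small illustration of that difference, the sketch below waits on a std::condition_variable_any through a std::unique_lock<std::recursive_mutex>, a lock type that std::condition_variable would not accept. The function name is hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Waits for a value produced by another thread, using a recursive_mutex lock.
int wait_with_recursive_mutex()
{
    std::recursive_mutex m;
    std::condition_variable_any cv;
    bool ready = false;
    int value = 0;

    std::thread producer([&] {
        std::lock_guard<std::recursive_mutex> lk(m);
        value = 42;
        ready = true;
        cv.notify_one();
    });

    {
        std::unique_lock<std::recursive_mutex> lk(m);
        cv.wait(lk, [&] { return ready; }); // accepts any lock with lock()/unlock()
    }
    producer.join();
    return value;
}

void check_wait_with_recursive_mutex()
{
    assert(wait_with_recursive_mutex() == 42);
}
```

This flexibility is what Listing 9.12 relies on: it passes its own custom_lock type to wait(), so that extra work happens whenever the condition variable unlocks or relocks.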

Listing 9.12 interruptible_wait for std::condition_variable_any

class interrupt_flag
{
std::atomic<bool> flag;
std::condition_variable *thread_cond;
std::condition_variable_any *thread_cond_any;
std::mutex set_clear_mutex;

public:
interrupt_flag() : thread_cond(0), thread_cond_any(0)
{
}
void set()
{
flag.store(true, std::memory_order_relaxed);
std::lock_guard<std::mutex> lk(set_clear_mutex);
if (thread_cond)
{
thread_cond->notify_all();
}
else if (thread_cond_any)
{
thread_cond_any->notify_all();
}
}
template <typename Lockable>
void wait(std::condition_variable_any &cv, Lockable &lk)
{
struct custom_lock
{
interrupt_flag *self;
Lockable &lk;
custom_lock(interrupt_flag *self_,
std::condition_variable_any &cond,
Lockable &lk_) : self(self_), lk(lk_)
{
self->set_clear_mutex.lock();
self->thread_cond_any = &cond;
}
void unlock()
{
lk.unlock();
self->set_clear_mutex.unlock();
//This allows threads that are trying to interrupt you to acquire the lock on set_clear_mutex and
//check the thread_cond_any pointer once you’re inside the wait() call but not before.
//This is exactly what you were after (but couldn’t manage) with std::condition_variable.
}
void lock()
{
std::lock(self->set_clear_mutex, lk);
}
~custom_lock()
{
self->thread_cond_any = 0;
self->set_clear_mutex.unlock();
}
};
custom_lock cl(this, cv, lk);
interruption_point();
cv.wait(cl);
interruption_point();
}
// rest as before
};
template <typename Lockable>
void interruptible_wait(std::condition_variable_any &cv,
Lockable &lk)
{
this_thread_interrupt_flag.wait(cv, lk);
}

9.2.5 Interrupting other blocking calls

What about other blocking waits: mutex locks, waiting for futures, and the like?
In general there’s no way to interrupt the wait short of fulfilling the condition being waited for, without access to the internals of the mutex or future. The workaround is again a timeout: loop on wait_for() and check the interrupt flag between waits.

template <typename T>
void interruptible_wait(std::future<T> &uf)
{
while (!this_thread_interrupt_flag.is_set()) //loop within the interruptible_wait() function
{
if (uf.wait_for(std::chrono::milliseconds(1)) == // std::future::wait_for takes only a timeout, no lock
std::future_status::ready)
break;
}
interruption_point();
}

9.2.6 Handling interruptions

Because thread_interrupted is an exception, all the usual exception-safety precautions must also be taken when calling code that can be interrupted, in order to ensure that resources aren’t leaked, and your data structures are left in a coherent state.

The exception must not be allowed to escape the thread function: if you let exceptions propagate out of the thread function passed to the std::thread constructor, std::terminate() will be called, and the whole program will be terminated. The fix is to put the catch block inside the wrapper you use for initializing the interrupt_flag.

internal_thread = std::thread([f, &p]
{
p.set_value(&this_thread_interrupt_flag);
try
{
f();
}
catch(thread_interrupted const&)
{} });
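
A self-contained version of that wrapper idea is sketched below. The helper name launch_interruptible is an assumption for this example, and thread_interrupted is declared locally as an empty type.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

struct thread_interrupted {};

// Runs f on a new thread and swallows thread_interrupted inside the thread
// function, so the exception never escapes and std::terminate() is not called.
template<typename Function>
std::thread launch_interruptible(Function f)
{
    return std::thread([f] {
        try
        {
            f();
        }
        catch (thread_interrupted const&)
        {
            // interruption ends the thread quietly
        }
    });
}

// Simulates a task that hits an interruption point straight away.
bool demo_launch()
{
    std::atomic<bool> started{false};
    std::thread t = launch_interruptible([&started] {
        started = true;
        throw thread_interrupted();
    });
    t.join(); // reached normally: the process was not terminated
    return started;
}

void check_demo_launch()
{
    assert(demo_launch());
}
```

Only thread_interrupted is caught here; any other exception type escaping f would still end the program, so the task remains responsible for its own errors.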

9.2.7 Interrupting background tasks on application exit

On exit, the application shuts down the threads running in the background.

Listing 9.13 Monitoring the filesystem in the background

std::mutex config_mutex;
std::vector<interruptible_thread> background_threads;
void background_thread(int disk_id)// the work done by each background thread
{
while (true)
{
interruption_point();
fs_change fsc = get_fs_changes(disk_id);
if (fsc.has_changes())
{
update_index(fsc);
}
}
}
void start_background_processing()
{
background_threads.push_back(
interruptible_thread(background_thread, disk_1));
background_threads.push_back(
interruptible_thread(background_thread, disk_2));
}
int main()
{
start_background_processing();
process_gui_until_exit();// the application is exiting: shut down all the background threads
std::unique_lock<std::mutex> lk(config_mutex);
for (unsigned i = 0; i < background_threads.size(); ++i)
{
background_threads[i].interrupt();
}
for (unsigned i = 0; i < background_threads.size(); ++i)
{
background_threads[i].join();
}
}

Why do you interrupt all the threads before waiting for any? Why not interrupt each and then wait for it before moving on to the next?

  • The answer is concurrency. Threads will likely not finish immediately when they’re interrupted, because they have to proceed to the next interruption point and then run any destructor calls and exception-handling code necessary before they exit.
    In practice: by joining with each thread immediately, you would cause the interrupting thread to wait, even though it still has useful work it could do (interrupting the other threads). Only when you have no more work to do (all the threads have been interrupted) do you wait.
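
The two-phase shutdown can be sketched with plain std::thread and one atomic stop flag per worker. The stop flags are a hypothetical stand-in for interruptible_thread::interrupt(), and the function name is an assumption.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>
#include <vector>

// Starts `count` workers, signals all of them before joining any, and returns
// how many workers ran their exit path.
unsigned stop_all_then_join(unsigned count)
{
    std::vector<std::atomic<bool>> stop_flags(count); // value-initialized to false
    std::atomic<unsigned> finished{0};
    std::vector<std::thread> workers;
    for (unsigned i = 0; i < count; ++i)
    {
        workers.emplace_back([&stop_flags, &finished, i] {
            while (!stop_flags[i].load())
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            ++finished; // stand-in for cleanup done while exiting
        });
    }
    for (auto& flag : stop_flags)
        flag.store(true); // phase 1: ask every worker to stop
    for (auto& t : workers)
        t.join();         // phase 2: only now wait for them
    return finished.load();
}

void check_stop_all_then_join()
{
    assert(stop_all_then_join(4) == 4);
}
```

Because every worker has been signalled before the first join(), the workers wind down in parallel and the total wait is roughly that of the slowest worker, not the sum of all of them.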

Author: cx
Published: 2022-10-10
Updated: 2022-10-25