C++ Concurrency in Action - Chapter 6 - Study Notes, Part 4


[TOC]
Study notes, part 4, for *C++ Concurrency in Action*, covering Chapter 6, Designing lock-based concurrent data structures.
The chapter presents design principles for lock-based data structures and provides worked examples of a stack, queue, hash map, and linked list.
It considers not only thread safety but also concurrency efficiency, exception safety, and other design concerns, and is well worth careful reading.

Chapter 6 Designing lock-based concurrent data structures

Some general guidelines for designing data structures for concurrency.

6.1 What does it mean to design for concurrency?

thread-safe: designing a data structure for concurrency means that multiple threads can access the data structure concurrently, either performing the same or distinct operations, and each thread will see a self-consistent view of the data structure. No data will be lost or corrupted, all invariants will be upheld, and there’ll be no problematic race conditions.
In general, a data structure will be safe only for particular types of concurrent access. Typical forms:

  1. Same operation by many threads: It may be possible to have multiple threads performing one type of operation on the data structure concurrently, whereas another operation requires exclusive access by a single thread.
  2. Different operations: Alternatively, it may be safe for multiple threads to access a data structure concurrently if they’re performing different actions, whereas multiple threads performing the same action would be problematic.

Truly designing for concurrency means more than that, though: it means providing the opportunity for concurrency to threads accessing the data structure — offering concurrency, not forcing it.

serialization: threads take turns accessing the data protected by the mutex; they must access it serially rather than concurrently. The guiding principle here: the smaller the protected region, the fewer operations are serialized, and the greater the potential for concurrency.

6.1.1 Guidelines for designing data structures for concurrency

Two aspects to consider when designing data structures for concurrent access:

  1. safety

    • Visibility: Ensure that no thread can see a state where the invariants of the data structure have been broken by the actions of another thread.
    • Whole operations: Take care to avoid race conditions inherent in the interface to the data structure by providing functions for complete operations rather than for operation steps.
    • Exceptions: Pay attention to how the data structure behaves in the presence of exceptions to ensure that the invariants are not broken.
    • Deadlock: Minimize the opportunities for deadlock when using the data structure by restricting the scope of locks and avoiding nested locks where possible.
    • Constructors/destructors: Generally, constructors and destructors require exclusive access to the data structure, but it’s up to the user to ensure that they’re not accessed before construction is complete or after destruction has started.
  2. enabling genuine concurrent access

    • Can the scope of locks be restricted to allow some parts of an operation to be performed outside the lock?
    • Can different parts of the data structure be protected with different mutexes?
    • Do all operations require the same level of protection?
    • Can a simple change to the data structure improve the opportunities for concurrency without affecting the operational semantics?

Summed up in one sentence: how can you minimize the amount of serialization that must occur and enable the greatest amount of true concurrency?

6.2 Lock-based concurrent data structures

Design approach: ensure that the right mutex is locked when accessing the data and that the lock is held for the minimum amount of time.

6.2.1 A thread-safe stack using locks

```cpp
#include <exception>
#include <stack>
#include <mutex>
#include <memory>

struct empty_stack: std::exception
{
    const char* what() const throw()
    {
        return "empty stack";
    }
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> data;
    mutable std::mutex m;
public:
    threadsafe_stack(){}
    threadsafe_stack(const threadsafe_stack& other)
    {
        std::lock_guard<std::mutex> lock(other.m);
        data=other.data;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lock(m);
        data.push(std::move(new_value));
    }
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(data.top())));
        data.pop();
        return res;
    }
    void pop(T& value)
    {
        std::lock_guard<std::mutex> lock(m);
        if(data.empty()) throw empty_stack();
        value=std::move(data.top());
        data.pop();
    }
    bool empty() const
    {
        std::lock_guard<std::mutex> lock(m);
        return data.empty();
    }
};
```

Design points:

  1. Every member function takes a lock on the mutex.
    • The only member functions that aren’t safe are the constructors and destructors, but this isn’t a problem; the object can be constructed only once and destroyed only once. The user must ensure that other threads aren’t able to access the stack until it’s fully constructed and must ensure that all threads have ceased accessing the stack before it’s destroyed.
  2. Race conditions between member functions are eliminated: the race between empty() and either of the pop() functions is removed by checking for emptiness inside pop() under the lock, and the separate top() and pop() member functions of std::stack are combined into a single pop().
  3. Potential sources of exceptions:
    • Locking the mutex: it may throw, but no data has been modified yet, so that’s always safe, and the use of std::lock_guard<> ensures that the mutex is never left locked.
    • push(): either copying/moving the data value throws an exception or not enough memory can be allocated to extend the underlying data structure. Either way, std::stack<> guarantees it will be safe.
    • Popping an empty stack: throws an empty_stack exception, but nothing has been modified, so that’s safe.
    • In pop(), the call to std::make_shared might throw because it can’t allocate memory for the new object and the internal data required for reference counting, or the copy constructor or move constructor of the data item to be returned might throw when copying/moving into the freshly-allocated memory. In both cases, the C++ runtime and Standard Library ensure that there are no memory leaks and the new object (if any) is correctly destroyed. Because you still haven’t modified the underlying stack, you’re OK.
    • Operations on the underlying container: the call to data.pop() is guaranteed not to throw.
    • In pop(T& value), it’s the copy assignment or move assignment operator that can throw, rather than the construction of a new object and an std::shared_ptr instance. Again, you don’t modify the data structure until the call to data.pop(), which is still guaranteed not to throw, so this overload is exception-safe too.
    • empty() doesn’t modify any data, so that’s exception-safe.
  4. Deadlock: only possible if user-supplied code (copy/move constructors or assignment operators of the stored type) acquires other locks or calls back into the stack while a member function holds the mutex; it’s sensible to require that users of the stack be responsible for ensuring this.
  5. Serialization: only one thread does work on the stack at a time, so there’s significant contention on the stack under load, and there is no provision for waiting for an item: a waiting thread must either consume precious resources checking for data, or the user must write external wait and notification code.

6.2.2 A thread-safe queue using locks and condition variables

```cpp
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=std::move(data_queue.front());
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value=std::move(data_queue.front());
        data_queue.pop();
        return true; // missing from the original listing
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
```

If you exclude the wait_and_pop() functions, the analysis you did for the stack applies just as well here.

Rather than continuously calling empty(), the waiting thread can call wait_and_pop() and the data structure will handle the waiting with a condition variable. The call to data_cond.wait() won’t return until the underlying queue has at least one element, so you don’t have to worry about the possibility of an empty queue at this point in the code, and the data is still protected with the lock on the mutex. In other words, the condition-variable waiting that the stack left to external user code is now folded into the data structure itself.

The exception-safety twist:
A thread that throws can leave the other sleeping threads never woken: if more than one thread is waiting when an entry is pushed onto the queue, only one thread will be woken by the call to data_cond.notify_one(). But if that thread then throws an exception in wait_and_pop(), such as when the new std::shared_ptr<> is constructed, none of the other threads will be woken.

Possible fixes:

  • data_cond.notify_all(): wakes all the waiting threads, most of which find the queue empty again and go back to sleep — inefficient.
  • Have wait_and_pop() call notify_one() if an exception is thrown, so that another thread can attempt to retrieve the stored value.
  • Move the std::shared_ptr<> initialization to the push() call and store std::shared_ptr<> instances rather than direct data values. Copying the std::shared_ptr<> out of the internal std::queue<> then can’t throw an exception, so wait_and_pop() is safe again. The improved version:

```cpp
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T> > data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=std::move(*data_queue.front());
        data_queue.pop();
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value=std::move(*data_queue.front());
        data_queue.pop();
        return true; // missing from the original listing
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res=data_queue.front(); // improvement 1: copying a shared_ptr can't throw
        data_queue.pop();
        return res;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res=data_queue.front();
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

    void push(T new_value)
    {
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(new_value))); // improvement 2: allocate outside the lock
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
};
```

Improvement 2 brings an additional benefit: the allocation of the new instance can now be done outside the lock in push(). Because memory allocation is relatively expensive, this shortens the time the mutex is held and increases the throughput of the queue.

6.2.3 A thread-safe queue using fine-grained locks and condition variables

Approach: in order to use finer-grained locking, you need to look inside the queue at its constituent parts and associate one mutex with each distinct data item.

The queue is built on a singly linked list. The single-threaded version first:

```cpp
#include <memory>

template<typename T>
class queue
{
private:
    struct node
    {
        T data;
        std::unique_ptr<node> next;

        node(T data_):
            data(std::move(data_))
        {}
    };

    std::unique_ptr<node> head;
    node* tail;

public:
    queue():
        tail(nullptr)
    {}

    queue(const queue& other)=delete;
    queue& operator=(const queue& other)=delete;

    std::shared_ptr<T> try_pop()
    {
        if(!head)
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> const res(
            std::make_shared<T>(std::move(head->data)));
        std::unique_ptr<node> const old_head=std::move(head);
        head=std::move(old_head->next); // head accesses next
        if(!head)
            tail=nullptr;
        return res;
    }

    void push(T new_value)
    {
        std::unique_ptr<node> p(new node(std::move(new_value)));
        node* const new_tail=p.get();
        if(tail)
        {
            tail->next=std::move(p); // tail accesses next
        }
        else
        {
            head=std::move(p);
        }
        tail=new_tail;
    }
};
```

This is the single-threaded design. Before adding locks for multi-threaded access, note one pitfall:
the obvious approach is to lock head and tail separately, but when the queue holds exactly one element, head == tail, and then push() and try_pop() both access that same node's next pointer — so both operations would need both locks on the very same data, defeating the fine-grained scheme.

ENABLING CONCURRENCY BY SEPARATING DATA

For an empty queue, head and tail now both point to the dummy node rather than being NULL.

```cpp
#include <memory>

template<typename T>
class queue
{
private:
    struct node
    {
        std::shared_ptr<T> data; // the node now stores the data by pointer
        std::unique_ptr<node> next;
    };

    std::unique_ptr<node> head;
    node* tail;

public:
    queue():
        head(new node),tail(head.get()) // create the dummy node in the constructor
    {}

    queue(const queue& other)=delete;
    queue& operator=(const queue& other)=delete;

    std::shared_ptr<T> try_pop()
    {
        if(head.get()==tail) // compare against tail, not against NULL
        {
            return std::shared_ptr<T>();
        }
        std::shared_ptr<T> const res(head->data);
        std::unique_ptr<node> const old_head=std::move(head);
        head=std::move(old_head->next);
        return res;
    }

    void push(T new_value)
    {
        std::shared_ptr<T> new_data(
            std::make_shared<T>(std::move(new_value))); // std::make_shared avoids a second allocation for the reference count
        std::unique_ptr<node> p(new node); // the new dummy node
        tail->data=new_data; // set the data on the old dummy node to the newly allocated copy of new_value
        node* const new_tail=p.get();
        tail->next=std::move(p);
        tail=new_tail;
    }
};
```

What this buys for thread safety and concurrency:

  • push() now accesses only tail, not head, which is an improvement.
  • try_pop() accesses both head and tail, but tail is needed only for the initial comparison, so the lock is short-lived.
  • dummy node means try_pop() and push() are never operating on the same node.

```cpp
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex; // mutex for head
    std::unique_ptr<node> head;
    std::mutex tail_mutex; // mutex for tail
    node* tail;

    node* get_tail() // pop_head() needs the tail lock only once, so wrap it in its own function
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    std::unique_ptr<node> pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex); // only head needs to stay locked
        if(head.get()==get_tail())
        {
            return nullptr;
        }
        std::unique_ptr<node> const old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }


public:
    threadsafe_queue():
        head(new node),tail(head.get())
    {}

    threadsafe_queue(const threadsafe_queue& other)=delete;
    threadsafe_queue& operator=(const threadsafe_queue& other)=delete;

    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> old_head=pop_head();
        return old_head?old_head->data:std::shared_ptr<T>(); // no lock needed here
    }

    void push(T new_value)
    {
        std::shared_ptr<T> new_data(
            std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        node* const new_tail=p.get();
        std::lock_guard<std::mutex> tail_lock(tail_mutex); // tail is first used here, so lock from here on
        tail->data=new_data;
        tail->next=std::move(p);
        tail=new_tail;
    }
};
```

The places where a data race could easily arise:

  • try_pop() and push() both access tail. The fix: get_tail(), called from try_pop(), locks the same tail_mutex as push(), which forces the two accesses into a definite order.
  • When two mutexes are involved, lock acquisition order matters: the call to get_tail() occurs inside the lock on head_mutex. If it didn’t, the call to pop_head() could be stuck in between the call to get_tail() and the lock on head_mutex, because other threads calling try_pop() (and thus pop_head()) could acquire the lock first and make progress ahead of the initial thread, invalidating the tail value it read. Taking head_mutex first ensures that no other thread can change head, and tail only ever moves further away (as new nodes are added in calls to push()), which is perfectly safe.

```cpp
// This is a broken implementation
std::unique_ptr<node> pop_head()
{
    node* const old_tail=get_tail(); // tail_mutex is locked outside the head_mutex lock
    std::lock_guard<std::mutex> head_lock(head_mutex);
    if(head.get()==old_tail)
    {
        return nullptr;
    }
    std::unique_ptr<node> old_head=std::move(head);
    head=std::move(old_head->next);
    return old_head;
}
```

Exception safety:

  • try_pop() is exception-safe.
  • push() allocates a new instance of T on the heap and a new instance of node, either of which might throw an exception. But both of the newly allocated objects are assigned to smart pointers, so they’ll be freed if an exception is thrown. Once the lock is acquired, none of the remaining operations in push() can throw an exception.

Deadlock:
The implementation always acquires the head_mutex and then the tail_mutex, so it will never deadlock.

Efficiency:

  • try_pop() holds the tail_mutex only briefly, to protect the read of tail. Consequently, almost the entirety of a call to try_pop() can occur concurrently with a call to push().
  • The operations performed while holding the head_mutex are also quite minimal; the expensive delete (in the destructor of the node pointer) is outside the lock.

WAITING FOR AN ITEM TO POP

```cpp
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };

    std::mutex head_mutex;
    std::unique_ptr<node> head;
    std::mutex tail_mutex;
    node* tail;
    std::condition_variable data_cond;
public:
    threadsafe_queue():
        head(new node),tail(head.get())
    {}
    threadsafe_queue(const threadsafe_queue& other)=delete;
    threadsafe_queue& operator=(const threadsafe_queue& other)=delete;

    std::shared_ptr<T> try_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    void wait_and_pop(T& value);
    void push(T new_value);
    bool empty();
};

template<typename T>
void threadsafe_queue<T>::push(T new_value)
{
    std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
    std::unique_ptr<node> p(new node);
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        tail->data=new_data;
        node* const new_tail=p.get();
        tail->next=std::move(p);
        tail=new_tail;
    }
    data_cond.notify_one(); // notify after releasing the lock, so the woken thread doesn't immediately block
}
```

A thread-safe queue with locking and waiting: wait_and_pop()

```cpp
#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class threadsafe_queue
{
private:
    // data members as in the previous listing:
    // head_mutex, head, tail_mutex, tail, data_cond, and struct node

    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    // factored out of the wait_pop_head() overloads to reduce duplication
    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }

    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> head_lock(head_mutex);
        data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
        return std::move(head_lock);
    }

    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& value)
    {
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value=std::move(*head->data); // assign while the lock is still held, to avoid losing a node
        return pop_head();
    }

public:
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> const old_head=wait_pop_head();
        return old_head->data;
    }

    void wait_and_pop(T& value)
    {
        std::unique_ptr<node> const old_head=wait_pop_head(value);
    }
};
```

There is an exception-safety strategy at work here, specifically around the operator= in wait_and_pop(T& value): if the data were retrieved from old_head with a copy assignment to the value parameter after the pop, there would be a potential exception-safety issue. At that point, the data item has been removed from the queue and the mutex unlocked; all that remains is to return the data to the caller. But if the copy assignment throws an exception (as it might), the data item is lost, because it can’t be returned to the queue in the same place. In short: once a node has been taken out and the lock released, an exception during the assignment means a lost node.
Possible fixes:

  • If the actual type T used for the template argument has a no-throw move-assignment operator or a no-throw swap operation, you could use that,
  • but you’d prefer a general solution that could be used for any type T. In this case, you have to move the potentially throwing operation inside the locked region, before the node is removed from the list: retrieve and assign the value while the node is still in the queue, so that if the assignment throws, the node can simply stay where it is. This means you need an extra overload of pop_head() that retrieves the stored value prior to modifying the list.

A thread-safe queue with locking and waiting: try_pop() and empty()

```cpp
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_queue
{
private:
    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T& value)
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get()==get_tail())
        {
            return std::unique_ptr<node>();
        }
        value=std::move(*head->data);
        return pop_head();
    }

public:
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> const old_head=try_pop_head();
        return old_head?old_head->data:std::shared_ptr<T>();
    }

    bool try_pop(T& value)
    {
        std::unique_ptr<node> const old_head=try_pop_head(value);
        return old_head!=nullptr; // unique_ptr doesn't convert to bool implicitly here
    }

    bool empty()
    {
        std::lock_guard<std::mutex> head_lock(head_mutex);
        return (head.get()==get_tail());
    }
};
```

Note that this is an unbounded queue; a bounded queue that limits the maximum queue length may be needed in practice.

6.3 Designing more complex lock-based data structures

6.3.1 Writing a thread-safe lookup table using locks

Characteristics:

  • A lookup table might be modified rarely; access is read-mostly.
  • For the first cut at a thread-safe lookup table interface, you’ll skip the iterators, which rules out simply wrapping a standard container, so you may need to build the map container yourself.

a few basic operations on a lookup table:

  • Add a new key/value pair.
  • Change the value associated with a given key.
  • Remove a key and its associated value.
  • Obtain the value associated with a given key, if any.
  • others
    • check on whether the container is empty.
    • a snapshot of the complete list of keys.
    • a snapshot of the complete set of key/value pairs.

DESIGNING A MAP DATA STRUCTURE FOR FINE-GRAINED LOCKING

There are three common ways of implementing an associative container like your lookup table:

  1. A binary tree, such as a red-black tree => not suitable: every lookup or modification has to start by accessing the root node, so the root's lock serializes all access.
  2. A sorted array => even worse, because you can’t tell in advance where in the array a given data value is going to be, so you need a single lock for the whole array.
  3. A hash table

Focusing on the hash table:

  • Benefit: if you again use a mutex that supports multiple readers or a single writer, you increase the opportunities for concurrency N-fold,
    where N is the number of buckets.
  • The downside is that you need a good hash function for the key.
  • Precondition for safely having a separate lock per bucket: which bucket a key belongs to is purely a property of the key and its hash function => the std::hash<> template provides this.

A thread-safe lookup table

```cpp
#include <vector>
#include <memory>
#include <mutex>
#include <functional>
#include <list>
#include <utility>
#include <shared_mutex>
#include <algorithm> // for std::find_if, missing from the original listing

template<typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table
{
private:
    class bucket_type
    {
    private:
        typedef std::pair<Key,Value> bucket_value;
        typedef std::list<bucket_value> bucket_data;
        typedef typename bucket_data::iterator bucket_iterator;

        bucket_data data;
        mutable std::shared_mutex mutex;

        // non-const: a const version would have to return a const_iterator
        bucket_iterator find_entry_for(Key const& key)
        {
            return std::find_if(data.begin(),data.end(),
                [&](bucket_value const& item)
                {return item.first==key;});
        }
    public:
        Value value_for(Key const& key,Value const& default_value)
        {
            std::shared_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            return (found_entry==data.end())?
                default_value : found_entry->second;
        }

        void add_or_update_mapping(Key const& key,Value const& value)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry==data.end())
            {
                data.push_back(bucket_value(key,value));
            }
            else
            {
                found_entry->second=value;
            }
        }

        void remove_mapping(Key const& key)
        {
            std::unique_lock<std::shared_mutex> lock(mutex);
            bucket_iterator const found_entry=find_entry_for(key);
            if(found_entry!=data.end())
            {
                data.erase(found_entry);
            }
        }
    };

    std::vector<std::unique_ptr<bucket_type> > buckets;
    Hash hasher;

    bucket_type& get_bucket(Key const& key) const
    {
        std::size_t const bucket_index=hasher(key)%buckets.size();
        return *buckets[bucket_index];
    }

public:
    typedef Key key_type;
    typedef Value mapped_type;
    typedef Hash hash_type;

    threadsafe_lookup_table(
        unsigned num_buckets=19, Hash const& hasher_=Hash()):
        buckets(num_buckets),hasher(hasher_)
    {
        for(unsigned i=0;i<num_buckets;++i)
        {
            buckets[i].reset(new bucket_type);
        }
    }

    threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
    threadsafe_lookup_table& operator=(
        threadsafe_lookup_table const& other)=delete;

    Value value_for(Key const& key,
        Value const& default_value=Value()) const
    {
        return get_bucket(key).value_for(key,default_value);
    }

    void add_or_update_mapping(Key const& key,Value const& value)
    {
        get_bucket(key).add_or_update_mapping(key,value);
    }

    void remove_mapping(Key const& key)
    {
        get_bucket(key).remove_mapping(key);
    }
};
```

Design details:

  • The default is 19, which is an arbitrary prime number; hash tables work best with a prime number of buckets.
  • Each bucket is protected with an instance of std::shared_mutex.
  • Because the number of buckets is fixed, the get_bucket() function can be called without any locking.
  • exception safety
    • value_for doesn’t modify anything, so that’s fine; if it throws an exception, it won’t affect the data structure.
    • remove_mapping modifies the list with the call to erase, but this is guaranteed not to throw, so that’s safe.
    • add_or_update_mapping might throw in either of the two branches of if.
      • push_back is exception-safe and will leave the list in the original state if it throws, so that branch is fine.
      • assignment in the case where you’re replacing an existing value; if the assignment throws, you’re relying on it leaving the original unchanged. But this doesn’t affect the data structure as a whole and is entirely a property of the user-supplied type, so you can safely leave it up to the user to handle this.

Locking the entire map is still possible: provided you lock the bucket mutexes in the same order every time (for example, increasing bucket index), there’ll be no opportunity for deadlock.

Obtaining contents of a threadsafe_lookup_table as std::map<>

```cpp
#include <map>
#include <vector>
#include <shared_mutex>
#include <mutex>

// member function of threadsafe_lookup_table<Key,Value,Hash>; it needs
// access to bucket_type's data and mutex, and since buckets holds
// std::unique_ptr<bucket_type>, the elements are accessed via ->
std::map<Key,Value> threadsafe_lookup_table::get_map() const
{
    std::vector<std::unique_lock<std::shared_mutex> > locks;
    for(unsigned i=0;i<buckets.size();++i)
    {
        // lock every bucket, always in increasing index order
        locks.push_back(
            std::unique_lock<std::shared_mutex>(buckets[i]->mutex));
    }
    std::map<Key,Value> res;
    for(unsigned i=0;i<buckets.size();++i)
    {
        for(bucket_iterator it=buckets[i]->data.begin();
            it!=buckets[i]->data.end();
            ++it)
        {
            res.insert(*it);
        }
    }
    return res;
}
```

6.3.2 Writing a thread-safe list using locks

each operation holds only the locks on the nodes it’s interested in and unlocks each node as it moves on to the next.

A thread-safe list with iteration support

```cpp
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_list
{
    struct node
    {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;

        node():
            next()
        {}

        node(T const& value):
            data(std::make_shared<T>(value))
        {}
    };

    node head;

public:
    threadsafe_list()
    {}

    ~threadsafe_list()
    {
        remove_if([](T const&){return true;});
    }

    threadsafe_list(threadsafe_list const& other)=delete;
    threadsafe_list& operator=(threadsafe_list const& other)=delete;

    void push_front(T const& value)
    {
        std::unique_ptr<node> new_node(new node(value));
        std::lock_guard<std::mutex> lk(head.m);
        new_node->next=std::move(head.next);
        head.next=std::move(new_node);
    }

    // provides iteration over the list
    template<typename Function>
    void for_each(Function f) // the function must accept a value of type T as its sole parameter
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock(); // once you hold the lock on the next node, release the lock on the previous one
            f(*next->data);
            current=next;
            lk=std::move(next_lk);
        }
    }

    template<typename Predicate>
    std::shared_ptr<T> find_first_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            lk.unlock();
            if(p(*next->data))
            {
                return next->data;
            }
            current=next;
            lk=std::move(next_lk);
        }
        return std::shared_ptr<T>();
    }

    template<typename Predicate>
    void remove_if(Predicate p)
    {
        node* current=&head;
        std::unique_lock<std::mutex> lk(head.m);
        while(node* const next=current->next.get())
        {
            std::unique_lock<std::mutex> next_lk(next->m);
            if(p(*next->data))
            {
                std::unique_ptr<node> old_next=std::move(current->next);
                current->next=std::move(next->next); // two mutexes held here: lk (current) and next_lk (next)
                next_lk.unlock();
            }
            else
            {
                lk.unlock();
                current=next;
                lk=std::move(next_lk);
            }
        }
    }
};
```

The iteration is always one way, always starting from the head node, and always locking the next mutex before releasing the current one ==> no deadlock.

Author: cx
Published: 2022-10-02
Updated: 2022-10-25