C++并发实战-第七章-总学习笔记第5

C++并发实战-第七章-总学习笔记第5

[TOC]
C++ 并发实战《C++ Concurrency in Action》的学习笔记 5, 记录第五章的部分 Designing lock-free concurrent data structures.
内容是设计 lock-free 的数据结构用于并发, 实现了 stack 与 queue 的示例, 同时包含了与之相关的内存管理的技术(例如垃圾收集).

Chapter 7 Designing lock-free concurrent data structures

7.1 Definitions and consequences

定义:

  • Algorithms and data structures that use mutexes, condition variables, and futures to synchronize the data are called blocking data structures and algorithms.
  • Data structures and algorithms that don’t use blocking library functions are said to be nonblocking. Not all these data structures are lock-free, though.

7.1.1 Types of nonblocking data structures

如下使用 std::atomic_flag 实现的自旋锁即为 nonblocking, 但是不是 lock-free.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <atomic>

class spinlock_mutex
{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT)
{}
void lock()
{
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};

更加细分的概念:

  • Obstruction-Free: If all other threads are paused, then any given thread will complete its operation in a bounded number of steps. ==> this is more useful as a characterization of a failed lock-free implementation.

  • Lock-Free: If multiple threads are operating on a data structure, then after a bounded number of steps one of them will complete its operation.

  • Wait-Free: Every thread operating on a data structure will complete its operation in a bounded number of steps, even if other threads are also operating on the data structure.

7.1.2 Lock-free data structures

特点:

  1. 多线程安全访问: more than one thread must be able to access the data structure concurrently. They don’t have to be able to do the same
    operations; a lock-free queue might allow one thread to push and one to pop but break if two threads try to push new items at the same time.

  2. 其他线程的中断不能妨碍此线程的完成. One of the threads accessing the data structure is suspended by the scheduler midway through its operation, the other threads must still be able to complete their operations without waiting for the suspended thread.
    例如 This code can still be lock-free if the compare/exchange would eventually succeed if the other threads were suspended. If it didn’t, you’d have a spin lock, which is nonblocking but not lock-free.

starvation 问题: Data structures that avoid this problem are wait-free as well as lock-free.

7.1.3 Wait-free data structures

定义: A wait-free data structure is a lock-free data structure with the additional property that every thread accessing the data structure can complete its operation within a bounded number of steps, regardless of the behavior of other threads.

不是 wait-free 的例子: a while loop on a compare_exchange_weak or compare_exchange_strong operation.

实现 wait-free 极其困难: Writing wait-free data structures correctly is extremely hard. In order to ensure that every thread can complete its operations within a bounded number of steps, you have to ensure that each operation can be performed in a single pass and that the steps performed by one thread don’t cause an operation on another thread to fail.

7.1.4 The pros and cons of lock-free data structures

pros:

  • enable maximum concurrency
  • robustness
    If a thread dies while holding a lock, that data structure is broken forever. But if a thread dies partway through an operation on a lock-free data structure, nothing is lost except that thread’s data; other threads can proceed normally.

cons:

  • invariants are upheld: not easy.
  • deadlocks are impossible with lock-free data structures, although there is the possibility of live locks instead.
    A live lock occurs when two threads each try to change the data structure, but for each thread, the changes made by the other require the operation to be restarted, so both threads loop and try again. 书中举了一个例子, 两个人无法同时通过一个窄门, 当二者碰巧都要去通过时, 都会发现过不去, 因此退出来然后又一次尝试通过, 又一次失败, 循环往复. live locks are typically short-lived because they depend on the exact scheduling of threads. 因此只会导致性能的一时下降不会导致长期问题(They therefore sap performance rather than cause long-term problems)
  • it may well decrease overall performance 某些情况下还是会导致整体性能下降
    1. the atomic operations used for lock-free code can be much slower than non-atomic operations.
    2. hardware must synchronize data between threads that access the same atomic variables(例如 cache ping-pong 问题).
    3. 看你怎么测, 看重什么方面: relevant performance aspects (whether that’s worst-case wait time, average wait time, overall execution time, or something else)

7.2 Examples of lock-free data structures

注意不是说用了 atomic operation 以及 memory ordering options 就可以保证里面没锁. 这与平台/实现强相关.

Only std::atomic_flag is guaranteed not to use locks in the implementation.

7.2.1 Writing a thread-safe stack without locks

adding a node(单向链表)

步骤:

  1. Create a new node.
  2. Set its next pointer to the current head node.
  3. Set the head node to point to it.

2,3 步会有可能 data race.

注意第 3 步, 我们不能回撤已经添加的 new node. Note that once head has been updated to point to your new node, another thread could read that node. It’s therefore vital that your new node is thoroughly prepared before head is set to point to it; you can’t modify the node afterward.

Implementing push() without locks: atomic compare/exchange operation

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <atomic>

template<typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;
node(T const& data_):
data(data_)
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node)); //false 时会自动更新 new_node->next, 保证一直指向最新的 head.
}
};

removing a node

步骤:

  1. Read the current value of head.
  2. Read head->next.
  3. Set head to head->next.
  4. Return the data from the retrieved node.
  5. Delete the retrieved node.

可能的问题:

  1. dereference a dangling pointer: 线程1,2 同时进入步骤 1, 线程 2 率先删除 node, 导致步骤 2 的线程 1 访问空悬的指针.
  2. if two threads read the same value of head, they’ll return the same node. 这部符合 stack 的要求.
1
2
3
4
5
6
7
8
9
10
11
template<typename T>
class lock_free_stack
{
public:
void pop(T& result)
{
node* old_head=head.load();
while(!head.compare_exchange_weak(old_head,old_head->next));
result=old_head->data;
}
};

此处的代码有些问题:

  1. 没处理空的 stack. if head is a null pointer, it will cause undefined behavior as it tries to read the next pointer.
  2. exception-safety issue: if an exception is thrown when copying the return value, the value is lost. 解决方案: return a (smart) pointer to the data value.
    注意 If you do the heap allocation as part of pop(), you’re still no better off, because the heap allocation might throw an exception. Instead, you can allocate the memory when you push() the data onto the stack—you have to allocate memory for the node anyway.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <atomic>
#include <memory>

template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data);//此处 new
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop() //返回智能指针
{
node* old_head=head.load();
while(old_head &&
!head.compare_exchange_weak(old_head,old_head->next));//while loop => not wait-free
return old_head ? old_head->data : std::shared_ptr<T>();
}
};

7.2.2 Stopping those pesky leaks: managing memory in lock-free data structures

为了防止内存泄漏 write a special-purpose garbage collector for nodes

Reclaiming nodes when no threads are in pop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <atomic>
#include <memory>

template<typename T>
class lock_free_stack
{
private:
std::atomic<unsigned> threads_in_pop;
void try_reclaim(node* old_head);
public:
std::shared_ptr<T> pop()
{
++threads_in_pop;
node* old_head=head.load();
while(old_head &&
!head.compare_exchange_weak(old_head,old_head->next));
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);//此处关键, 看下面的解释
}
try_reclaim(old_head);
return res;
}
};

Because you’re going to potentially delay the deletion of the node itself, you can use swap() to remove the data from the node rather than copying the pointer, so that the data will be deleted automatically when you no longer need it rather than it being kept alive because there’s still a reference in a not-yet-deleted node. 我们希望实现 2 种效果:

  1. 通过智能指针返回被删除的 node 的内容–> data, 由于是智能指针, 因此可以不用考虑生命周期的管理问题.
  2. 推迟被删除 node 的删除部分, 直到我们确定是安全的(没有其他线程在 pop() 它).
    因此使用一个 swap() 函数做到了内容部分提取与删除 node 部分的分离(这一部分的安全性放到了 try_reclaim 的实现里). 非常值得借鉴的思路!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#include <atomic>

template<typename T>
class lock_free_stack
{
private:
std::atomic<node*> to_be_deleted;
static void delete_nodes(node* nodes)
{
while(nodes)
{
node* next=nodes->next;
delete nodes;
nodes=next;
}
}
void try_reclaim(node* old_head)
{
if(threads_in_pop==1)
{
node* nodes_to_delete=to_be_deleted.exchange(nullptr);
if(!--threads_in_pop) // no other thread can be accessing this list of pending nodes
{
delete_nodes(nodes_to_delete);
//There may be new pending nodes, but you’re not bothered about them for now,
//as long as it’s safe to reclaim your list.
}
else if(nodes_to_delete)
{
//Not safe to reclaim the nodes, so if there are any nodes_to_delete, you must chain them back onto the list
//Other threads might have called pop() in between the first test of threads_in_pop and the “claiming” of the list,
//potentially adding new nodes to the list that are still being accessed by one or more of those other threads.
//此处是风险最大的地方, 一个线程准备删除 to_be_deleted list 上的 node, 但是其他线程可能还要访问这个 node(通过 node->next). 导致 UB.
//此处的逻辑可以参照下面的 f7_1.png 图
chain_pending_nodes(nodes_to_delete);
}
delete old_head;//既只有当前线程 pop, 又没有其他线程增加 node 到 pending list 中时, 可以直接删除单独的 node.
}
else
{
chain_pending_node(old_head);
--threads_in_pop;
}
}
void chain_pending_nodes(node* nodes)
{
node* last=nodes;
while(node* const next=last->next)
{
last=next;
}
chain_pending_nodes(nodes,last);
}
void chain_pending_nodes(node* first,node* last)
{
last->next=to_be_deleted;
//in a loop here in order to ensure that you don’t leak any nodes that have been added by another thread.
//其他线程也在 pop 出 node 的时候, 会 fail 并更新(增加 node)到 list 中, 直到其他线程改好, 并排到本线程运行.
while(!to_be_deleted.compare_exchange_weak(
last->next,first));
}
void chain_pending_node(node* n)
{
chain_pending_nodes(n,n);
}
};

f7_1.png

我用我的理解来描述一遍上图的过程, 这个过程主要解释 try_reclaim() 里的 if(!--threads_in_pop) 以及 else if(nodes_to_delete) 进一步的 check 的意义.

第一张图是初始状态.
第二张图是线程 A 执行到 if(threads_in_pop==1) 语句, 被挂起.
第三张图是线程 B 执行到 pop() 里的 node* old_head=head.load(); 语句, 也就是 old_head 指向 node Y, 被挂起.
第四张图是线程 C 执行完 pop() 把 node Y 放入了 to_be_deleted list 里.
第五张图是线程 A 接着执行 to_be_deleted.exchange(nullptr); 在 A 的世界里 threads_in_pop == 1, 因此会执行 delete pending list, node Y 会被删除.
第六张图是线程 B 接着执行 old_head->next, 发现 node Y 已经被删除了, B 还尝试访问它 导致 UB.

换句话说要保证能安全地 delete pending list 需要满足 2 个条件, 一个是 threads_in_pop==1 只有一个线程在 pop(), 另一个是 to_be_deleted 没有被其他线程改为不是 nullptr(只有能安全 delete pending list 的线程才能设置为 nullptr). 这才有了后面的双层判断.

程序的其他部分需要注意的已经写在注释里了.

整体的设计思路是

  • 找到矛盾的关键点
  • 把安全一些与危险的部分剥离开来
  • 为了不影响性能, 考虑 lazy 策略, 先把危险的操作堆积起来, 一有机会就全部处理掉
  • 拖累性能而又不危险的操作(例如内存操作)设计其能够在线程等待的时候运行
  • 拆解过后的危险的部分, 进一步分解成为可以理解的步骤(线程间可能冲突), 找到关键变量然后在关键步骤 re-check.

注意此方法的缺陷, pending list 有可能会无限增加导致资源泄漏: In high-load situations, there may never be this quiescent state, because other threads have entered pop() before all the threads initially in pop() have left. Under this scenario, the to_be_deleted list would grow without bounds, and you’d be leaking memory again.

解决思路, 想办法标记那些不被 access 的 node 删除: If there aren’t going to be any quiescent periods, you need to find an alternative mechanism for reclaiming the nodes. The key is to identify when no more threads are accessing a particular node so that it can be reclaimed. By far the easiest such mechanism to reason about is the use of hazard pointers.

7.2.3 Detecting nodes that can’t be reclaimed using hazard pointers

hazard pointers: The basic idea is that if a thread is going to access an object that another thread might want to delete, it first sets a hazard pointer to reference the object, informing the other thread that deleting the object would indeed be hazardous. Once the object is no longer needed, the hazard pointer is cleared. 通过标记/解标记为 hazard 的形式来说明能否在定期的循环检查里安全地删除被指向的对象. 还是垃圾收集的思路.

When a thread wants to delete an object, it must first check the hazard pointers belonging to the other threads in the system. If none of the hazard pointers reference the object, it can safely be deleted. Otherwise, it must be left until later. Periodically, the list of objects that have been left until later is checked to see if any of them can now be deleted.

discovered by Maged Michael: “Safe Memory Reclamation for Dynamic Lock-Free Objects Using Atomic Reads and Writes,” .

一个前提, 即便一个指针指向的对象被删除了, 我们依旧可以查看这个指针的值(内存地址), 这在一般情况下是 UB, 需要自定义 allocator: Using hazard pointers like this relies on the fact that it’s safe to use the value of a pointer after the object it references has been deleted. This is technically undefined behavior if you are using the default implementation of new and delete, so either you need to ensure that your implementation permits it, or you need to use a custom allocator that permits this usage.

An implementation of pop() using hazard pointers

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <atomic>
#include <memory>

std::shared_ptr<T> pop()
{
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();//在别的地方分配此线程的 hazard pointers, 因为可能比较耗时.
node* old_head=head.load();
do // Loop until you’ve set the hazard pointer to head.
{
node* temp;
do
{
temp=old_head;
hp.store(old_head);
old_head=head.load();
} while(old_head!=temp);
}
while(old_head &&
!head.compare_exchange_strong(old_head,old_head->next));//see below
hp.store(nullptr); //Once you’ve claimed the node as yours, you can clear your hazard pointer.
//也就是本线程有了控制权, 对于本线程而言不再是风险的了.
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);
if(outstanding_hazard_pointers_for(old_head)) //Check for hazard pointers belonging to other threads referencing a node before you delete it.
{
reclaim_later(old_head);
}
else
{
delete old_head;
}
delete_nodes_with_no_hazards();// no longer any hazard pointers referencing those nodes.
}
return res;
}

上面代码中需要关注的点:

  • You’re using compare_exchange_strong() here because you’re doing work inside the while loop: a spurious failure on compare_exchange_weak() would result in resetting the hazard pointer unnecessarily.
  • Any nodes for which there are still outstanding hazard pointers will be left for the next thread that calls pop().

A simple implementation of get_hazard_pointer_for_current_thread()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <atomic>
#include <thread>

unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner
{
hazard_pointer* hp;
public:
hp_owner(hp_owner const&)=delete;
hp_owner operator=(hp_owner const&)=delete;
hp_owner():
hp(nullptr)
{
for(unsigned i=0;i<max_hazard_pointers;++i)
{
std::thread::id old_id;
// searches through the table of owner/pointer pairs looking for an entry without an owner.
// It uses compare_exchange_strong() to check for an entry without an owner and claim it.
if(hazard_pointers[i].id.compare_exchange_strong(
old_id,std::this_thread::get_id()))
{
hp=&hazard_pointers[i];
break;
}
}
if(!hp)
{
throw std::runtime_error("No hazard pointers available");
}
}
std::atomic<void*>& get_pointer()
{
return hp->pointer;
}
~hp_owner()
{
hp->pointer.store(nullptr);
hp->id.store(std::thread::id()); //setting the owner ID to std::thread::id(), allowing another thread to reuse the entry later.
}
};
std::atomic<void*>& get_hazard_pointer_for_current_thread()
{
thread_local static hp_owner hazard; //hazard pointer for the current thread
return hazard.get_pointer();
}

初始化的过程耗时, 后面就不会太多资源需求了: Once the hp_owner instance has been created for a given thread, further accesses are much faster because the pointer is cached, so the table doesn’t have to be scanned again.

implementation of outstanding_hazard_pointers_for():

1
2
3
4
5
6
7
8
9
10
11
bool outstanding_hazard_pointers_for(void* p)
{
for(unsigned i=0;i<max_hazard_pointers;++i)
{
if(hazard_pointers[i].pointer.load()==p)
{
return true;
}
}
return false; //It’s not even worth checking whether each entry has an owner: unowned entries will have a null pointer.
}

A simple implementation of the reclaim functions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <atomic>
#include <functional>
//解决范型与 void* 之间的冲突问题
template<typename T>
void do_delete(void* p)
{
delete static_cast<T*>(p);
}
struct data_to_reclaim
{
void* data;
std::function<void(void*)> deleter;
data_to_reclaim* next;
template<typename T>
data_to_reclaim(T* p):
data(p),
deleter(&do_delete<T>),
next(0)
{}
~data_to_reclaim()
{
deleter(data);
}
};
std::atomic<data_to_reclaim*> nodes_to_reclaim;
void add_to_reclaim_list(data_to_reclaim* node)
{
node->next=nodes_to_reclaim.load();
while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}
bool outstanding_hazard_pointers_for(void* data){
return false;
}
// template: hazard pointers are a general-purpose utility, so you don’t want to tie yourselves to stack nodes.
template<typename T>
void reclaim_later(T* data)
{
add_to_reclaim_list(new data_to_reclaim(data));
}
void delete_nodes_with_no_hazards()
{
// claims the entire list of nodes to be reclaimed for itself
// this is the only thread trying to reclaim this particular set of nodes.
// Other threads are now free to add further nodes to the list or
// even try to reclaim them without impacting the operation of this thread.
data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr);
while(current)
{
data_to_reclaim* const next=current->next;
if(!outstanding_hazard_pointers_for(current->data))
{
delete current;
}
else
{
add_to_reclaim_list(current);
}
current=next;
}
}

上面的实现不够高效:

  1. makes pop() an expensive operation
    • every pop() call => checking max_hazard_pointers atomic variables. Atomic operations are inherently slow—often 100 times slower than an equivalent non-atomic operation on desktop CPUs
  2. you scan the hazard pointer list for the node you’re about to remove + also scan it for each node in the waiting list. => checking all of them against max_hazard_pointers stored hazard pointers.

BETTER RECLAMATION STRATEGIES USING HAZARD POINTERS

解决上面问题的思路:

  1. trade memory for performance.
    • checking 2*max_hazard_pointers nodes every max_hazard_pointers calls to pop() and reclaiming at least max_hazard_pointers nodes.
    • downside: have to count the nodes on the reclamation list, which means using an atomic count, and you still have multiple threads competing to access the reclamation list itself.
  2. each thread keeps its own reclamation list in a thread-local variable.
    • have max_hazard_pointers*max_hazard_pointers nodes allocated.
    • If a thread exits before all its nodes have been reclaimed, they can be stored in the global list as before and added to the local list of the next thread doing a reclamation process.

可惜 hazard pointers are covered by a patent application submitted by IBM.

也许会被放入 C++ 标准里? http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0566r5.pdf

7.2.4 Detecting nodes in use with reference counting

本节介绍使用引用计数来实现垃圾回收.

Reference counting tackles the problem by storing a count of the number of threads accessing each node.

直观反应的智能有它的问题: although some operations on std::shared_ptr<> are atomic, they aren’t guaranteed to be lock-free. 为啥不加? std::shared_ptr<> is intended for use in many contexts, and making the atomic operations lock-free would likely impose an overhead on all uses of the class.

A lock-free stack using a lock-free std::shared_ptr<> implementation

如果智能指针是 lock-free 的话, 实现起来就会很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
std::shared_ptr<node> next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::shared_ptr<node> head;
public:
void push(T const& data)
{
std::shared_ptr<node> const new_node=std::make_shared<node>(data);
new_node->next=std::atomic_load(&head);
while(!std::atomic_compare_exchange_weak(&head,
&new_node->next,new_node));
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> old_head=std::atomic_load(&head);
while(old_head && !std::atomic_compare_exchange_weak(&head,
&old_head,std::atomic_load(&old_head->next)));
if(old_head) {
//Note the need to clear the next pointer from the popped node in order to avoid the potential for deeply nested
//destruction of nodes when the last std::shared_ptr referencing a given node is destroyed.
std::atomic_store(&old_head->next,std::shared_ptr<node>());
return old_head->data;
}
return std::shared_ptr<T>();
}
~lock_free_stack(){
while(pop());
}
};

Concurrency TS 提供了 std::experimental::atomic_shared_ptr<T> in the <experimental/atomic> header. 注意 it may or may not be lock-free on any given implementation.

Stack implementation using std::experimental::atomic_shared_ptr<>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template<typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
std::experimental::atomic_shared_ptr<node> next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::experimental::atomic_shared_ptr<node> head;
public:
void push(T const& data)
{
std::shared_ptr<node> const new_node=std::make_shared<node>(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> old_head=head.load();
while(old_head && !head.compare_exchange_weak(
old_head,old_head->next.load()));
if(old_head) {
old_head->next=std::shared_ptr<node>();
return old_head->data;
}

return std::shared_ptr<T>();
}
~lock_free_stack(){
while(pop());
}
};

好处: without having to remember to include the atomic_load and atomic_store calls.

使用智能指针里的计数不一定是 lock-free, 因此考虑手动实现引用计数.
two reference counts for each node: an internal count and an external count.

  • The sum of these values is the total number of references to the node.
  • The external count is kept alongside the pointer to the node and is increased every time the pointer is read.
  • When the reader is finished with the node, it decreases the internal count.
  • A simple operation that reads the pointer will leave the external count increased by one and the internal count decreased by one when it’s finished.
  • When the external count/pointer pairing is no longer required (the node is no longer accessible from a location accessible to multiple threads), the internal count is increased by the value of the external count minus one and the external counter is discarded.
  • Once the internal count is equal to zero, there are no outstanding references to the node and it can be safely deleted.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <atomic>
#include <memory>

template<typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
//set up so the internal_count is zero, and the external_count is one.
new_node.external_count=1;
new_node.ptr->next=head.load();
while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
}
};

external_countnode* ptr; 放在一起形成 struct counted_node_ptr 放入 std::atomic<counted_node_ptr> head; 要注意其 struct 大小问题.
如果太大 std::atomic<counted_node_ptr> 不会 lock-free: On those platforms that support a double-word-compare-and-swap operation(可以参考[wiki]https://en.wikipedia.org/wiki/Double_compare-and-swap), this structure will be small enough for std::atomic<counted_node_ptr> to be lock-free.

一些 trick: 利用指针数值里的无效 bits 存储 counter. => require platform-specific knowledge
if you’re willing to limit the size of the counter, and you know that your platform has spare bits in a pointer (for example, because the address space is only 48 bits but a pointer is 64 bits), you can store the count inside the spare bits of the pointer to fit it all back in a single machine word.

Popping a node from a lock-free stack using split reference counts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <memory>
#include <atomic>

template<typename T>
class lock_free_stack
{
private:
struct counted_node_ptr
{
int external_count;
node* ptr;
};
std::atomic<counted_node_ptr> head;

void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
//once you’ve loaded the value of head, you must first increase the count of external references to the head node
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter));
old_counter.external_count=new_counter.external_count;
}
public:
std::shared_ptr<T> pop()
{
counted_node_ptr old_head=head.load();
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr; //Once the count has been increased,
//you can safely dereference the ptr field of the value loaded from head in order to access the pointed-to node
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next))//注意没有用 while
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase=old_head.external_count-2;
// why -2?
//you’ve removed the node from the list, so you drop one off the count for that
//and you’re no longer accessing the node from this thread, so you drop another off the count for that.
if(ptr->internal_count.fetch_add(count_increase)==
-count_increase)
{
delete ptr;
}
return res;
}
else if(ptr->internal_count.fetch_sub(1)==1) //another thread removed your node before you did, or another thread added a new node to the stack.
//you’re the last thread to hold a reference
//(because another thread removed it from the stack), the internal reference count will be 1
{
delete ptr;
}
}
}
};

7.2.5 Applying the memory model to the lock-free stack

需要考虑一些场景, 先从最简单的开始: one thread pushes a data item onto the stack and another thread then pops that data item off the stack some time later.

分析数据结构, three important pieces of data are involved:

  1. counted_node_ptr used for transferring the data: head.
  2. the node structure that head refers to.
  3. the data item pointed to by that node.

动作拆解, push():

  1. constructs the data item.
  2. constructs the node.
  3. sets head.

动作拆解, pop():

  1. loads the value of head.
  2. does a compare/exchange loop on head to increase the reference count.
  3. reads the node structure to obtain the next value.

a happens-before relationship between the store (by the pushing thread) and the load (by the popping thread).

next 指针分别考虑:

pushing:

  • only atomic operation in the push() is the compare_exchange_weak(), and you need a release operation to get a happens-before relationship between threads, the compare_exchange_weak() must be std::memory_order_release or stronger.
  • If the compare_exchange_weak() call fails, nothing has changed and you keep looping, so you need only std::memory_order_relaxed in that case.
1
2
3
4
5
6
7
8
9
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed)
while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
std::memory_order_release,std::memory_order_relaxed));// 注意 order option 可以传入 2 个, 前面的为 success 后面的为 fail
}

poping:

  • std::memory_order_acquire or stronger before the access to next.
  • The pointer you dereference to access the next field is the old value read by the compare_exchange_strong() in increase_head_count(), so you need the ordering on that if it succeeds.
1
2
3
4
5
6
7
8
9
10
11
12
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}

data 理清顺序:

  • store to data in the push() thread is sequenced before the store to head.
  • the call to increase_head_count() is sequenced before the load of ptr->data.
  • the acquire operation in increase_head_count() ensures that there’s a synchronizes-with relationship between the store in the push() thread and that compare/exchange.

==> there’s a happens-before relationship with data between two threads.

The only other place where ptr->data is changed is the call to swap(). no other thread can be operating on the same node. ==> safe

many other threads modify head in the meantime

head is only ever modified by compare/exchange operations. Because these are read-modify-write operations, they form part of the release sequence headed by the compare/exchange in push(). Therefore, the compare_exchange_weak() in push() synchronizes with a call to compare_exchange_strong() in increase_head_count(), which reads the value stored, even if many other threads modify head in the meantime.

modifying the reference count(fetch_add() operations)

any thread that did not successfully retrieve the data knows that another thread did modify the node data; the successful thread used swap() to extract the referenced data item. Therefore you need to ensure that swap() happens before the delete in order to avoid a data race.

自然而然地推出: make the fetch_add() in the successful-return branch use std::memory_order_release and the fetch_add() in the loop-again branch use std::memory_order_acquire.

这样做还可以压榨, else if(ptr->internal_count.fetch_add( 里的 fetch_add() is a read-modify-write operation, it forms part of the release
sequence, so you can do that with an additional load(): fetch_add() with std::memory_order_relaxed + load() with std::memory_order_acquire.

汇总如下:
A lock-free stack with reference counting and relaxed atomic operations

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#include <atomic>
#include <memory>

template<typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed);
while(!head.compare_exchange_weak(
new_node.ptr->next,new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
std::shared_ptr<T> pop()
{
counted_node_ptr old_head=
head.load(std::memory_order_relaxed);
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr;
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(
old_head,ptr->next,std::memory_order_relaxed))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
int const count_increase=old_head.external_count-2;
if(ptr->internal_count.fetch_add(
count_increase,std::memory_order_release)==-count_increase)
{
delete ptr;
}
return res;
}
else if(ptr->internal_count.fetch_add(
-1,std::memory_order_relaxed)==1)
{
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};

lots of the complexity in lock-free code comes from managing memory.

7.2.6 Writing a thread-safe queue without locks

a perfectly serviceable single-producer, single-consumer(SPSC) queue:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <memory>
#include <atomic>

template<typename T>
class lock_free_queue
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;
node():
next(nullptr)
{}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* pop_head()
{
node* const old_head=head.load();
if(old_head==tail.load())
{
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node),tail(head.load())
{}
lock_free_queue(const lock_free_queue& other)=delete;
lock_free_queue& operator=(const lock_free_queue& other)=delete;
~lock_free_queue()
{
while(node* const old_head=head.load())
{
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop()
{
node* old_head=pop_head();
if(!old_head)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data);
delete old_head;
return res;
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p=new node;
node* const old_tail=tail.load();
old_tail->data.swap(new_data);
old_tail->next=p;
tail.store(p);
}
};

几个需要注意同步的点:

  • The store to tail synchronizes with the load from tail => 通过原子变量实现.
  • the store to data happens before the load:
    • store to tail synchronizes with the load from tail.
    • the store to the preceding node’s data pointer is sequenced before the store to tail.
    • the load from tail is sequenced before the load from the data pointer.

HANDLING MULTIPLE THREADS IN PUSH()

如何解决多个线程下的并发 pop()push().

思路1: add a dummy node between the real nodes.
对于 push(): the current tail node that needs updating is the next pointer, which could therefore be made atomic.
对于 pop(): require a minor change to pop() in order to discard nodes with a null data pointer and loop again.

The downside here is that every pop() call will typically have to remove two nodes, and there are twice as many memory allocations.

思路2: make the data pointer atomic and set that with a call to compare/exchange.
对于 push(): If a call to compare/exchange succeeds, this is your tail node, and you can safely set the next pointer to your new node and then update tail. If the compare/exchange fails because another thread has stored the data, you loop around, reread tail, and start again.

Listing 7.15 A (broken) first attempt at revising push()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
for(;;)
{
node* const old_tail=tail.load();
T* old_data=nullptr;
if(old_tail->data.compare_exchange_strong(
old_data,new_data.get()))
{
old_tail->next=new_next;
tail.store(new_next.ptr);//此处有问题, 看下方.
new_data.release();
break;
}
}
}

tail.store(new_next.ptr); 的同时有别的线程修改了 tail 指针比如 pop(), 那么 new 会去访问已经被删掉的 tail 导致 UB. 如何解决呢?
因为是与 pop() 一样的问题, 可以考虑再加一对 external 与 internal count. 具体思路如下:

Having two external counts for the same node requires a modification to the reference-counting scheme to avoid deleting the node too early. You can address this by also counting the number of external counters inside the node structure and decreasing this number when each external counter is destroyed (as well as adding the corresponding external count to the internal count). If the internal count is zero and there are no external counters, you know the node can safely be deleted.

Listing 7.16 Implementing push() for a lock-free queue with a reference-counted tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#include <atomic>
#include <memory>
template<typename T>
class lock_free_queue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
struct node_counter //Its important to update these counts together as a single entity in order to avoid race conditions.
{
unsigned internal_count:30;
unsigned external_counters:2;// at most two such counters.
};//Keeping the structure within a machine word makes it more likely that the atomic operations can be lock-free on many platforms.
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
counted_node_ptr next;
node()
{
node_counter new_count;
new_count.internal_count=0;
new_count.external_counters=2;
//why 2?
// every new node starts out referenced from tail and
// from the next pointer of the previous node once you’ve added it to the queue.
count.store(new_count);
next.ptr=nullptr;
next.external_count=0;
}
};
public:
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail);
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(
old_data,new_data.get()))
{
old_tail.ptr->next=new_next;
old_tail=tail.exchange(new_next);
free_external_counter(old_tail);
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
};

Listing 7.17 Popping a node from a lock-free queue with a reference-counted tail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <memory>
#include <atomic>
template<typename T>
class lock_free_queue
{
private:
#include "node.hpp"
struct counted_node_ptr
{
int external_count;
node* ptr;
};

std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
public:
std::unique_ptr<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr) //If the head node is the same as the tail node, you can release the reference
{
ptr->release_ref();
return std::unique_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next))// compares the external count and pointer as a single entity after releasing the reference
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head); //released the external counter to the popped node
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};

接下来是 reference-counting functions

Listing 7.18 Releasing a node reference in a lock-free queue: release_ref()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <memory>
#include <atomic>
template<typename T>
class lock_free_queue
{
private:
struct node;
struct node_counter
{
unsigned internal_count:30;
unsigned external_counters:2;
};

struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<node_counter> count;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
void release_ref()
{
node_counter old_counter=
count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count &&
!new_counter.external_counters) //if both the internal and external counts are now zero, this is the last reference
{
delete this;
}
}
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
};

Listing 7.19 Obtaining a new reference to a node in a lock-free queue increase_external_count()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
class lock_free_queue
{
private:
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!counter.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
};

Listing 7.20 Freeing an external counter to a node in a lock-free queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template<typename T>
class lock_free_queue
{
private:
static void free_external_counter(counted_node_ptr &old_node_ptr)
{
node* const ptr=old_node_ptr.ptr;
int const count_increase=old_node_ptr.external_count-2;
node_counter old_counter=
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.external_counters;
new_counter.internal_count+=count_increase;
}
while(!ptr->count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count &&//If both the values are now zero, there are no more references to the node, so it can be safely deleted
!new_counter.external_counters) //This has to be done as a single action
{
delete ptr;
}
}
};

上面的做法其实不是 lock-free 的:
Once one thread has started a push() operation by successfully completing the compare_exchange_strong() on old_tail.ptr->data. no other thread can perform a push() operation.
This is a busy wait, which consumes CPU cycles without achieving anything. Consequently, this is effectively a lock.
比一般的 mutex 锁更坏的一点是:
the operating system can give priority to the thread that holds the lock on a mutex if there are blocked threads, it can’t do so in this case, so the blocked threads will waste CPU cycles until the first thread is done.

MAKING THE QUEUE LOCK-FREE BY HELPING OUT ANOTHER THREAD

解决思路: find a way for a waiting thread to make progress even if the thread doing the push() is stalled. One way to do this is to help the stalled thread by doing its work for it.

具体的内容: the next pointer on the tail node needs to be set to a new dummy node, and then the tail pointer itself must be updated.

Listing 7.21 pop() modified to allow helping on the push() side

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <atomic>
template<typename T>
class lock_free_queue
{
private:
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next; //the next pointer is now atomic
};
public:
std::unique_ptr<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr)
{
return std::unique_ptr<T>();
}
counted_node_ptr next=ptr->next.load(); //load of next is atomic
if(head.compare_exchange_strong(old_head,next))
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};

Listing 7.22 A sample push() with helping for a lock-free queue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
template<typename T>
class lock_free_queue
{
private:
void set_new_tail(counted_node_ptr &old_tail,
counted_node_ptr const &new_tail)
{
node* const current_tail_ptr=old_tail.ptr;
while(!tail.compare_exchange_weak(old_tail,new_tail) &&
old_tail.ptr==current_tail_ptr);
if(old_tail.ptr==current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail);
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(
old_data,new_data.get()))
{
counted_node_ptr old_next={0};
if(!old_tail.ptr->next.compare_exchange_strong(
old_next,new_next))
{
delete new_next.ptr; //another thread has already set the next pointer.
new_next=old_next; //use the next value that the other thread set for updating tail.
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else //an else clause to do the helping
{
counted_node_ptr old_next={0};
if(old_tail.ptr->next.compare_exchange_strong(
old_next,new_next))
{
old_next=new_next;
new_next.ptr=new node;
}
set_new_tail(old_tail, old_next);
}
}
}
};

一个潜在的效率风险点: there are a lot of new and delete calls for such a small piece of code, because new nodes are allocated on push() and destroyed in pop(). The efficiency of the memory allocator therefore has a considerable impact on the performance of this code.
解决方法:

  1. try it and measure the performance of the code before and after.
  2. having a separate memory allocator on each thread and using free lists to recycle nodes rather than returning them to the allocator.

7.3 Guidelines for writing lock-free data structures

7.3.1 Guideline: use std::memory_order_seq_cst for prototyping

对于检查: Unless you have an algorithm checker that can systematically test all possible combinations of thread visibilities that are consistent with the specified ordering guarantees (and these things do exist), running the code isn’t enough.

7.3.2 Guideline: use a lock-free memory reclamation scheme

难点: managing memory.

  • It’s essential to avoid deleting objects when other threads might still have references to them,
  • but you still want to delete the object as soon as possible in order to avoid excessive memory consumption.

three techniques for ensuring that memory can safely be reclaimed:

  1. Waiting until no threads are accessing the data structure and deleting all objects that are pending deletion.
  2. Using hazard pointers to identify that a thread is accessing a particular object.
  3. Reference counting the objects so that they aren’t deleted until there are no outstanding references.

更大维度上的解决办法:

  1. this is the ideal scenario for using a garbage collector.
  2. recycle nodes and only free them completely when the data structure is destroyed.
    The downside here is that another problem becomes more prevalent. This is the so-called ABA problem.

7.3.3 Guideline: watch out for the ABA problem

解决方法: an ABA counter alongside the variable x. The compare/exchange operation is then done on the combined structure of x plus the counter as a single unit.

常见的场景: The ABA problem is particularly prevalent in algorithms that use free lists or otherwise recycle nodes rather than returning them to the allocator.

7.3.4 Guideline: identify busy-wait loops and help the other thread

解决办法: the waiting thread performs the incomplete steps if it’s scheduled to run before the original thread completes the operation, you can remove the busy-wait and the operation is no longer blocking.

Summary

本章设计 data structures 的意义: Wherever data is shared between threads, you need to think about the data structures used and how the data is synchronized between threads. By designing data structures for concurrency, you can encapsulate that responsibility in the data structure itself, so the rest of the code can focus on the task it’s trying to perform with the data rather than the data synchronization.

作者

cx

发布于

2022-10-03

更新于

2022-10-25

许可协议