当前位置: 首页 > news >正文

Cpp 无锁编程(C++ Concurrency in Action)

本文是 C++ Concurrency in Action 的总结

通过原子变量实现 spinlock

class spinlock_mutex {std::atomic_flag flag;
public:spinlock_mutex(): flag(ATOMIC_FLAG_INIT) {}void lock() {while (flag.test_and_set(std::memory_order_acquire));}void unlock() {flag.clear(std::memory_order_release);}
};

lock 这里使用 acquire 内存序的原因是 lock 需要确保临界区里的读写需要看到上一个线程在 unlock 之前的修改
unlock 使用 release 表示临界区里的修改需要对下一个持锁线程可见

这里值得注意的是这个其实不是无锁编程,虽然没有使用 mutex ,虽然它不会被 block (线程进入 D 状态)

lock-free 的意思是如果有多个线程在操作一个共享的数据,在有限的step里面,其中有一个线程一定能完成这个操作。

从这个定义上看,我们的 spinlock 如果有一个线程 lock 后 panic 了,那其他的线程同样被阻塞无法继续完成相应的操作。因此不是 lock-free

但是这个例子其实用到了无锁编程经常使用到的原子变量和其对应的操作。这也说明了用原子变量不代表就是无锁

实现一个 lock-free stack

一个最简化版的 stack

template <typename T>
class LockFreeStack {private:struct Node {T data_;Node* next;Node(T const& data) : data_(data) {}};std::atomic<Node*> head{nullptr};public:void push(T const& data) {Node* node = new Node(data);node->next = head.load();while (!head.compare_exchange_weak(node->next, node));}bool pop(T& result) {Node* old_head = head.load();while (old_head && !head.compare_exchange_weak(old_head, old_head->next));if (!old_head) {return false;}result = old_head->data_;return true;}
};

pop 这里不能仅靠 load ,因为 pop 需要确保不同线程拿到的值需要不一样,所以需要使用 compare_exchange_weak 来保证如果节点被其他线程取了以后重新获取节点。

这里还有一点小问题,pop 出来的节点使用了拷贝数据来返回,但是拷贝过程中可能发生异常,那这样的话这个数据就会丢失。所以可以使用智能指针来处理这个问题。如下:

template <typename T>
class LockFreeStack {private:struct Node {std::shared_ptr<T> data_;Node* next;Node(T const& data) : data_(std::make_shared<T>(data)) {}};std::atomic<Node*> head{nullptr};public:void push(T const& data) {Node* node = new Node(data);node->next = head.load();while (!head.compare_exchange_weak(node->next, node));}std::shared_ptr<T> pop() {Node* old_head = head.load();while (old_head && !head.compare_exchange_weak(old_head, old_head->next));return old_head ? old_head->data_ : std::shared_ptr<T>{};}
};

现在的代码仍然有一个问题,pop 出节点后没有释放内存。因为在多线程环境下,你无法确定这个节点是否被其他线程持有,所以不能安全的删除。shared_ptr 只管理了 data,没有管理 Node

修复内存泄漏

通过 pop 的线程数阻止竞争

template <typename T>
class LockFreeStack {private:struct Node {//...};std::atomic<Node*> head{nullptr};std::atomic<unsigned> threads_in_pop{0};std::atomic<Node*> to_be_deleted{nullptr};static void delete_nodes(Node* node) {while (node) {auto next = node->next;delete node;node = next;}}void chain_pending_nodes(Node* nodes) {Node* last = nodes;while (Node* const next = last->next) {last = next;}chain_pending_nodes(nodes, last);}void chain_pending_nodes(Node* first, Node* last) {last->next = to_be_deleted.load();while (!to_be_deleted.compare_exchange_weak(last->next, first));}void try_reclaim(Node* node) {if (threads_in_pop == 1) {Node* nodes_to_delete = to_be_deleted.exchange(nullptr);if (!--threads_in_pop) {delete_nodes(nodes_to_delete);} else if (nodes_to_delete) {chain_pending_nodes(nodes_to_delete);}delete node;} else {if (node) {chain_pending_nodes(node);}--threads_in_pop;}}public:void push(T const& data) {//...}std::shared_ptr<T> pop() {threads_in_pop++;Node* old_head = head.load();while (old_head && !head.compare_exchange_weak(old_head, old_head->next));std::shared_ptr<T> res;if (old_head) {res.swap(old_head->data_);}try_reclaim(old_head);return res;}
};

这个实现的核心思想是:如果没有其他线程在执行 pop,就可以安全的把节点删掉。
所以引入了 threads_in_pop 来监控有几个线程,进入 pop 就加一,离开就减一。
如果 threads_in_pop == 1 说明是只有一个线程,可以安全删除,但需要注意的是在 delete_nodes 前需要再判断一次,因为这中间有可能有其他线程进入 pop 然后操作了。
threads_in_pop == 1 的时候可以安全删掉当前pop出的节点,因为其他线程后面肯定不会访问到这个节点。

Pasted image 20260604094954

这个方案有个问题,在低并发的时候还能工作,但是高并发下面可能会失效。现在的方案是监控 pop 操作是否有其他线程,可以考虑监控节点是否有其他的线程在访问。

Hazard Pointer

hazard pointer 基本思路是,如果一个线程要访问另一个线程可能想要删除的对象,它首先会设置一个危险指针来指向该对象,告知另一个线程删除该对象确实会带来风险。一旦不再需要该对象,则清除危险指针。
Hazard Pointer 的工作原理可以压缩成三个操作:Protect(保护)、Retire(退役)、Scan(扫描回收)

  • 保护:线程声明我正在使用这个节点
  • 退役:线程声明我已经把这个节点移除了,但是先不 free
  • 扫描回收:检查待删除链表里面的节点,如果没有线程在使用就释放

手写一个 hazard pointer

改进后的 pop 为例:

  std::shared_ptr<T> pop() {std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();auto old_head = head.load();// protectdo {Node* temp;do {temp = old_head;hp.store(old_head);old_head = head.load();} while (old_head != temp);} while (old_head &&!head.compare_exchange_strong(old_head, old_head->next));hp.store(nullptr);std::shared_ptr<T> res;if (old_head) {res.swap(old_head->data_);// retireif (outstanding_hazard_pointers_for(old_head)) {reclaim_later(old_head);} else {delete old_head;}// scan and deletedelete_nodes_with_no_hazards();}return res;}

获取 hazard pointer,hazard pointer 需要将线程和节点关联起来,并且保存到一个所有线程都可以访问到的地方:

unsigned const MAX_HAZARD_POINTERS = 100;
struct HazardPointer {std::atomic<std::thread::id> id;std::atomic<void*> pointer;
};
HazardPointer hazard_pointers[MAX_HAZARD_POINTERS];
class HpOwner {HazardPointer* hp;public:HpOwner(HpOwner const&) = delete;HpOwner operator=(HpOwner const&) = delete;HpOwner() : hp(nullptr) {for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {std::thread::id old_id;if (hazard_pointers[i].id.compare_exchange_strong(old_id, std::this_thread::get_id())) {hp = &hazard_pointers[i];break;}}if (!hp) {throw std::runtime_error("No hazard pointers available");}}std::atomic<void*>& get_pointer() { return hp->pointer; }~HpOwner() {hp->pointer.store(nullptr);hp->id.store(std::thread::id());}
};std::atomic<void*>& get_hazard_pointer_for_current_thread() {thread_local static HpOwner hazard;return hazard.get_pointer();
}

判断某个节点是否被其他线程持有:

bool outstanding_hazard_pointers_for(void* p) {for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {if (hazard_pointers[i].pointer.load() == p) {return true;}}return false;
}

retire 这个节点,也就是把这个节点插入一个待删除的链表里面:

template <typename T>
void do_delete(void* p) {delete static_cast<T*>(p);
}struct DataToReclaim {void* data;std::function<void(void*)> deleter;DataToReclaim* next;template <typename T>DataToReclaim(T* p) : data(p), deleter(&do_delete<T>), next(nullptr) {}~DataToReclaim() { deleter(data); }
};std::atomic<DataToReclaim*> nodes_to_reclaim;
void add_to_reclaim_list(DataToReclaim* node) {node->next = nodes_to_reclaim.load();while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
}template <typename T>
void reclaim_later(T* data) {add_to_reclaim_list(new DataToReclaim(data));
}

扫描回收:

void delete_nodes_with_no_hazards() {auto current = nodes_to_reclaim.exchange(nullptr);while (current) {auto next = current->next;if (!outstanding_hazard_pointers_for(current->data)) {delete current;} else {add_to_reclaim_list(current);}current = next;}
}

这段代码是比较需要注意的:

//...do {Node* temp;do {temp = old_head;hp.store(old_head);old_head = head.load();} while (old_head != temp);} while (old_head &&!head.compare_exchange_strong(old_head, old_head->next));
//...

这里有两层循环:1. 内层循环,作用是确保 hp 保护的节点还是当前的 head。在获取 old_head (temp = old_head)和 hp.store(old_head) 之间可能存在竞态 2. 外层节点就是一个很正常的 pop 节点所做的操作,如果发现 old_head 被修改了则需要重新去获取头节点。这里因为失败会重新处理内层循环,所以 spurious failure 的代价比较昂贵,在这里用了 exchange_strong

目前这个版本的实现其实有很大的性能问题。因为每一次 pop 都会去扫描待删除链表,扫描是需要占用 CPU 资源的,并且不同线程去访问一个全局的链表是有竞争的。所以有两种解决办法:1. 空间换时间,不需要每次都去扫描,只有当链表里的节点超过一定数量才需要删除 2. 每个线程有自己的链表,不需要去竞争同一个。

Hazard Pointer 内存上界

Hazard Pointer 是有明确的内存上界的。在任意时刻,系统中未被回收的退役节点数最多为:N * (R + K * N) N 是线程数,K 是每个线程的 hazard pointer 数量, R 是触发 scan 的域值。

这个上界的含义是:每个线程最多持有 R 个未被 scan 的退役节点,加上最多有 K×N 个节点被其他线程的 hazard pointer 保护着无法释放。通常 R 被设置为 2×K×N,所以上界约为 N × 3KN = 3KN²。对于 64 线程、K=2 的 M-S queue 场景,上界是 3×2×64² = 24576 个节点。如果每个节点 64 字节,这是约 1.5 MB 的"延迟释放"开销——在绝大多数场景下完全可以接受。

Hazard Pointer 为什么是安全的

Safety Invariant: 节点 P 被释放,当且仅当:(1) P 在某个线程的 retired_list 中;且 (2) 执行 scan 时,P 不在任何线程的 hazard pointer 中。

条件 (1) 要求 P 已经被从数据结构中逻辑删除(否则不会被 retire)。条件 (2) 要求没有线程声明正在使用 P。这两个条件一起保证了:如果一个线程正在使用 P(它的 hazard pointer 中有 P),P 就不会被释放。

这个不变量的证明核心在于 protect-then-validate 协议:如果一个线程 T 通过 hp.store(P)验证 P 仍在数据结构中这两步都成功了,那么在 T 清除它的 hazard pointer 之前,P 一定不会被释放——因为任何执行 scan 的线程在收集 hazard pointer 时一定会看到 T 的声明(acquire-release 语义保证了可见性)。

看一个场景:
场景设定:队列状态 Head → A → B → C,线程 T1 执行 dequeue(目标是摘掉 A),线程 T2 也执行 dequeue 并且还会触发 scan。最危险的交错是:T1 正在保护 A 的过程中,T2 已经把 A retire 了并且开始 scan。

时刻 T1 (dequeuer) T2 (dequeuer + scanner) A 的状态
t0 head = Head.load()

→ 得到 A
- 在队列中
t1 hp[0].store(A) - 在队列中,被T1保护
t2 - head = Head.load()

→ 得到 A
在队列中,被T1保护
t3 - hp[0].store(A) 被T1和T2保护
t4 - 验证 head == Head → 通过 同上
t5 - next = A->next

→ 得到 B
同上
t6 - CAS(Head, A, B)

→ 成功
逻辑删除

,仍被T1和T2保护
t7 - hp[0].clear(); hp[1].clear() 逻辑删除,仅被T1保护
t8 - retire(A)

→ A进入retired_list
退役,仅被T1保护
t9 - scan()

开始:收集所有 hazard pointer
退役,仅被T1保护
t10 - scan 读取 T1.hp[0] → 发现值为 A 退役,scan知道被保护
t11 - A 在 protected_set 中,跳过不释放 退役,安全保留
t12 验证 head == Head → 失败 - 退役,仅被T1保护
t13 hp[0].clear()

→ 重新开始
- 退役,无人保护
t14 - (下次 scan 发现 A 无保护) 退役,无人保护
t15 - delete A

→ 安全释放
已释放
t9-t10 是整个协议的安全关键。 在 t9,T2 的 scan 开始收集所有线程的 hazard pointer。它读取 T1 的 hp[0],发现值为 A。这个读取使用 memory_order_acquire,配合 T1 在 t1 时 hp[0].store(A)使用的 memory_order_release,构成了一个 acquire-release 同步——T2 在 t10 看到的 A,一定是 T1 在 t1 或之后存入的值,不会看到更早的 nullptr。

完整的实现

#include <atomic>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <thread>unsigned const MAX_HAZARD_POINTERS = 100;
struct HazardPointer {std::atomic<std::thread::id> id;std::atomic<void*> pointer;
};
HazardPointer hazard_pointers[MAX_HAZARD_POINTERS];
class HpOwner {HazardPointer* hp;public:HpOwner(HpOwner const&) = delete;HpOwner operator=(HpOwner const&) = delete;HpOwner() : hp(nullptr) {for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {std::thread::id old_id;if (hazard_pointers[i].id.compare_exchange_strong(old_id, std::this_thread::get_id())) {hp = &hazard_pointers[i];break;}}if (!hp) {throw std::runtime_error("No hazard pointers available");}}std::atomic<void*>& get_pointer() { return hp->pointer; }~HpOwner() {hp->pointer.store(nullptr);hp->id.store(std::thread::id());}
};std::atomic<void*>& get_hazard_pointer_for_current_thread() {thread_local static HpOwner hazard;return hazard.get_pointer();
}bool outstanding_hazard_pointers_for(void* p) {for (auto i = 0; i < MAX_HAZARD_POINTERS; i++) {if (hazard_pointers[i].pointer.load() == p) {return true;}}return false;
}template <typename T>
void do_delete(void* p) {delete static_cast<T*>(p);
}struct DataToReclaim {void* data;std::function<void(void*)> deleter;DataToReclaim* next;template <typename T>DataToReclaim(T* p) : data(p), deleter(&do_delete<T>), next(nullptr) {}~DataToReclaim() { deleter(data); }
};std::atomic<DataToReclaim*> nodes_to_reclaim;
void add_to_reclaim_list(DataToReclaim* node) {node->next = nodes_to_reclaim.load();while (!nodes_to_reclaim.compare_exchange_weak(node->next, node));
}template <typename T>
void reclaim_later(T* data) {add_to_reclaim_list(new DataToReclaim(data));
}void delete_nodes_with_no_hazards() {auto current = nodes_to_reclaim.exchange(nullptr);while (current) {auto next = current->next;if (!outstanding_hazard_pointers_for(current->data)) {delete current;} else {add_to_reclaim_list(current);}current = next;}
}template <typename T>
class LockFreeStack {private:struct Node {std::shared_ptr<T> data_;Node* next;Node(T const& data) : data_(std::make_shared<T>(data)) {}};std::atomic<Node*> head{nullptr};public:void push(T const& data) {Node* node = new Node(data);node->next = head.load();while (!head.compare_exchange_weak(node->next, node));}std::shared_ptr<T> pop() {std::atomic<void*>& hp = get_hazard_pointer_for_current_thread();auto old_head = head.load();do {Node* temp;do {temp = old_head;hp.store(old_head);old_head = head.load();} while (old_head != temp);} while (old_head &&!head.compare_exchange_strong(old_head, old_head->next));hp.store(nullptr);std::shared_ptr<T> res;if (old_head) {res.swap(old_head->data_);if (outstanding_hazard_pointers_for(old_head)) {reclaim_later(old_head);} else {delete old_head;}delete_nodes_with_no_hazards();}return res;}
};int main() {LockFreeStack<int> stack;constexpr int count = 10;std::atomic<int> popped{0};std::thread t([&] {for (int i = 0; i < count; ++i) {stack.push(i);}});std::thread t1([&] {while (popped.load() < count) {auto value = stack.pop();if (value) {popped.fetch_add(1);std::cout << "thread t1 pop: " << *value << '\n';} else {std::this_thread::yield();}}});std::thread t2([&] {while (popped.load() < count) {auto value = stack.pop();if (value) {popped.fetch_add(1);std::cout << "thread t2 pop: " << *value << '\n';} else {std::this_thread::yield();}}});t.join();t1.join();t2.join();
}

Reference Count

我们会想为什么不直接使用 shared_ptr 来管理节点,这样当引用计数为 0 的时候节点自动就被释放了。但是问题在于 cpp 标准里面并没有规定 shared_ptr 是 lock-free 的,它是有可能带锁的,取决于平台的实现,可以通过 std::atomic_is_lock_free 来判断。

所以如果我们想通过引用计数来管理节点的话,就只能自己手动实现。

template <typename T>
class LockFreeStack {private:struct Node;struct CountedNodePtr {int external_count;Node* ptr;};struct Node {std::shared_ptr<T> data_;std::atomic<int> internal_count;CountedNodePtr next;Node(T const& data) : data_(std::make_shared<T>(data)), internal_count(0) {}};std::atomic<CountedNodePtr> head;void increase_head_count(CountedNodePtr& old_counter) {CountedNodePtr new_counter;do {new_counter = old_counter;new_counter.external_count++;} while(!head.compare_exchange_strong(old_counter, new_counter));old_counter.external_count = new_counter.external_count;}public:LockFreeStack() : head(CountedNodePtr{0, nullptr}) {}void push(T const& data) {CountedNodePtr new_node;new_node.external_count = 1;new_node.ptr = new Node(data);new_node.ptr->next = head.load();while (!head.compare_exchange_weak(new_node.ptr->next, new_node));}std::shared_ptr<T> pop() {auto old_head = head.load();while(true) {increase_head_count(old_head);auto ptr = old_head.ptr;if (!ptr) {return std::shared_ptr<T>{};}if (head.compare_exchange_strong(old_head, ptr->next)) {std::shared_ptr<T> res;res.swap(ptr->data_);int const count_increase = old_head.external_count - 2;if (ptr->internal_count.fetch_add(count_increase) == -count_increase) {delete ptr;}return res;} else if (ptr->internal_count.fetch_sub(1) == 1) {delete ptr;}}}
};

这里使用了两个计数,一个外部计数和一个内部计数。外部计数是和指向节点的指针绑定在一起的,而内部计数是在节点内部。外部计数负责“安全拿到节点”,内部计数负责“节点摘下后延迟回收”

increase_head_count(old_head); 首先取头节点,并且马上给头节点的 external_count 加 1,表示线程正在使用这个指针不能马上 delete。
if (head.compare_exchange_strong(old_head, ptr->next)) 如果当前 head 仍然是我刚刚增加过引用计数的 old_head,就把 head 改成 ptr->next。
int const count_increase = old_head.external_count - 2; 这里减掉两个引用计数,分别是:1. 当前线程通过 increase_head_count 增加的 2. head 本身就持有的那个外部引用
ptr->internal_count.fetch_add(count_increase) == -count_increase 剩下的外部引用要转移到内部去,如果加上后等于0就可以删除这个节点。
else if (ptr->internal_count.fetch_sub(1) == 1) 这个分支是节点被其他线程取走了,减去内部计数。 所以如果很多线程都CAS失败了,内部引用就可能为负数。等到最后成功 pop 的线程把外部计数合并进 internal_count 则会抵消。
所以,内部计数为负数的本质是: 失败线程先把“我不用了”记到 internal_count 里;成功线程稍后把 external_count 转进来,两边抵消。负数只是中间状态。

Epoch-Based Reclamation

TODO

实现一个 lock-free queue

TODO

lock-free 的好处和坏处

好处:

  1. 提高并发,如果有锁一些线程会被 block,无锁下每个线程还是持续运行
  2. robustness,整个系统不受持锁线程的影响,就算持锁线程 panic 了,系统仍然可以继续运行

坏处:

  1. 编程难度大。没有锁的保护,你没办法阻止多个线程同时访问某个数据,你必须自己来确保数据结构的 invariants 在并发下 仍然成立
  2. 避免 live lock。死锁是两个线程互相等待卡死的情况,无锁编程不会出现这种情况,但是会出现 live lock. live lock 是两个线程没有卡死,都在积极处理数据,但是没办法继续往前推进。
  3. 降低系统的性能(看起来特别反直觉)。原子变量操作的性能是小于非原子变量的。并且原子变量需要在多个线程之间同步,这对 cpu 缓存非常不友好(因为线程可能在不同的cpu核上,所以同步就需要让cache invalid)。如果在竞争很低的情况下甚至不如 mutexmutex 有快路径,可以避免进入 futex wait)

live lock 的例子:

// thread 1
mutex1.lock();if (!mutex2.try_lock()) {mutex1.unlock();retry();
}// thread 2
mutex1.lock();if (!mutex2.try_lock()) {mutex1.unlock();retry();
}

会发生:

线程A拿到mutex1
线程B拿到mutex2A尝试mutex2失败 -> 释放mutex1
B尝试mutex1失败 -> 释放mutex2然后同时重试

线程没有卡死,但是仍然不断重试。

http://www.gsyq.cn/news/1463211.html

相关文章:

  • Mermaid Live Editor完整指南:免费在线图表创作工具快速上手教程
  • 利用快马平台十分钟搭建51网登录入口原型,验证你的产品设计
  • 如何让经典GTA游戏在现代电脑上完美运行:SilentPatch终极修复指南
  • 从摄像头到麦克风:一份超全的FFmpeg跨平台音视频采集命令清单(含macOS avfoundation / Windows dshow / Linux v4l2)
  • 如何快速掌握xcms代谢组学数据分析工具:新手终极指南
  • 从Windows到Linux:手把手教你为VCS+Verdi生成和配置License(含网卡名修改)
  • Qbot量化交易框架:从零搭建AI自动交易系统的实战指南
  • 【限时解密】某独角兽公司封存的智能离职整合架构图(含RAG增强的员工情绪感知模块)
  • 保姆级教程:从零开始,用GitHub Actions云编译你的专属OpenWrt固件
  • 终极指南:5步掌握免费PDF补丁丁的强大功能
  • 2026年北京农村自建房换瓦全成本核算:彩石金属瓦/铝镁锰瓦/不锈钢瓦哪个最省钱 - 企业深度横评dyy6420
  • 酶联免疫吸附测定(ELISA):从原理到应用的深度剖析
  • 揭秘MatAnyone:时空感知的智能视频抠图革命
  • 企业级代码智能助手:DeepSeek-Coder-V2的技术架构与集成指南
  • 如何用PPTist在浏览器中免费创建专业演示文稿:完整指南
  • 5步精通B站API:Python开发者终极数据获取实战指南
  • LX Music桌面版实战指南:解锁跨平台免费音乐播放的完整方案
  • Mermaid在线编辑器完整指南:实时图表创作与团队协作的高效方案
  • 鸿蒙开发-怎么知道设备支持哪些GPU特性?GLES扩展查询
  • Paperless-ngx终极指南:5步打造企业级无纸化文档管理系统
  • Android视频字幕控件:逐字高亮+滚动同步,适配ExoPlayer/MediaPlayer
  • MinneApple实战指南:3步构建高精度苹果检测与分割系统
  • 3个技巧彻底解决Cursor试用限制:从设备指纹到无限重置
  • 为什么选择TimeMoE-200M:对比传统时间序列模型的7大优势
  • IDEA 新建 JavaWeb 项目 练习 JavaWeb 技术
  • ExcelJS终极指南:掌握Anchor类实现图片与图表精确定位
  • 终极指南:用antimicrox免费实现游戏手柄映射,让每款游戏都能畅玩
  • 别再用ChatGPT做分类了!真正工业级AI分类流水线(含BERT微调→Faiss索引→动态阈值反馈环)
  • 终极LevelDB GUI管理工具:LevelUI实战指南
  • 【紧急预警】2024年档案AI化窗口期仅剩11个月!国家档案局新规倒逼下的3类机构迁移时间表与风险熔断机制