并发编程与线程安全:从锁机制到无锁编程的面试全解
并发编程与线程安全:从锁机制到无锁编程的面试全解
一、并发的"正确性幻觉":单线程没问题,多线程就炸了
一段代码在单线程测试中运行完美,放到多线程环境就出现数据竞争、死锁、活锁。这不是代码逻辑的问题,而是并发引入了新的正确性维度——可见性、原子性和有序性。三个线程同时对一个计数器加 1,最终结果可能不是 3 而是 2,因为"加 1"操作不是原子的:读取→计算→写回,中间可能被其他线程打断。
并发编程面试的核心考察点不是"你会不会用锁",而是"你能否识别并发风险、选择正确的同步策略、权衡性能与安全"。从悲观锁到乐观锁,从互斥到无锁,每种策略都有适用场景和代价。
二、并发问题的三大根源与同步策略
graph TB subgraph 并发问题根源 A[可见性<br/>线程A的写对线程B不可见] --> D[数据不一致] B[原子性<br/>复合操作被中途打断] --> D C[有序性<br/>指令重排改变执行顺序] --> D end subgraph 同步策略 D --> E[悲观锁<br/>互斥访问] D --> F[乐观锁<br/>CAS + 重试] D --> G[无锁编程<br/>原子操作 + 线程局部] end subgraph 选择依据 E --> H[写冲突多<br/>临界区大] F --> I[写冲突少<br/>临界区小] G --> J[读多写少<br/>可原子化] end可见性问题源于 CPU 缓存——每个核心有自己的 L1/L2 缓存,写操作可能只更新本地缓存而未刷新到主存。原子性问题源于复合操作的非原子性——"读取-修改-写回"三步可能被打断。有序性问题源于编译器和 CPU 的指令重排——为了性能,写操作可能被重排序。
三、代码实现
3.1 线程安全的计数器:从锁到 CAS
import threading import time from typing import Optional class LockedCounter: """基于互斥锁的计数器:简单但有锁竞争""" def __init__(self): self._value = 0 self._lock = threading.Lock() def increment(self) -> int: """加 1(线程安全)""" with self._lock: self._value += 1 return self._value def get(self) -> int: """读取当前值""" with self._lock: return self._value class CASCounter: """基于 CAS(Compare-And-Swap)的计数器:无锁实现""" def __init__(self): self._value = 0 # Python 没有原生 CAS,用锁模拟 # 真实场景使用 atomic 或 ctypes 调用 self._cas_lock = threading.Lock() def increment(self) -> int: """CAS 循环:乐观锁策略""" while True: old = self._value new = old + 1 # CAS: 如果当前值仍为 old,则更新为 new if self._compare_and_swap(old, new): return new # CAS 失败,说明有其他线程修改了值,重试 def _compare_and_swap( self, expected: int, new_value: int ) -> bool: """模拟 CAS 操作""" with self._cas_lock: if self._value == expected: self._value = new_value return True return False def get(self) -> int: return self._value3.2 读写锁与并发安全队列
class ReadWriteLock: """读写锁:读操作并发,写操作互斥""" def __init__(self): self._read_ready = threading.Condition() self._readers = 0 self._writers = 0 def acquire_read(self) -> None: """获取读锁""" with self._read_ready: while self._writers > 0: self._read_ready.wait() self._readers += 1 def release_read(self) -> None: """释放读锁""" with self._read_ready: self._readers -= 1 if self._readers == 0: self._read_ready.notify_all() def acquire_write(self) -> None: """获取写锁""" with self._read_ready: self._writers += 1 while self._readers > 0: self._read_ready.wait() def release_write(self) -> None: """释放写锁""" with self._read_ready: self._writers -= 1 self._read_ready.notify_all() class ConcurrentQueue: """线程安全队列:生产者-消费者模式""" def __init__(self, maxsize: int = 0): self._queue = [] self._maxsize = maxsize self._lock = threading.Lock() self._not_empty = threading.Condition(self._lock) self._not_full = threading.Condition(self._lock) def put(self, item: object, timeout: float = None) -> bool: """入队(阻塞直到有空间)""" with self._not_full: if self._maxsize > 0: deadline = (time.monotonic() + timeout if timeout else None) while len(self._queue) >= self._maxsize: if deadline and time.monotonic() > deadline: return False remaining = (deadline - time.monotonic() if deadline else None) self._not_full.wait(timeout=remaining) self._queue.append(item) self._not_empty.notify() return True def get(self, timeout: float = None) -> Optional[object]: """出队(阻塞直到有元素)""" with self._not_empty: deadline = (time.monotonic() + timeout if timeout else None) while not self._queue: if deadline and time.monotonic() > deadline: return None remaining = (deadline - time.monotonic() if deadline else None) self._not_empty.wait(timeout=remaining) item = self._queue.pop(0) self._not_full.notify() return item3.3 死锁检测与预防
class DeadlockDetector: """死锁检测:基于等待图的环检测""" def __init__(self): # 等待图:线程 → 它等待的锁 self.wait_graph = {} self._lock = threading.Lock() def wait_for( self, thread_id: str, lock_id: str ) -> bool: """记录线程等待锁,检测是否形成环""" with self._lock: self.wait_graph[thread_id] = lock_id # 检测环 if self._detect_cycle(thread_id): del self.wait_graph[thread_id] return False # 拒绝等待,避免死锁 return True def release(self, thread_id: str) -> None: """线程释放锁,移除等待记录""" with self._lock: self.wait_graph.pop(thread_id, None) def _detect_cycle(self, start: str) -> bool: """DFS 检测等待图中的环""" visited = set() current = start while current in self.wait_graph: if current in visited: return True # 发现环 visited.add(current) current = self.wait_graph[current] return False四、并发编程的 Trade-offs 分析
锁的粒度:粗粒度锁(一个大锁保护整个数据结构)实现简单,但并发度低;细粒度锁(每个子结构一个锁)并发度高,但容易死锁。选择的关键是评估写冲突的频率——冲突少用细粒度锁,冲突多用粗粒度锁。
乐观锁 vs. 悲观锁:乐观锁(CAS)在低冲突场景下性能远优于悲观锁(无阻塞、无上下文切换),但在高冲突场景下会频繁重试,反而更慢。经验法则:写冲突率低于 10% 用乐观锁,高于 30% 用悲观锁,中间地带需要基准测试。
无锁编程的复杂度:无锁编程消除了锁竞争,但代码复杂度急剧上升——ABA 问题、内存回收、优先级反转都是无锁编程的特有挑战。除非性能瓶颈明确在锁竞争上,否则不应为了"无锁"而增加代码复杂度。
死锁预防 vs. 检测:预防死锁(固定加锁顺序、超时机制)比检测死锁更可靠,但约束更强。检测死锁(等待图环检测)更灵活,但检测到后需要选择牺牲线程回滚,代价高。生产环境建议以预防为主、检测为辅。
五、总结
并发编程的核心挑战是保证可见性、原子性和有序性。从悲观锁到乐观锁到无锁编程,同步策略的选择取决于冲突频率和临界区大小。锁粒度越细并发度越高但死锁风险越大,乐观锁低冲突时性能好但高冲突时反而更慢。
面试准备建议:掌握互斥锁、读写锁、CAS 三种基本同步机制;理解死锁的四个必要条件和预防策略;能用生产者-消费者模型设计线程间通信。不追求"无锁"的炫技,而是根据场景选择最合适的同步策略。
