[AI生成] go基于atomic value实现并发map
基于atomic实现的map,通过替换整个map引用(原子值本身是map)来实现并发安全,适合读多写少场景,性能高于sync.RWMutex加锁map,因为cpu支持的原子操作更快。
// Package main —— 基于自定义泛型 atomic.Value 的并发安全 Map(完全无锁,CAS 循环)。
//
// 核心设计思路:
//
// sync/atomic.Value 本身只能存任意 interface{},为了避免每次 Load 的类型断言开销,
// 这里封装一个泛型的 AtomicValue[T],对外暴露严格的类型 T。
//
// 基于 AtomicValue[*map[K]V] 的 Map:
// - 使用"写时复制"(copy-on-write)策略
// - **完全无锁**:读路径是原子读,写路径使用 CompareAndSwap(CAS)循环
// - Map 内部存储 *map[K]V(而不是 map[K]V),因为指针是可比较的,CAS 才能工作
// - 适合读多写少的场景(如配置表、路由表、缓存)
//
// 运行:go run ./atomic-value/ 或 go run ./atomic-value/main.go
package mainimport ("fmt""sync""sync/atomic""time"
)// ============================================================
// 自定义泛型 atomic.Value
// ============================================================// AtomicValue 封装 sync/atomic.Value,提供泛型类型安全的 Load/Store/Swap/CompareAndSwap。
// 零值可用,初始 Load 返回 T 的零值。
//
// 注意:T 必须是可比较类型(如果需要使用 CompareAndSwap)。
// 对于不可比较类型(如 map、slice、func 本身),请用指针包装:AtomicValue[*T]。
type AtomicValue[T any] struct {v atomic.Value
}// NewAtomicValue 创建一个带有初始值的 AtomicValue。
func NewAtomicValue[T any](initial T) *AtomicValue[T] {av := &AtomicValue[T]{}av.Store(initial)return av
}// Load 原子读取当前值。若从未 Store 过,返回 T 的零值。
func (av *AtomicValue[T]) Load() T {raw := av.v.Load()if raw == nil {var zero Treturn zero}return raw.(T)
}// Store 原子写入新值。
func (av *AtomicValue[T]) Store(val T) {av.v.Store(val)
}// Swap 原子写入新值并返回旧值。
func (av *AtomicValue[T]) Swap(new T) (old T) {prev := av.v.Swap(new)if prev == nil {var zero Treturn zero}return prev.(T)
}// CompareAndSwap 原子比较并交换。
// 如果当前值 == old,则写入 new 并返回 true;否则不修改并返回 false。
func (av *AtomicValue[T]) CompareAndSwap(old, new T) bool {return av.v.CompareAndSwap(old, new)
}// ============================================================
// 基于 AtomicValue[*map[K]V] 的并发安全 Map(无锁,CAS 循环)
// ============================================================
//
// 为什么存储 *map[K]V 而不是 map[K]V?
// 因为 map[K]V 是不可比较类型,atomic.Value.CompareAndSwap 会 panic。
// 而 *map[K]V(指针)是可比较的,CAS 语义正好是:
// "当前存储的指针是否仍然是 old 指针?是,则替换为 new 指针"
// —— 这正是写时复制所需的语义。// AtomicMap 是一个**完全无锁**的并发安全泛型 Map,适合读多写少场景。
// 零值可用,推荐通过 NewAtomicMap 创建,以确保初始状态一致。
//
// 写路径使用 CompareAndSwap(CAS)循环:
// 1. Load 当前 *map[K]V 指针
// 2. 创建新的 map 并在新 map 上修改
// 3. CompareAndSwap(oldPtr, newPtr) 原子替换
// 4. 失败则回到第 1 步重试(期间有其他 writer 成功则重试)
type AtomicMap[K comparable, V any] struct {// 存储 *map[K]V(指针可比较,CAS 才能工作)kv AtomicValue[*map[K]V]
}// NewAtomicMap 创建一个新的并发 AtomicMap。
func NewAtomicMap[K comparable, V any]() *AtomicMap[K, V] {m := &AtomicMap[K, V]{}empty := make(map[K]V)m.kv.Store(&empty)return m
}// NewAtomicMapFrom 从一个已有的 map 复制内容后创建并发 AtomicMap。
// 传入的 map 不会被持有或修改。
func NewAtomicMapFrom[K comparable, V any](src map[K]V) *AtomicMap[K, V] {m := &AtomicMap[K, V]{}cp := make(map[K]V, len(src))for k, v := range src {cp[k] = v}m.kv.Store(&cp)return m
}// ---------- 只读方法 ----------// Load 原子读取 key 对应的 value。第二个返回值表示 key 是否存在。
func (m *AtomicMap[K, V]) Load(key K) (V, bool) {snapPtr := m.kv.Load()if snapPtr == nil {var zero Vreturn zero, false}val, ok := (*snapPtr)[key]return val, ok
}// LoadAll 返回当前所有键值对的快照(新 map)。
func (m *AtomicMap[K, V]) LoadAll() map[K]V {snapPtr := m.kv.Load()if snapPtr == nil {return make(map[K]V)}cp := make(map[K]V, len(*snapPtr))for k, v := range *snapPtr {cp[k] = v}return cp
}// Len 返回当前 map 的元素个数(原子读)。
func (m *AtomicMap[K, V]) Len() int {snapPtr := m.kv.Load()if snapPtr == nil {return 0}return len(*snapPtr)
}// Has 判断 key 是否存在(原子读)。
func (m *AtomicMap[K, V]) Has(key K) bool {snapPtr := m.kv.Load()if snapPtr == nil {return false}_, ok := (*snapPtr)[key]return ok
}// Range 以当前快照遍历 map。回调返回 false 则停止遍历。
// 因为 Range 使用的是原子快照,遍历期间 map 的内容不会变化。
func (m *AtomicMap[K, V]) Range(fn func(key K, val V) bool) {snapPtr := m.kv.Load()if snapPtr == nil {return}for k, v := range *snapPtr {if !fn(k, v) {return}}
}// ---------- 写方法(CAS 循环,无锁) ----------// Store 原子设置或更新 key。
// 使用 CAS 循环:Load 当前指针 → 创建新 map 复制并修改 → CompareAndSwap 替换指针。
// CAS 失败说明期间有其他 writer 成功写入,自动重试。
func (m *AtomicMap[K, V]) Store(key K, val V) {for {oldPtr := m.kv.Load()oldMap := *oldPtrcp := make(map[K]V, len(oldMap)+1)for k, v := range oldMap {cp[k] = v}cp[key] = valif m.kv.CompareAndSwap(oldPtr, &cp) {return}}
}// StoreMany 批量设置多个键值对(一次 CAS 循环)。
func (m *AtomicMap[K, V]) StoreMany(kv map[K]V) {if len(kv) == 0 {return}for {oldPtr := m.kv.Load()oldMap := *oldPtrcp := make(map[K]V, len(oldMap)+len(kv))for k, v := range oldMap {cp[k] = v}for k, v := range kv {cp[k] = v}if m.kv.CompareAndSwap(oldPtr, &cp) {return}}
}// Delete 原子删除 key(不存在则无副作用)。
func (m *AtomicMap[K, V]) Delete(key K) {for {oldPtr := m.kv.Load()oldMap := *oldPtrif _, ok := oldMap[key]; !ok {return}cp := make(map[K]V, len(oldMap))for k, v := range oldMap {if k != key {cp[k] = v}}if m.kv.CompareAndSwap(oldPtr, &cp) {return}}
}// DeleteMany 批量删除(一次 CAS 循环)。
func (m *AtomicMap[K, V]) DeleteMany(keys []K) {if len(keys) == 0 {return}for {oldPtr := m.kv.Load()oldMap := *oldPtrhasAny := falsefor _, k := range keys {if _, ok := oldMap[k]; ok {hasAny = truebreak}}if !hasAny {return}cp := make(map[K]V, len(oldMap))for k, v := range oldMap {cp[k] = v}for _, k := range keys {delete(cp, k)}if m.kv.CompareAndSwap(oldPtr, &cp) {return}}
}// Clear 清空整个 map。
func (m *AtomicMap[K, V]) Clear() {for {oldPtr := m.kv.Load()oldMap := *oldPtrif len(oldMap) == 0 {return}empty := make(map[K]V)if m.kv.CompareAndSwap(oldPtr, &empty) {return}}
}// LoadOrStore 原子"读取或设置":key 已存在则返回旧值,否则写入新值并返回。
// 整个流程在 CAS 循环内完成,保证原子语义。
func (m *AtomicMap[K, V]) LoadOrStore(key K, newVal V) (actual V, loaded bool) {for {oldPtr := m.kv.Load()oldMap := *oldPtrif v, ok := oldMap[key]; ok {return v, true}cp := make(map[K]V, len(oldMap)+1)for k, v := range oldMap {cp[k] = v}cp[key] = newValif m.kv.CompareAndSwap(oldPtr, &cp) {return newVal, false}}
}// ============================================================
// 演示与测试入口
// ============================================================// mustLoad 辅助:Load 值(忽略是否存在)
func mustLoad[K comparable, V any](m *AtomicMap[K, V], key K) V {v, _ := m.Load(key)return v
}// loadWith 辅助:Load 值并格式化显示(不存在时显示 <nil>)
func loadWith[K comparable, V any](m *AtomicMap[K, V], key K) string {v, ok := m.Load(key)if !ok {return fmt.Sprintf("<nil>")}return fmt.Sprintf("%v", v)
}// 演示 1:基本读写(单线程,验证基础语义正确)
func demoBasic() {fmt.Println("====== 演示 1:基本读写 ======")m := NewAtomicMap[string, int]()m.Store("a", 1)m.Store("b", 2)m.Store("c", 3)fmt.Printf(" Len() = %d\n", m.Len())fmt.Printf(" Load('a') = %d\n", mustLoad(m, "a"))fmt.Printf(" Load('x') = %v\n", loadWith(m, "x"))fmt.Printf(" Has('b') = %v\n", m.Has("b"))fmt.Printf(" Has('z') = %v\n", m.Has("z"))m.Delete("b")fmt.Printf(" After Delete('b'): len=%d\n", m.Len())m.StoreMany(map[string]int{"x": 100, "y": 200})fmt.Printf(" StoreMany: len=%d\n", m.Len())m.Range(func(k string, v int) bool {fmt.Printf(" Range: %s=%d\n", k, v)return true})
}// 演示 2:LoadOrStore(存在/不存在两种分支都走一遍)
func demoLoadOrStore() {fmt.Println("\n====== 演示 2:LoadOrStore ======")m := NewAtomicMap[string, string]()actual, loaded := m.LoadOrStore("foo", "first")fmt.Printf(" 第一次: actual=%s loaded=%v\n", actual, loaded)actual2, loaded2 := m.LoadOrStore("foo", "second")fmt.Printf(" 第二次: actual=%s loaded=%v\n", actual2, loaded2)
}// 演示 3:LoadAll 快照
func demoSnapshot() {fmt.Println("\n====== 演示 3:LoadAll 快照 ======")m := NewAtomicMap[string, int]()m.Store("x", 1)m.Store("y", 2)snap1 := m.LoadAll()fmt.Printf(" snap1: %v (len=%d)\n", snap1, len(snap1))m.Store("z", 3)snap2 := m.LoadAll()fmt.Printf(" snap2: %v (len=%d)\n", snap2, len(snap2))fmt.Printf(" snap1 len 不变: len=%d\n", len(snap1))
}// 演示 4:Clear
func demoClear() {fmt.Println("\n====== 演示 4:Clear ======")m := NewAtomicMap[int, string]()m.Store(1, "one")m.Store(2, "two")m.Store(3, "three")fmt.Printf(" Before clear: len=%d\n", m.Len())m.Clear()fmt.Printf(" After clear: len=%d\n", m.Len())fmt.Printf(" Load(1)=%v\n", mustLoad(m, 1))
}// 演示 5:并发压力测试(多 goroutine 并发读写)
func demoConcurrency() {fmt.Println("\n====== 演示 5:并发压力测试 ======")const (writerCount = 8readerCount = 32opsPerWriter = 10_000opsPerReader = 10_000keySpace = 100)m := NewAtomicMap[int, int]()var wg sync.WaitGroupwg.Add(writerCount + readerCount)start := time.Now()for w := 0; w < writerCount; w++ {go func(wid int) {defer wg.Done()for i := 0; i < opsPerWriter; i++ {key := (wid*13 + i) % keySpacem.Store(key, wid*1000+i)}}(w)}for r := 0; r < readerCount; r++ {go func() {defer wg.Done()for i := 0; i < opsPerReader; i++ {key := i % keySpace_, _ = m.Load(key)}}()}wg.Wait()elapsed := time.Since(start)fmt.Printf(" writers=%d readers=%d elapsed=%v\n", writerCount, readerCount, elapsed)fmt.Printf(" Len=%d\n", m.Len())fmt.Printf(" ✓ 无 panic\n")
}// 演示 6:自定义泛型 AtomicValue[T] 的直接使用
func demoRawAtomicValue() {fmt.Println("\n====== 演示 6:AtomicValue[T] 直接使用 ======")av := NewAtomicValue[int](42)fmt.Printf(" 初始值: %d\n", av.Load())av.Store(100)fmt.Printf(" Store(100): %d\n", av.Load())old := av.Swap(200)fmt.Printf(" Swap(200): old=%d new=%d\n", old, av.Load())
}// main 运行所有演示
func main() {demoBasic()demoLoadOrStore()demoSnapshot()demoClear()demoConcurrency()demoRawAtomicValue()fmt.Println("\n====== 全部完成 ✓ ======")
}
运行结果
====== 演示 1:基本读写 ======Len() = 3Load('a') = 1Load('x') = <nil>Has('b') = trueHas('z') = falseAfter Delete('b'): len=2StoreMany: len=4Range: a=1Range: c=3Range: x=100Range: y=200====== 演示 2:LoadOrStore ======第一次: actual=first loaded=false第二次: actual=first loaded=true====== 演示 3:LoadAll 快照 ======snap1: map[x:1 y:2] (len=2)snap2: map[x:1 y:2 z:3] (len=3)snap1 len 不变: len=2====== 演示 4:Clear ======Before clear: len=3After clear: len=0Load(1)======= 演示 5:并发压力测试 ======writers=8 readers=32 elapsed=741.8247msLen=100✓ 无 panic====== 演示 6:AtomicValue[T] 直接使用 ======初始值: 42Store(100): 100Swap(200): old=100 new=200====== 全部完成 ✓ ======
