sync.Map源码分析
思路
- 通过两个map, 一个是只读map[read], 一个是读写map[dirty], 通过expunged标记和misses阈值来触发dirty覆盖更新read;
- 读操作, 先去read中查询, 没有再去dirty中查询, 逻辑上的读写分离, 提高查询效率(sync.Map适用于高并发中多读少写);
- 存储操作, 先尝试对read的key更新, 没有再去dirty存储新的key-value; 把操作拦截在read中, 因为是只读的(atomic.Value), 所以减少了锁的开销, 提高了性能.(可能有人会疑惑, 若只更新read的key, 那dirty不就没有被更新了吗, 其实对于dirty的更新时机是在存储时key都不在read和dirty时dirtyLocked)
- 删除操作, 对read只是标记删除, 没有真正删除, 若key不存在read并且dirty新于read才真正删除dirty中key, 把删除操作延迟了, 避免过多的删除操作导致性能问题 因为读操作时有
读取和插入的逻辑
- 读取: 先从只读map中查找key, 没有的话从dirty中查找(超过一定misses, dirty更新到read);
- 插入: 直接插入到dirty
- 删除: 只读map read只是标记删除, 没有真正delete, 真正的删除操作是在dirty更新到read时, 这样做的目的是提高查询的效率
Store存储
- 1.从read中读取, 若在read查询到key, 并且是未expunged的, 那就直接cas更新entry的p指针(在read有肯定在dirty也有, 因为read是通过dirty覆盖的方式更新的)
- 2.上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
- 3.判断key是否在read中/在dirty中/都不在read和dirty
- 1.在read中, 如果read已经把key标为已删除, 就需要在dirty中插入key, 更新misses值(misses达到阈值再更新read), 更新值value
- 2.在dirty中, 直接更新value(这时不会更新read, 等穿透misses到阈值直接覆盖更新read)
- 3.都不在read和dirty, 证明是新key, 先在dirty增加key, 然后标记read的amended为true, 标记read对比dirty是不完整的, 更新值value
Load查询
- 1.查询read, 若存在并且read对比dirty完整(amended为false), 直接从read读取value
- 2.查询read, 若不存在或者read相对dirty不完整(amended为true), 从dirty中读取value, 并且增加misses次数(达到阈值触发dirty更新read)
Delete删除
- 1.查询read, 若key存在read中, 标记key已删除(设为nil); 若key不存在read中, 并且read相对dirty不完整(amended为true), 从dirty中直接删除dirty
为什么说sync.Map适用大量读,少量写的场景?
借助两个map, 虽然查询时虽然减少了锁的使用, 对查询性能很有帮助, 但是因为read map读不到数据而再次上锁读取,同时dirty map也会经常刷新覆盖read map,整体性能较差.
```go
package sync
import (
"sync/atomic"
"unsafe"
)
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int //超过一定穿透(在read找不到)会触发dirty刷新到read
}
// readOnly 只读map结构
type readOnly struct {
m map[interface{}]*entry
amended bool // 只读map对比dirty map缺失key标记
}
// expunged 标记已删除的条目
var expunged = unsafe.Pointer(new(interface{}))
// entry 每个key对应的value的结构
type entry struct {
//p==nil, entry被真正删除并且m.dirty==nil
//p==expunged, entry标记为删除, 但m.dirty != nil, 从m.dirty找不到entry
//其他情况m.read.m[key], 并且如果m.dirty != nil, m.dirty[key]
p unsafe.Pointer // *interface{}
}
func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// Load 返回存储在map的键值和是否存在标记
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
//读取时, 先从只读map readOnly中读取
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
//若只读map没有key
//上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
//真的在只读map中找不到key, 并且dirty发生过刷新,和只读不一致, 那么久从dirty中读取key
e, ok = m.dirty[key]
// 增加misses, 超过阈值就把dirty刷新到只读map
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
//对entry的p解引用,返回值
return e.load()
}
//load 对entry的p解引用,返回值
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
//p必须是非nil, 并且p没有被标记为已删除的
return nil, false
}
//解引用返回给应用程序
return *(*interface{})(p), true
}
// Store 存储key-value
func (m *Map) Store(key, value interface{}) {
//先从read中读取, 若在read查询到key, 并且是未expunged的, 那就直接cas更新entry的p指针
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
//上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
//如果在read中找到key, 先判断是否已经被标记删除
if e.unexpungeLocked() {
// 如果read已经把key标为已删除, 就需要在dirty中插入key, 等后面misses达到阈值再更新read
m.dirty[key] = e
}
//更新值
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//如果在read找不到key, 就去dirty找key, 并且更新value
e.storeLocked(&value)
} else {
if !read.amended {
// 若read和dirty都找不到key,证明是新key;
// 先在dirty增加key, 然后标记read的amended为true, 标记read对比dirty是不完整的
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
//存储新的key-value到dirty
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}
// tryStore 如果p没有标记为expunged, 则cas更新p的值的指针
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
//若标记为expunged就不存储返回
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
// unexpungeLocked 确保该条目没有标记为"删除"
//如果条目先前被删除,则必须将其添加到dirty映射中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
// storeLocked 存储值的指针到entry的p
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
// LoadOrStore 若key存在, 更新value返回旧value;若不存在,存储key-value,返回新value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
// Avoid locking if it's a clean hit.
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
// tryLoadOrStore 若key存在, 更新value返回旧value;若不存在,存储key-value,返回新value
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
// Copy the interface after the first load to make this method more amenable
// to escape analysis: if we hit the "load" path or the entry is expunged, we
// shouldn't bother heap-allocating.
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}
// Delete 删除key-value
func (m *Map) Delete(key interface{}) {
//从read查询key
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
//上锁, 再次查询, 以防dirty刷新read造成错误
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
//key不存在read中(代表还没触发dirty刷新read,key不在read,可能在dirty中)
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
//key存在read中, 标记删除key
e.delete()
}
}
//delete 标记删除key
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
//若已经被标记删除就直接返回(重入性)
if p == nil || p == expunged {
return false
}
//把entry的p设为nil,表示read标记删除key了
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}
// Range 根据f排序key-value
func (m *Map) Range(f func(key, value interface{}) bool) {
// 读取read, 若read不完整, 用dirty更新read
read, _ := m.read.Load().(readOnly)
if read.amended {
// 上锁, 再次查询, 以防dirty刷新read造成错误
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
//dirty更新read
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
//missLocked 增加misses次数, 触发dirty完整更新read
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
//dirtyLocked 将read中未删除的数据加入到dirty中
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
//在dirty中的key都是未标记为expunged的
m.dirty[k] = e
}
}
}
//tryExpungeLocked 判断entry是否被标记删除,并且将标记为nil的entry更新标记为expunge
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
```