go

sync.Map

sync.Map源码分析

Posted by Liangjf on July 20, 2020

sync.Map源码分析

思路

  • 通过两个map, 一个是只读map[read], 一个是读写map[dirty], 通过expunged标记和misses阈值来触发dirty覆盖更新read;
  • 读操作, 先去read中查询, 没有再去dirty中查询, 逻辑上的读写分离, 提高查询效率(sync.Map适用于高并发中多读少写);
  • 存储操作, 先尝试对read的key更新, 没有再去dirty存储新的key-value; 把操作拦截在read中, 因为是只读的(atomic.Value), 所以减少了锁的开销, 提高了性能.(可能有人会疑惑, 若只更新read的key, 那dirty不就没有被更新了吗, 其实对于dirty的更新时机是在存储时key都不在read和dirty时dirtyLocked)
  • 删除操作, 对read只是标记删除, 没有真正删除, 若key不存在read并且dirty新于read才真正删除dirty中key, 把删除操作延迟了, 避免过多的删除操作导致性能问题 因为读操作时有

读取和插入的逻辑

  • 读取: 先从只读map中查找key, 没有的话从dirty中查找(超过一定misses, dirty更新到read);
  • 插入: 直接插入到dirty
  • 删除: 只读map read只是标记删除, 没有真正delete, 真正的删除操作是在dirty更新到read时, 这样做的目的是提高查询的效率

Store存储

  • 1.从read中读取, 若在read查询到key, 并且是未expunged的, 那就直接cas更新entry的p指针(在read有肯定在dirty也有, 因为read是通过dirty覆盖的方式更新的)
  • 2.上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
  • 3.判断key是否在read中/在dirty中/都不在read和dirty
    • 1.在read中, 如果read已经把key标为已删除, 就需要在dirty中插入key, 更新misses值(misses达到阈值再更新read), 更新值value
    • 2.在dirty中, 直接更新value(这时不会更新read, 等穿透misses到阈值直接覆盖更新read)
    • 3.都不在read和dirty, 证明是新key, 先在dirty增加key, 然后标记read的amended为true, 标记read对比dirty是不完整的, 更新值value

Load查询

  • 1.查询read, 若存在并且read对比dirty完整(amended为false), 直接从read读取value
  • 2.查询read, 若不存在或者read相对dirty不完整(amended为true), 从dirty中读取value, 并且增加misses次数(达到阈值触发dirty更新read)

Delete删除

  • 1.查询read, 若key存在read中, 标记key已删除(设为nil); 若key不存在read中, 并且read相对dirty不完整(amended为true), 从dirty中直接删除dirty

为什么说sync.Map适用大量读,少量写的场景?

借助两个map, 虽然查询时虽然减少了锁的使用, 对查询性能很有帮助, 但是因为read map读不到数据而再次上锁读取,同时dirty map也会经常刷新覆盖read map,整体性能较差.

```go
package sync

import (
	"sync/atomic"
	"unsafe"
)

type Map struct {
	mu Mutex
	read atomic.Value // readOnly
	dirty map[interface{}]*entry
	misses int //超过一定穿透(在read找不到)会触发dirty刷新到read
}

// readOnly 只读map结构
type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 只读map对比dirty map缺失key标记
}

// expunged 标记已删除的条目
var expunged = unsafe.Pointer(new(interface{}))

// entry 每个key对应的value的结构
type entry struct {
	//p==nil, entry被真正删除并且m.dirty==nil
	//p==expunged, entry标记为删除, 但m.dirty != nil, 从m.dirty找不到entry
	//其他情况m.read.m[key], 并且如果m.dirty != nil, m.dirty[key]
	p unsafe.Pointer // *interface{}
}

func newEntry(i interface{}) *entry {
	return &entry{p: unsafe.Pointer(&i)}
}

// Load 返回存储在map的键值和是否存在标记
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
	//读取时, 先从只读map readOnly中读取
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		//若只读map没有key

		//上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			//真的在只读map中找不到key, 并且dirty发生过刷新,和只读不一致, 那么久从dirty中读取key
			e, ok = m.dirty[key]
			// 增加misses, 超过阈值就把dirty刷新到只读map
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	//对entry的p解引用,返回值
	return e.load()
}

//load 对entry的p解引用,返回值
func (e *entry) load() (value interface{}, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		//p必须是非nil, 并且p没有被标记为已删除的
		return nil, false
	}
	//解引用返回给应用程序
	return *(*interface{})(p), true
}

// Store 存储key-value
func (m *Map) Store(key, value interface{}) {
	//先从read中读取, 若在read查询到key, 并且是未expunged的, 那就直接cas更新entry的p指针
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	//上锁, 再次尝试读取; 因为上面没有加锁, 只是利用原子性, 避免dirty刷新造成遗漏
	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		//如果在read中找到key, 先判断是否已经被标记删除
		if e.unexpungeLocked() {
			// 如果read已经把key标为已删除, 就需要在dirty中插入key, 等后面misses达到阈值再更新read
			m.dirty[key] = e
		}
		//更新值
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		//如果在read找不到key, 就去dirty找key, 并且更新value
		e.storeLocked(&value)
	} else {
		if !read.amended {
			// 若read和dirty都找不到key,证明是新key;
			// 先在dirty增加key, 然后标记read的amended为true, 标记read对比dirty是不完整的
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		//存储新的key-value到dirty
		m.dirty[key] = newEntry(value)
	}
	m.mu.Unlock()
}

// tryStore 如果p没有标记为expunged, 则cas更新p的值的指针
func (e *entry) tryStore(i *interface{}) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			//若标记为expunged就不存储返回
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

// unexpungeLocked 确保该条目没有标记为"删除"
//如果条目先前被删除,则必须将其添加到dirty映射中
func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

// storeLocked 存储值的指针到entry的p
func (e *entry) storeLocked(i *interface{}) {
	atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

// LoadOrStore 若key存在, 更新value返回旧value;若不存在,存储key-value,返回新value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	// Avoid locking if it's a clean hit.
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
		actual, loaded, _ = e.tryLoadOrStore(value)
		m.missLocked()
	} else {
		if !read.amended {
			// We're adding the first new key to the dirty map.
			// Make sure it is allocated and mark the read-only map as incomplete.
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

// tryLoadOrStore 若key存在, 更新value返回旧value;若不存在,存储key-value,返回新value
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*interface{})(p), true, true
	}

	// Copy the interface after the first load to make this method more amenable
	// to escape analysis: if we hit the "load" path or the entry is expunged, we
	// shouldn't bother heap-allocating.
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*interface{})(p), true, true
		}
	}
}

// Delete 删除key-value
func (m *Map) Delete(key interface{}) {
	//从read查询key
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	if !ok && read.amended {
		//上锁, 再次查询, 以防dirty刷新read造成错误
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
			//key不存在read中(代表还没触发dirty刷新read,key不在read,可能在dirty中)
			delete(m.dirty, key)
		}
		m.mu.Unlock()
	}
	if ok {
		//key存在read中, 标记删除key
		e.delete()
	}
}

//delete 标记删除key
func (e *entry) delete() (hadValue bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		//若已经被标记删除就直接返回(重入性)
		if p == nil || p == expunged {
			return false
		}
		//把entry的p设为nil,表示read标记删除key了
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return true
		}
	}
}

// Range 根据f排序key-value
func (m *Map) Range(f func(key, value interface{}) bool) {
	// 读取read, 若read不完整, 用dirty更新read
	read, _ := m.read.Load().(readOnly)
	if read.amended {
		// 上锁, 再次查询, 以防dirty刷新read造成错误
		m.mu.Lock()
		read, _ = m.read.Load().(readOnly)
		if read.amended {
			//dirty更新read
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
		m.mu.Unlock()
	}

	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

//missLocked 增加misses次数, 触发dirty完整更新read
func (m *Map) missLocked() {
	m.misses++
	if m.misses < len(m.dirty) {
		return
	}
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

//dirtyLocked 将read中未删除的数据加入到dirty中
func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[interface{}]*entry, len(read.m))
	for k, e := range read.m {
		if !e.tryExpungeLocked() {
			//在dirty中的key都是未标记为expunged的
			m.dirty[k] = e
		}
	}
}

//tryExpungeLocked 判断entry是否被标记删除,并且将标记为nil的entry更新标记为expunge
func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}
```