go

sync.RWMutex

sync.RWMutex源码分析

Posted by Liangjf on July 20, 2020

sync.RWMutex源码分析

读写锁

原理: 读操作可并发重入,写操作是互斥的。 读写锁通常用互斥锁、条件变量、信号量实现

实现: 通过原子变量(模拟条件变量), 信号量与互斥锁结合的方式, 实现读写锁; 主要是借助原子变量, 信号量控制模拟了读写的等待和唤醒

一直有读锁进来, 写锁就一直饿死?

不会的, 通过readerWait会记录下写锁前的最后一个读锁, 这个读锁释放锁后会通知写锁执行, 这样就不会饿死了.

相当于写锁会把读锁分为两部分:

  • 前一部分是写锁前的读锁, 写锁会等待它们都完成, 由最后一个读锁唤醒写锁;
  • 后一部分是写锁后的读锁, 它们会等待写锁完成再被唤醒执行

场景分析

1.写操作是如何阻止写操作的

读写锁包含一个互斥锁(Mutex),写锁定必须要先获取该互斥锁,如果互斥锁已被协程A获取(或者协程A在阻塞等待读结束),意味着协程A获取了互斥锁,那么协程B只能阻塞等待该互斥锁。所以,写操作依赖互斥锁阻止其他的写操作。

2.写操作是如何阻止读操作的

RWMutex.readerCount是个整型值,用于表示读者数量,不考虑写操作的情况下,每次读锁定将该值+1,每次解除读锁定将该值-1,所以readerCount取值为[0, N],N为读者个数,实际上最大可支持2^30个并发读者。当写锁定进行时,会先将readerCount减去2^30,从而readerCount变成了负值,此时再有读锁定到来时检测到readerCount为负值,便知道有写操作在进行,只好阻塞等待。而真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可获得。所以,写操作将readerCount变成负值来阻止读操作的。

3.读操作是如何阻止写操作的

读锁定会先将RWMutext.readerCount加1,此时写操作到来时发现读者数量不为0,会阻塞等待所有读操作结束。所以,读操作通过readerCount来将来阻止写操作的。

4.为什么写锁定不会被饿死

写操作要等待读操作结束后才可以获得锁,写操作等待期间可能还有新的读操作持续到来,如果写操作等待所有读操作结束,很可能被饿死。然而,通过RWMutex.readerWait可完美解决这个问题。写操作到来时,会把RWMutex.readerCount值拷贝到RWMutex.readerWait中,用于标记排在写操作前面的读者个数。前面的读操作结束后,除了会递减RWMutex.readerCount,还会递减RWMutex.readerWait值,当RWMutex.readerWait值变为0时唤醒写操作。所以以,写操作就相当于把一段连续的读操作划分成两部分,前面的读操作结束后唤醒写操作,写操作结束后唤醒后面的读操作

参考

https://my.oschina.net/renhc/blog/2878292

```go
package sync

import (
	"internal/race"
	"sync/atomic"
	"unsafe"
)


type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // 写操作等待读操作完成的信号量
	readerSem   uint32 // 读操作等待写操作完成的信号量
	readerCount int32  // 读的个数
	readerWait  int32  // 写阻塞时读者个数
}

//最大只支持 1 << 30 个读锁
const rwmutexMaxReaders = 1 << 30

/*
RLock 读加锁
	增加读操作计数,即readerCount++
	阻塞等待写操作结束(若有, readerCount<0)
*/
func (rw *RWMutex) RLock() {
	//是否开启检测race
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	//这里分两种情况:
	//1.当判断大于等于0, 证明当前没有写锁, 那么可以上读锁, 并且readerCount原子加1(读锁可重入[只要匹配了释放次数就行])
	//2.当判断小于0, 证明当前有写锁(Lock时会readerCount-rwmutexMaxReaders, 因此会小于0), 所以通过readerSem读信号量, 使读操作睡眠等待
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 当前有个写锁, 读操作阻塞等待写锁释放
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	//是否开启检测race
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

/*
RUnlock 读解锁
	减少读操作计数,即readerCount--
	唤醒等待写操作的协程(若有, readerCount==0)
*/
func (rw *RWMutex) RUnlock() {
	//是否开启检测race
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}

	//这里分两种情况:
	//释放读锁, readerCount减1
	//1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作
	//2.若readerCount小于等于0, 证明已经没有读锁, 可以唤醒写锁(若有)
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	//是否开启检测race
	if race.Enabled {
		race.Enable()
	}
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	//1.本来就没读锁, 调用RUnlock就发生panic
	//2.超过读锁的最大限制就发生panic
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// 如果当前有写锁等待
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 写锁前的最后一个读锁唤醒写锁执行
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

/*
Lock 写上锁
	获取互斥锁
	阻塞等待所有读操作结束(若有)
*/
func (rw *RWMutex) Lock() {
	//是否开启检测race
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	//获取互斥锁
	rw.w.Lock()
	// 通知读锁有一个写锁到来
	// readerCount-rwmutexMaxReaders小于0, 再加回rwmutexMaxReaders
	// 若r仍然不为0, 代表当前还有读锁
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 等待写锁前面的读锁释放, 所以若不为0就阻塞写锁, 等待rUnlockSlow-rUnlockSlow的readerWait-1直至0倍唤醒写锁
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		//阻塞写锁
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	//是否开启检测race
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

/*
Unlock 写解锁
	唤醒因读锁定而被阻塞的协程(若有)
	解除互斥锁
*/
func (rw *RWMutex) Unlock() {
	//是否开启检测race
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// 增加readerCount, 若超过读锁的最大限制, 触发panic
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 解除阻塞的读锁(若有)
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}

	// 释放互斥锁
	rw.w.Unlock()

	//是否开启检测race
	if race.Enabled {
		race.Enable()
	}
}

func (rw *RWMutex) RLocker() Locker {
	return (*rlocker)(rw)
}

type rlocker RWMutex

func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
```