Go 1.19.3 sync.Mutex原理简析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

互斥锁

互斥数学名词事件A与事件B在任何一次试验中都不会同时发生则称事件A与事件B互斥。来自百度百科
互斥锁的出现时为了保护共享资源同一时刻只有锁的持有者才能操作该资源。

Go Mutex

以下代码均来自src/sync/mutex.go
该包中定义了两个外部函数用于panic操作

// Provided by runtime via linkname.
func throw(string)
func fatal(string)

Locker 接口 定义了锁的行为锁定与解锁

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()   //锁定
	Unlock() //解锁
}

Mutext 结构体实现了Locker接口作为互斥锁使用

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex. 
//
// A Mutex must not be copied after first use.
//
// In the terminology of the Go memory model,
// the n'th call to Unlock “synchronizes before” the m'th call to Lock
// for any n < m.
// A successful call to TryLock is equivalent to a call to Lock.
// A failed call to TryLock does not establish any “synchronizes before”
// relation at all.
type Mutex struct {
	state int32
	sema  uint32
}

互斥锁的零值代表该锁是解锁状态。在Go内存模型中第n次调用Unlock"在第m次调用Lock之前同步",对于任何n都是小于m的。成功调用TryLock相当于调用Lock。对TryLock的失败调用不建立任何“先行同步”关系。

Mutex中的两个字段分别是:
sema:信号量在锁饥饿状态下使用
state: 表示锁的状态其为一个int32类型位图该位图有32个bit。
位图的状态如下
mutexLocked低1位表示锁定状态 1代表锁定0代表未锁定
mutexWoken低2位表示锁唤醒状态协程在正常状态下被唤醒
mutexStarving低3位表示锁饥饿状态
mutexWaiterShift低4位之后表示等待被唤醒的协程数量

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	// Mutex fairness.
	// 锁公平性
	// Mutex can be in 2 modes of operations: normal and starvation.
	// 锁有两种模式正常和饥饿
	// In normal mode waiters are queued in FIFO order, 
	// 在正常模式下等待者按FIFO顺序排队
	
     //but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. 
    // 但被唤醒的等待者并不拥有互斥锁而是与新到的goroutine争夺所有权。
	// New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. 
	// 新来的goroutine有一个优势——他们是已经在CPU上运行并且可能有很多,所以醒来了等待者很有可能无法抢占。
	//In such case it is queued at front of the wait queue.
	// 在这种情况下它在前面排队等待队列的。
	//If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
	//如果等待者在超过1ms的时间内未能获取互斥锁它将互斥锁切换到饥饿模式
	// In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue.
    // 在饥饿模式下互斥锁的所有权直接从解锁的goroutine移交给队列前面的等待者
	// New arriving goroutines don't try to acquire the mutex even if it appears to be unlocked, and don't try to spin. Instead they queue themselves at the tail of the wait queue.
	// 新到达的goroutine不会尝试获取互斥对象即使它出现了//不要试图旋转。相反他们在等待队列的尾部
	// If a waiter receives ownership of the mutex and sees that either
	// 如果等待者收到互斥锁的所有权并看到
	// (1) it is the last waiter in the queue, 
    // 1 这是排队的最后一个等待者
    // or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
	//或2等待少于1ms它将互斥切换回正常操作模式
	// Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters.
	// 普通模式的性能要比常规模式好得多即使有被阻塞的等待者互斥锁也会连续出现几次
	// Starvation mode is important to prevent pathological cases of tail latency.
	// 饥饿模式对于预防尾部延迟的情况非常重要
	starvationThresholdNs = 1e6
)

starvationThresholdNs表示等待时间的判定值1 * 10 ^ 6 纳秒

(m *Mutex) Lock() 上锁

Lock时会首先尝试使用CAS操作进行锁的状态更新若成功则直接返回代表上锁成功。若失败则执行慢路径慢路径中若自旋长时间获取不到锁则使用信号量进行同步操作。

// Lock locks m.  Lock 锁定m
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
// 如果锁已在使用则调用的goroutine将阻塞直到可以获得互斥锁。
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.  快速路径获取未锁定的互斥锁
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { //若锁的状态为0代表未上锁则cas操作将锁的状态改为mutexLocked即1。代表已经上锁
		if race.Enabled { // race相关的
			race.Acquire(unsafe.Pointer(m))
		}
		return // 抢占成功则直接返回
	}
	// 抢占不成功进入自旋状态尝试抢占
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow() // 慢路径做自旋操作
}

(m *Mutex) lockSlow() 慢路径尝试上锁

func (m *Mutex) lockSlow() {
	var waitStartTime int64 //等待开始时间以纳秒计
	starving := false  // 是否饥饿
	awoke := false     // 是否是醒着的 
	iter := 0          // 迭代次数
	old := m.state     // 旧的状态
	for { //开始自旋
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		// 不可以在饥饿状态下自旋所有权移交给等待者我们无论如何都无法获取互斥锁
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {  // 若 旧状态中的 只有锁定的状态位饥饿的状态位为空并且可以尝试自旋iter次则执行下边操作
		
			// Active spinning makes sense. 主动旋转有意义
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines. 尝试设置mutexWoken标志以通知Unlock不唤醒其他被阻塞的goroutine
			
			// 如果非唤醒状态并且mutexWoken状态位为空并且等待者数量非0并且cas操作旧状态与当前状态一致且可以更新状态位mutexWoken锁唤醒状态。则设置awoke 为true表示是醒着的
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin() //做自旋操作等待抢占
			iter++ //尝试次数+1
			old = m.state // 更新旧的状态
			continue // 进入下一轮自旋
		}
		// 不满足自旋条件
		new := old // 转移状态
		// Don't try to acquire starving mutex, new arriving goroutines must queue.不要试图获取饥饿的互斥体新到达的goroutine必须排队。
		if old&mutexStarving == 0 { // 若旧状态中无饥饿状态
			new |= mutexLocked // 设置锁定状态
		}
		if old&(mutexLocked|mutexStarving) != 0 { // 锁定状态或饥饿状态或叠加等待者个数增加
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		//当前的goroutine将互斥切换到饥饿模式。
		//但如果互斥锁当前未锁定则不要进行切换。
		//Unlock期望饥饿的互斥体有等待者而等待者不会在这种情况下是正确的
		if starving && old&mutexLocked != 0 { //饥饿 并且 已锁定
			new |= mutexStarving //置饥饿状态
		}
		if awoke { //若醒着状态
			// The goroutine has been woken from sleep, goroutine 已被唤醒
			// so we need to reset the flag in either case. 重置标记为
			if new&mutexWoken == 0 { // 有其他goroutine修改了状态panic
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken //清空状态位
		}
		
		if atomic.CompareAndSwapInt32(&m.state, old, new) { //尝试修改状态
			if old&(mutexLocked|mutexStarving) == 0 { // 旧状态非锁定或饥饿状态
				break // locked the mutex with CAS  用cas操作上锁成功返回
			}
			// If we were already waiting before, queue at the front of the queue. 如果我们之前已经在等待请在队列的前面排队。
			queueLifo := waitStartTime != 0 //判定第一次循环时间状态
			if waitStartTime == 0 { //赋值时间
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1) //用信号量抢占
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs //饥饿 或 等待时间过长
			old = m.state //状态转移
			if old&mutexStarving != 0 { //锁饥饿状态
				// If this goroutine was woken and mutex is in starvation mode, 如果这个goroutine被唤醒并且互斥锁处于饥饿模式
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// 所有权已移交给我们但互斥锁在某种程度上不一致的状态互斥锁未设置我们仍然作为等待者。解决这个问题。
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { //状态不一致panic
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift) //状态迁移
				if !starving || old>>mutexWaiterShift == 1 { //非饥饿只有一个等待者
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					//退出饥饿模式。
					//在这里进行此操作并考虑等待时间至关重要。
					//饥饿模式是如此低效以至于两次
					//一旦切换互斥锁就可以无限地锁步
					//到饥饿模式。
					delta -= mutexStarving  //清除饥饿状态
				}
				atomic.AddInt32(&m.state, delta) //清除状态
				break
			}
			awoke = true  //唤醒
			iter = 0 // 重置迭代计数器
		} else {
			old = m.state //状态迁移
		}
	}

	if race.Enabled { // race相关
		race.Acquire(unsafe.Pointer(m))
	}
}

(m *Mutex) TryLock() 尝试上锁可以的话就加锁不可以就返回

// TryLock tries to lock m and reports whether it succeeded.TryLock尝试锁定m并报告是否成功。
//
// Note that while correct uses of TryLock do exist, they are rare,
// and use of TryLock is often a sign of a deeper problem
// in a particular use of mutexes.
// 请注意虽然TryLock的正确用法确实存在
// 而TryLock的使用往往是一个更深层次问题的标志
// 在互斥体的特定使用中。
func (m *Mutex) TryLock() bool {
	old := m.state  //状态转移
	if old&(mutexLocked|mutexStarving) != 0 { // 旧状态为锁定或锁饥饿或叠加态则直接返回
		return false
	}

	// There may be a goroutine waiting for the mutex, but we are
	// running now and can try to grab the mutex before that
	// goroutine wakes up.
	//可能有一个goroutine在等待互斥锁但我们是
	//现在正在运行并且可以尝试在此之前获取互斥锁goroutine醒来了。
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) { //尝试解锁更新状态若不成功则返回false否则后续返回true
		return false
	}

	if race.Enabled { //race相关
		race.Acquire(unsafe.Pointer(m))
	}
	return true
}

(m *Mutex) Unlock() 解锁

解锁操作首先尝试用原子操作进行锁的状态更新若成功则直接返回代表解锁成功若失败则进入慢路径使用自旋和信号量释放的方式尝试解锁。

// Unlock unlocks m. Unlock解锁m
// It is a run-time error if m is not locked on entry to Unlock.
// 如果m在进入解锁时未被锁定这是一个运行时错误。

// A locked Mutex is not associated with a particular goroutine.
// 锁定的Mutex与特定的goroutine没有关联。
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
// 允许一个goroutine锁定Mutex然后安排另一个goroutine来解锁它。
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked) // 快速路径用cas操作尝试去掉mutexLocked状态位
	
	if new != 0 { // 解锁失败或存在其他状态进入慢路径
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. 为了在跟踪过程中隐藏解锁我们在跟踪GoUnblock时跳过一个额外的帧。
		m.unlockSlow(new)
	}
}

(m *Mutex) unlockSlow(new int32) 解锁的慢路径

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 { //重复解锁
		fatal("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 { //非饥饿模式
		old := new // 状态转移
		for { // 自旋
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			//如果没有等待者或者已经有一个goroutine被唤醒或持有住锁无需唤醒任何人。
			
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. 
			// 在饥饿模式下所有权直接从解锁转移到下一个等待者。
			// We are not part of this chain, since we did not observe mutexStarving when we unlocked the mutex above.
			//我们不是这个链的一部分因为我们在解锁上面的互斥锁时没有观察到互斥锁饥饿。
			// So get off the way. 所以别挡路
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { //没有等待者并且锁定或唤醒或饥饿状态或叠加状态
				return //直接返回
			}
			// Grab the right to wake someone. 唤醒某个goroutine
			new = (old - 1<<mutexWaiterShift) | mutexWoken //减少等待者并添加唤醒状态
			if atomic.CompareAndSwapInt32(&m.state, old, new) { //可以更新状态
				runtime_Semrelease(&m.sema, false, 1) //释放信号量
				return // 返回
			}
			old = m.state // 状态迁移
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		//饥饿模式将互斥体所有权移交给下一个等待者并让位我们的时间片以便下一个等待者可以立即开始运行。
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// 注意互斥锁未设置等待者将在唤醒后设置。
		
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// 但如果设置了mutexStaring则互斥锁仍被认为是锁定的
		// 所以新来的goroutine不会获得它
		runtime_Semrelease(&m.sema, true, 1) // 释放信号量
	}
}

总结

sync.Mutex是使用CAS原子操作自旋信号量同步的方式实现的其结构体内部使用位图存储锁的状态。这是一把重量级的锁。在同步过程中可以保证锁内代码状态的唯一性。Mutex一经创建不能被复制。在读多写少的场景中可以使用sync.RWMutex做同步操作。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go