Go 1.19.3 sync.Cond原理简析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

sync.Cond

cond是condition的缩写意为条件。sync.Cond有阻塞与结束阻塞的功能。结束阻塞可以分为两种方法一种是调用Signal()另一种是调用Broadcast()。前者会随机唤醒一个使用其Wait()方法阻塞的协程后者会唤醒所有使用其Wait()方法阻塞的协程。

// Cond implements a condition variable, a rendezvous point
// for goroutines waiting for or announcing the occurrence
// of an event.
// Cond 实现了一个条件变量一个等待或宣布事件发生的 goroutines 的集合点。
// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
// 每个 Cond 都有一个关联的 Locker L通常是互斥或 RWMutex在更改条件和调用 Wait 方法时必须保留该 L
// A Cond must not be copied after first use.
// Cond 在首次使用后不得复制。
// In the terminology of the Go memory model, Cond arranges that
// a call to Broadcast or Signal “synchronizes before” any Wait call
// that it unblocks.
// 在 Go 内存模型的术语中Cond 安排对广播或信号的调用在它取消阻止的任何 Wait 调用之前“同步”。
// For many simple use cases, users will be better off using channels than a
// Cond (Broadcast corresponds to closing a channel, and Signal corresponds to
// sending on a channel).
// 对于许多简单的用例用户使用通道比使用Cons更好广播对应于关闭通道信号对应于在通道上发送。
// For more on replacements for sync.Cond, see [Roberto Clapis's series on
// advanced concurrency patterns], as well as [Bryan Mills's talk on concurrency
// patterns].
// 有关替换同步的更多信息。Cond参见[Roberto Clapis关于高级并发模式的系列]以及[Bryan Mills关于并发模式的演讲]
// [Roberto Clapis's series on advanced concurrency patterns]: https://blogtitle.github.io/categories/concurrency/
// [Bryan Mills's talk on concurrency patterns]: https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view

Signal()和Broadcast()应发生在Wait()之前

Cond结构

以下源码若为标注出处则均来自src/sync/cond.go

type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

noCopy检测是否拷贝过的标记

// noCopy may be added to structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
//
// Note that it must not be embedded, due to the Lock and Unlock methods.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

LLocker类型的接口可以接收实现该接口的所有类型如Mutex、RWMutex
src/sync/mutex.go

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

notify维护一个通知列表
src/sync/runtime2.go

// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
type notifyList struct {
	wait   uint32
	notify uint32
	lock   uintptr // key field of the mutex
	head   unsafe.Pointer
	tail   unsafe.Pointer
}

checker检测Cond结构体是否发生过拷贝的对象

copyChecker 8字节数值指针类型copyChecker.check(),检查该接收者是否发生过拷贝在第一次调用时赋地址值之后调用则检测该地址是否发生变化

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr // uint指针类型保存地址

func (c *copyChecker) check() { // 检查地址是否发生改变从而判断是否发生复制
	if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
		!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
		uintptr(*c) != uintptr(unsafe.Pointer(c)) {
		panic("sync.Cond is copied")
	}
}

NewCond 创建Cond传入一把锁

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
	return &Cond{L: l}
}

Cond.Wait 阻塞所在的goroutine

技巧Wait()应被锁保护

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
// 等待以原子方式解锁 c.L 并暂停调用 goroutine的执行。稍后恢复执行后Wait 在返回之前锁定 c.L。与其他系统不同除非被广播或信号唤醒否则等待无法返回。
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
// 由于 c.L 在 Wait 首次恢复时未锁定因此调用方通常无法假定 Wait 返回时条件为 true。相反调用方应该在循环中等待
//	c.L.Lock()
//	for !condition() {
//	    c.Wait()
//	}
//	... make use of condition ... 使用条件
//	c.L.Unlock()
func (c *Cond) Wait() {
	c.checker.check() // copy 检查
	t := runtime_notifyListAdd(&c.notify) // 向通知列表列表中加入该通知
	c.L.Unlock() // 暂时解锁
	runtime_notifyListWait(&c.notify, t) //通知操作
	c.L.Lock() //加锁还原状态
}

Cond.Signal 随机通知一个被Cond.Wait阻塞的协程结束阻塞

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
// 信号唤醒一个等待 c 的 goroutine如果有的话。允许但不是要求调用方在呼叫期间保持 c.L。
// Signal() does not affect goroutine scheduling priority; if other goroutines
// are attempting to lock c.L, they may be awoken before a "waiting" goroutine.
// Signal 不影响 goroutine 调度优先级;如果其他 goroutines 试图锁定 c.L它们可能会在“等待”的 goroutines 之前被唤醒。
func (c *Cond) Signal() {
	c.checker.check() // copy检查
	runtime_notifyListNotifyOne(&c.notify) //随机通知一个被Wait阻塞的协程取消阻塞
}

Cond.Broadcast 通知所有被Cond.Wait阻塞的协程结束阻塞

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
	c.checker.check() // copy 检查
	runtime_notifyListNotifyAll(&c.notify) //通知所有阻塞协程
}

src/runtme/sema.go notifyListAdd() 通知列表

// notifyListAdd adds the caller to a notify list such that it can receive
// notifications. The caller must eventually call notifyListWait to wait for
// such a notification, passing the returned ticket number.
//
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
	// This may be called concurrently, for example, when called from
	// sync.Cond.Wait while holding a RWMutex in read mode.
	return atomic.Xadd(&l.wait, 1) - 1
}

src/runtime/sema.go notifyListNotifyAll()

// notifyListNotifyAll notifies all entries in the list.
//
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	// Pull the list out into a local variable, waiters will be readied
	// outside the lock.
	lockWithRank(&l.lock, lockRankNotifyList)
	s := l.head
	l.head = nil
	l.tail = nil

	// Update the next ticket to be notified. We can set it to the current
	// value of wait because any previous waiters are already in the list
	// or will notice that they have already been notified when trying to
	// add themselves to the list.
	atomic.Store(&l.notify, atomic.Load(&l.wait))
	unlock(&l.lock)

	// Go through the local list and ready all waiters.
	for s != nil {
		next := s.next
		s.next = nil
		readyWithTime(s, 4)
		s = next
	}
}

src/runtime/sema.go notifyListNotifyOne()

// notifyListNotifyOne notifies one entry in the list.
//
//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
	// Fast-path: if there are no new waiters since the last notification
	// we don't need to acquire the lock at all.
	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
		return
	}

	lockWithRank(&l.lock, lockRankNotifyList)

	// Re-check under the lock if we need to do anything.
	t := l.notify
	if t == atomic.Load(&l.wait) {
		unlock(&l.lock)
		return
	}

	// Update the next notify ticket number.
	atomic.Store(&l.notify, t+1)

	// Try to find the g that needs to be notified.
	// If it hasn't made it to the list yet we won't find it,
	// but it won't park itself once it sees the new notify number.
	//
	// This scan looks linear but essentially always stops quickly.
	// Because g's queue separately from taking numbers,
	// there may be minor reorderings in the list, but we
	// expect the g we're looking for to be near the front.
	// The g has others in front of it on the list only to the
	// extent that it lost the race, so the iteration will not
	// be too long. This applies even when the g is missing:
	// it hasn't yet gotten to sleep and has lost the race to
	// the (few) other g's that we find on the list.
	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
		if s.ticket == t {
			n := s.next
			if p != nil {
				p.next = n
			} else {
				l.head = n
			}
			if n == nil {
				l.tail = p
			}
			unlock(&l.lock)
			s.next = nil
			readyWithTime(s, 4)
			return
		}
	}
	unlock(&l.lock)
}

一个用广播通知所有消费者开始工作的例子

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func consumer(cond *sync.Cond, id int) {
	cond.L.Lock()
	cond.Wait()
	rand.Seed(time.Now().UnixNano())
	fmt.Println("消费者协程", id, "执行开始")
	time.Sleep(time.Second * time.Duration(rand.Intn(3)))
	// do something
	cond.L.Unlock()
	fmt.Println("消费者协程", id, "执行结束")
}

func main() {
	lock := new(sync.Mutex)
	cond := sync.NewCond(lock)
	for i := 0; i < 10; i++ {
		go consumer(cond, i)
	}

	time.Sleep(time.Second)
	// cond.Signal() //通知一个等待的协程结束阻塞
	cond.Broadcast() //通知所有等待的协程结束阻塞

	time.Sleep(time.Second * 30)

	fmt.Println("执行结束")
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go