Go 1.19.3 sync.WaitGroup原理简析
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
WaitGroup 同步屏障
WaitGroup可以起到同步屏障的作用。假设main协程中启动了协程A和协程B,main协程不知道A、B何时执行完毕,但又必须等这两个协程都执行完才能退出。此时有一些非常笨拙的方法,比如用for阻塞、用select阻塞、用网络心跳服务阻塞等等,它们都需要额外添加判定条件。最好的选择是用WaitGroup的Wait方法进行阻塞。
WaitGroup有三个对外开放的API,分别是Add(delta int)、Done()和Wait()。在协程开始前调用Add(2),表示等待两个协程退出;在A、B中使用defer wg.Done();当WaitGroup中的计数器归零时,Wait()会解除阻塞。
WaitGroup 结构
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
// noCopy is a marker field for the "must not be copied" rule above.
// NOTE(review): its type is declared elsewhere; presumably it is what
// lets `go vet` flag copies of this struct — confirm against its definition.
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers only guarantee that 64-bit fields are 32-bit aligned.
// For this reason on 32 bit architectures we need to check in state()
// if state1 is aligned or not, and dynamically "swap" the field order if
// needed.
//
// state1 and state2 are never meant to be read directly: together they form
// 12 contiguous bytes that state() views either as (uint64 state, uint32 sema)
// when state1 is 64-bit aligned, or as (uint32 sema, uint64 state) otherwise.
state1 uint64
state2 uint32
}
WaitGroup结构体中有三个字段
noCopy是为了防止拷贝而设定的,它是一个空结构体。go vet工具可以报告含有noCopy字段的结构体是否发生了拷贝。
state1 为uint64类型
state2 为uint32类型
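这两个字段在结构体内处于连续的内存区域(共12字节),所以可以把它们整体转换成一个[3]uint32类型的数组,当做三个32位字段使用。
[3]uint32数组中的三个元素对应三种计数器,其顺序取决于state1的对齐情况(以小端序为例):当state1按8字节对齐时(64位系统,或32位系统上恰好对齐),三个元素依次为等待计数、运行计数、信号量;当state1只按4字节对齐时(仅可能出现在32位系统上),三个元素依次为信号量、等待计数、运行计数。
64位原子操作要求操作数按8字节对齐,而32位编译器只保证uint64字段按4字节对齐。所以才有了这种在运行时根据对齐情况动态调整字段顺序的设计。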
state 获取statep和semap指针,它们分别代表状态和信号量。状态分为两部分:高32位为运行计数,低32位为等待计数。
// state returns pointers to the state and sema fields stored within wg.state*.
//
// statep points at the 64-bit word whose high 32 bits hold the counter and
// whose low 32 bits hold the waiter count; semap points at the 32-bit
// semaphore word. Which bytes of state1/state2 back each pointer is decided
// here from the alignment of wg.state1, so that statep is always 64-bit
// aligned as 64-bit atomic operations require.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // uint64 is 8-byte aligned by the compiler, or this particular address happens to be
// state1 is 64-bit aligned: nothing to do.
return &wg.state1, &wg.state2
} else { // state1 is only 4-byte aligned (possible only where uint64 alignment is 4)
// state1 is 32-bit aligned but not 64-bit aligned: this means that
// (&state1)+4 is 64-bit aligned.
// View the 12 bytes as three uint32 words: word 0 becomes the semaphore
// and words 1-2 become the 64-bit state.
state := (*[3]uint32)(unsafe.Pointer(&wg.state1))
return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
}
}
Add 增加计数器
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state() // pointers to the packed state word and the semaphore word
if race.Enabled { // race-detector instrumentation only
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32) // atomically add delta to the counter in the high 32 bits; a negative delta works through two's-complement wraparound
v := int32(state >> 32) // counter: shift the high 32 bits down (right shift) and reinterpret them as signed
w := uint32(state) // waiter count: the low 32 bits, incremented by each Wait call before it blocks
if race.Enabled && delta > 0 && v == int32(delta) { // race-detector instrumentation only
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
if v < 0 { // the counter must never drop below zero
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) { // counter just rose from 0 while goroutines are already waiting: misuse
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 { // counter still positive, or nobody is waiting
return // nothing to wake
}
// Reaching this point means the counter is exactly 0 AND the waiter count is
// non-zero: the last outstanding unit of work has finished, so the blocked
// Wait calls can now be released.
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state { // the state changed after our atomic add: concurrent misuse
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Clear the whole state word (counter is already 0, so this resets the
// waiter count), then release the semaphore once per recorded waiter.
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
Done 计数器-1操作。
// Done decrements the WaitGroup counter by one.
//
// It is exactly Add(-1), so it panics under the same conditions as Add
// (for example when the counter would become negative).
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait 阻塞直到计数器归零。Wait方法可以在多个goroutine中同时阻塞,这也是设计等待计数器的原因。
// Wait blocks until the WaitGroup counter is zero.
//
// If the counter is already zero it returns immediately. Otherwise it
// registers itself in the waiter count with a compare-and-swap and blocks on
// the semaphore. It panics if the state word is non-zero when it wakes up.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state() // pointers to the packed state word and the semaphore word
if race.Enabled { // race-detector instrumentation only
_ = *statep // trigger nil deref early
race.Disable()
}
for { // retry loop: another iteration happens only when the CAS below fails
state := atomic.LoadUint64(statep) // atomic snapshot of counter and waiter count
v := int32(state >> 32) // counter (high 32 bits)
w := uint32(state) // waiter count (low 32 bits)
if v == 0 { // nothing outstanding: return without blocking
// Counter is 0, no need to wait.
if race.Enabled { // race-detector instrumentation only
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// Register this goroutine as a waiter by atomically adding 1 to the low
// 32 bits; the CAS fails if the state changed since the load above.
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 { // race-detector instrumentation only
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
runtime_Semacquire(semap) // block here; Add calls runtime_Semrelease on this semaphore once per waiter when the counter reaches zero
if *statep != 0 { // Add zeroes the state before waking waiters, so non-zero means the WaitGroup was reused too early
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled { // race-detector instrumentation only
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return // woken up: the wait is over
}
}
}
透视一下底层计数器的变化
这里其实可以直接将wg通过unsafe.Pointer转换成*[3]uint32类型的指针,因为WaitGroup中的noCopy字段是空结构体,不占内存空间:fmt.Println(unsafe.Sizeof(struct{}{})) // 0
package main
import (
"fmt"
"math/rand"
"sync"
"time"
"unsafe"
)
// main starts ten goroutines that each sleep for a random 0-9 seconds, and
// prints the WaitGroup's three raw 32-bit words once per second while they
// run and once more after Wait returns.
//
// The dump relies on the local waitGroup type having the same layout as
// sync.WaitGroup. When state1 is 64-bit aligned on a little-endian machine the
// printed order is [waiters counter sema] — NOTE(review): other platforms
// order the words differently; see WaitGroup.state.
func main() {
	rand.Seed(time.Now().UnixNano())
	wg := sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			// Register Done before doing the work so it is a real deferred
			// call, not just the goroutine's last statement.
			defer wg.Done()
			time.Sleep(time.Second * time.Duration(rand.Intn(10)))
		}()
	}

	// dump prints the WaitGroup's internal words. These are plain reads that
	// are not synchronized with the WaitGroup's own atomic updates, so they
	// are acceptable only for a demonstration like this one.
	dump := func() {
		p := (*waitGroup)(unsafe.Pointer(&wg))
		p1 := (*[3]uint32)(unsafe.Pointer(&p.state1))
		fmt.Println(p1)
	}

	// The monitor goroutine is never stopped explicitly; it ends when main
	// returns and the process exits.
	go func() {
		for {
			dump()
			time.Sleep(time.Second)
		}
	}()

	wg.Wait()
	dump()
}
// waitGroup repeats the field layout of the WaitGroup struct shown earlier so
// that a *sync.WaitGroup can be reinterpreted through unsafe.Pointer and its
// unexported fields read by name.
//
// NOTE(review): this only works while the real sync.WaitGroup has the same
// size and field offsets; that layout is an implementation detail of the
// standard library and should be re-checked for the Go version in use.
type waitGroup struct {
// noCopy stands in for the zero-size marker field of the real struct.
noCopy struct{}
// state1 and state2 are the 12 bytes holding counter, waiter count and
// semaphore; main views them as a [3]uint32.
state1 uint64
state2 uint32
}