Go 1.19.3 sync.WaitGroup原理简析

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

WaitGroup 同步屏障

WaitGroup可以起到同步屏障的作用。假设main协程中启动了协程A和协程B，main协程不知道A、B何时执行完毕，但需要等待这两个协程都执行完才能退出。此时有一些非常笨拙的方法，比如用for循环阻塞、用select阻塞、用网络心跳服务阻塞等等，它们都需要额外添加判定条件。最好的选择是使用WaitGroup的Wait方法进行阻塞。
WaitGroup有三个对外开放的API，分别是Add(delta int)、Done()和Wait()。在启动协程之前调用Add(2)，表示需要等待两个协程退出；在A、B中使用defer wg.Done()；当WaitGroup中的计数器归零时，Wait()解除阻塞。

WaitGroup 结构

// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
//
// In the terminology of the Go memory model, a call to Done
// “synchronizes before” the return of any Wait call that it unblocks.
type WaitGroup struct {
	// Zero-size marker field; it exists so that `go vet` can report
	// WaitGroups that are copied after first use.
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers only guarantee that 64-bit fields are 32-bit aligned.
	// For this reason on 32 bit architectures we need to check in state()
	// if state1 is aligned or not, and dynamically "swap" the field order if
	// needed.
	state1 uint64
	// When state1 is 8-byte aligned, state2 is the semaphore and state1 is
	// the state word. Otherwise state() treats the 12 bytes of state1+state2
	// as [3]uint32: element 0 is the semaphore and elements 1-2 (which start
	// on an 8-byte boundary) form the state word.
	state2 uint32
}

WaitGroup结构体中有三个字段：
noCopy用于防止拷贝，它是一个空结构体，不占内存空间。go vet工具可以报告含有noCopy字段的结构体在首次使用后是否发生了拷贝。
state1 为uint64类型
state2 为uint32类型
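这两个字段在结构体内处于连续的内存区域，一共占12字节，所以可以把它们转换成一个[3]uint32类型的数组，当作三个32位字段使用。
[3]uint32数组中的三个元素分别表示运行计数、等待计数和信号量，但它们的顺序取决于state1的对齐情况（以下按小端序说明）：当state1按8字节对齐时（64位平台上总是如此，32位平台上也可能如此），三个元素依次为等待计数、运行计数、信号量；当state1只按4字节对齐时（只会出现在32位平台上），三个元素依次为信号量、等待计数、运行计数。
之所以采用[3]uint32的设计，是因为64位原子操作要求操作数按8字节对齐，而32位编译器只保证uint64字段按4字节对齐。因此需要在运行时从这12字节中挑出按8字节对齐的那8个字节作为状态字段，剩下的4字节作为信号量。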

state：获取statep和semap两个指针，分别指向状态字段和信号量。状态字段是一个uint64，分为两部分：高32位为运行计数，低32位为等待计数。

// state returns pointers to the state and sema fields stored within wg.state*.
//
// The state word packs the counter (high 32 bits) and the waiter count
// (low 32 bits) and must be 8-byte aligned for 64-bit atomic operations.
//   - When state1 is 8-byte aligned (always the case when the compiler gives
//     uint64 an alignment of 8, and sometimes by luck on 32-bit platforms),
//     state1 is the state word and state2 is the semaphore.
//   - Otherwise state1 is only 4-byte aligned, which means (&state1)+4 is
//     8-byte aligned: the 12 bytes of state1+state2 are viewed as [3]uint32,
//     word 0 becomes the semaphore and words 1-2 form the state word.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	aligned := unsafe.Alignof(wg.state1) == 8 ||
		uintptr(unsafe.Pointer(&wg.state1))%8 == 0
	if !aligned {
		words := (*[3]uint32)(unsafe.Pointer(&wg.state1))
		return (*uint64)(unsafe.Pointer(&words[1])), &words[0]
	}
	return &wg.state1, &wg.state2
}

Add 增加计数器

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
//
// Implementation notes: the counter lives in the high 32 bits of the 64-bit
// state word and the waiter count in the low 32 bits. delta is applied with a
// single atomic add to the high half. When the counter reaches zero while
// waiters are registered, Add zeroes the whole state word and releases the
// semaphore once per waiter.
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state() // pointers to the 64-bit state word and the semaphore
	if race.Enabled { // race-detector bookkeeping; runs only when race.Enabled is true
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}
	// Atomically add delta to the counter (high 32 bits). Negative deltas work
	// too: uint64(delta) is the two's-complement bit pattern, so the shifted
	// add decrements the high half without touching the waiter count.
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // counter: shift the high 32 bits down (right shift) and read as signed
	w := uint32(state)      // waiter count: low 32 bits, number of goroutines blocked in Wait
	if race.Enabled && delta > 0 && v == int32(delta) { // race-detector bookkeeping
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		race.Read(unsafe.Pointer(semap))
	}
	if v < 0 { // counter went negative: more Done/negative Adds than positive Adds
		panic("sync: negative WaitGroup counter")
	}
	// delta > 0 && v == delta means this Add moved the counter up from 0.
	// If goroutines are already waiting at that point, Add raced with Wait.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	if v > 0 || w == 0 { // work still outstanding, or nobody is waiting
		return // nothing to wake up
	}

	// Reaching here means the counter is exactly 0 AND at least one goroutine
	// is blocked in Wait (w > 0): every tracked goroutine has finished, so the
	// waiters must be released.

	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state { // state changed after our atomic add: misuse
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// Reset waiters count to 0 (this plain store clears counter and waiters
	// together), then release the semaphore once per recorded waiter so that
	// each goroutine blocked in runtime_Semacquire inside Wait can proceed.
	*statep = 0
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}
}

Done 计数器-1操作。

// Done records that one waited-for goroutine has finished: it decrements
// the WaitGroup counter by one, exactly like calling wg.Add(-1).
func (wg *WaitGroup) Done() {
	const finishedOne = -1
	wg.Add(finishedOne)
}

Wait：阻塞直到计数器归零。多个goroutine可以同时在Wait上阻塞，这也是设计等待计数器的原因：计数器归零时，Add需要知道要唤醒多少个等待者。

// Wait blocks until the WaitGroup counter is zero.
//
// Implementation notes: Wait loops reading the state word. If the counter is
// already zero it returns immediately. Otherwise it tries to register itself
// as a waiter by incrementing the waiter count (low 32 bits) with a
// compare-and-swap; if another goroutine changed the state in between, the
// CAS fails and the loop reloads and retries. After a successful CAS it
// blocks on the semaphore until Add releases it.
func (wg *WaitGroup) Wait() {
	statep, semap := wg.state() // pointers to the 64-bit state word and the semaphore
	if race.Enabled { // race-detector bookkeeping; runs only when race.Enabled is true
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for { // CAS retry loop
		state := atomic.LoadUint64(statep) // atomic snapshot of the state word
		v := int32(state >> 32) // counter (high 32 bits)
		w := uint32(state) // waiter count (low 32 bits)
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled { // race-detector bookkeeping
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Register as a waiter: state+1 increments the waiter count held in
		// the low 32 bits. The CAS fails if the state changed since the load.
		// Increment waiters count.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 { // race-detector bookkeeping
				// Wait must be synchronized with the first Add.
				// Need to model this as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(semap))
			}
			// Block until Add releases the semaphore after the counter hits 0.
			runtime_Semacquire(semap)
			// Add zeroes the state word before releasing waiters, so a
			// non-zero state here means a new Add reused the WaitGroup
			// before this Wait returned.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled { // race-detector bookkeeping
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// CAS lost a race with a concurrent state change: reload and retry.
	}
}

透视一下底层计数器的变化

这里其实也可以直接将&wg通过unsafe.Pointer转换成*[3]uint32类型的指针，因为WaitGroup中的noCopy字段是空结构体，不占内存空间：fmt.Println(unsafe.Sizeof(struct{}{})) // 0。下面的示例则是通过一个与sync.WaitGroup内存布局相同的waitGroup结构体来访问这三个32位字。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"
)

// main starts ten goroutines that each sleep for a random 0-9 seconds and
// then call wg.Done. While they run, a monitor goroutine prints a snapshot
// of the WaitGroup's three internal 32-bit words once per second; after
// wg.Wait returns, the monitor is stopped and one final snapshot is printed.
//
// NOTE(review): this peeks at unexported sync.WaitGroup internals via the
// waitGroup mirror type, so it only works while the real layout matches.
func main() {
	rand.Seed(time.Now().UnixNano())
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			// Defer first so Done runs however the goroutine exits.
			defer wg.Done()
			time.Sleep(time.Second * time.Duration(rand.Intn(10)))
		}()
	}

	done := make(chan struct{})    // closed by main once Wait has returned
	stopped := make(chan struct{}) // closed by the monitor when it exits
	go func() {
		defer close(stopped)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			fmt.Println(snapshot(&wg))
			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()

	wg.Wait()
	close(done)
	<-stopped // ensure the monitor's output cannot interleave with the final line
	fmt.Println(snapshot(&wg))
}

// snapshot atomically loads the three 32-bit words spanning the WaitGroup's
// state1 and state2 fields. On little-endian platforms where state1 is
// 8-byte aligned (including all 64-bit ones) they are [waiters, counter,
// sema]; on a 32-bit platform where state1 is only 4-byte aligned they are
// [sema, waiters, counter]. Atomic loads are required because the worker
// goroutines update these words concurrently; plain reads would be a data race.
func snapshot(wg *sync.WaitGroup) [3]uint32 {
	words := (*[3]uint32)(unsafe.Pointer(&(*waitGroup)(unsafe.Pointer(wg)).state1))
	var out [3]uint32
	for i := range words {
		out[i] = atomic.LoadUint32(&words[i])
	}
	return out
}

// waitGroup mirrors the memory layout of the Go 1.19 sync.WaitGroup (a
// zero-size noCopy field, then state1 uint64 and state2 uint32) so that main
// can reach the unexported fields through unsafe.Pointer.
// NOTE(review): this is only valid while sync.WaitGroup keeps this layout;
// re-check against the sync package of the Go version in use.
type waitGroup struct {
	noCopy struct{} // zero-size stand-in for sync's noCopy field
	state1 uint64
	state2 uint32
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go