golang 高手才会的【协程数量控制】套路总结

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

尽管Goroutine(协程)非常清轻量但是本身也是占用资源的过多协程切换也会带来开销总之物极必反无限制的开协程的结果只会是Game Over。生产实践中必须考虑控制协程数量本文带你看看针对不同场景和需求的协程数量控制方式看看这些姿势你都会了吗

场景

如下go中一个典型场景是接受数据然后开协程处理代码如下

// runTaskDataGenerator 产生数据
func runTaskDataGenerator(dataChan chan int) {
	for i := 0; i < 100; i++ {
		dataChan <- i
	}

	close(dataChan)
}

// runInfiniteTask 每来一个数据起协程处理任务
func runInfiniteTask(dataChan <-chan int) {
	var wg sync.WaitGroup

	for data := range dataChan {
		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

func TestRunInfiniteTask(t *testing.T) {
	dataChan := make(chan int)

	go runTaskDataGenerator(dataChan)
	go runNumGoroutineMonitor()

	runInfiniteTask(dataChan)
}

runNumGoroutineMonitor监控协程数量

// runNumGoroutineMonitor 协程数量监控
func runNumGoroutineMonitor() {
	log.Printf("协程数量->%d\n", runtime.NumGoroutine())

	for {
		select {
		case <-time.After(time.Second):
			log.Printf("协程数量->%d\n", runtime.NumGoroutine())
		}
	}
}

运行结果如下可以看到**有多少数据就会起多少协程如果任务处理时间较长短时间可能出现大量协程**耗尽资源需要控制协程数量。

=== RUN TestRunInfiniteTask
2023/01/05 10:36:12 协程数量->6
2023/01/05 10:36:13 协程数量->105
2023/01/05 10:36:14 协程数量->105
— PASS: TestRunInfiniteTask (3.00s)

固定个数协程并发处理任务

一般叫做Bounded/Fixed并发控制。

  • 优点是简单不复杂的并发任务这样简单处理即可。
  • 缺点在于dataChan可能流量不不均衡需要同时处理的任务多少在变动但是对应的协程数量保持不变要不就是任务处理堵塞要不就是存在多余的协程空闲
// runBoundedTask 起maxTaskNum个协程共同处理任务
func runBoundedTask(dataChan <-chan int, maxTaskNum int) {
	var wg sync.WaitGroup
	wg.Add(maxTaskNum)

	for i := 0; i < maxTaskNum; i++ {
		go func() {
			defer wg.Done()

			for data := range dataChan {
				func(data int) {

					// do something
					time.Sleep(3 * time.Second)
				}(data)
			}
		}()
	}

	wg.Wait()
}

动态个数协程并发处理任务

针对固定个数协程的缺点一个思路是协程数量最好能够根据来的处理任务的多少动态变更指定一个并发上限任务多时增加协程数量任务少时减少协程数量。这里提供两种实现思路

自定义令牌池实现

令牌池维持最大允许并发任务数个令牌每个任务启动时请求令牌运行完成返回令牌。

// runDynamicTask 
// 最大同时运行maxTaskNum个任务处理数据
// 自定义令牌池维持maxTaskNum个令牌供竞争
func runDynamicTask(dataChan <-chan int, maxTaskNum int) {
	// 初始化令牌池
	tokenPool := make(chan struct{}, maxTaskNum)
	for i := 0; i < maxTaskNum; i++ {
		tokenPool <- struct{}{}
	}

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌如果被消费完则阻塞等待其它任务返还令牌
		<-tokenPool

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// 任务运行完成返还令牌
			defer func() {
				tokenPool <- struct{}{}
			}()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

信号量Semaphore实现

同上令牌池换成信号量。

// runSemaphoreTask 
// 最大同时运行maxTaskNum个任务处理数据
// 使用信号量维持maxTaskNum个信号
func runSemaphoreTask(dataChan <-chan int, maxTaskNum int64) {
	w := semaphore.NewWeighted(maxTaskNum)

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取信号量如果被消费完则阻塞等待信号量返还
		_ = w.Acquire(context.TODO(), 1)

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// 运行完成返还信号量
			defer w.Release(1)

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

指定处理速度并发处理任务

针对固定个数协程的缺点另一个思路是借鉴限流器的实现控制每个时刻最大允许协程数量也达到控制协程数量的目的。这里也提供两种实现思路

自定义令牌池实现

相当于一个简单限流器指定速度生产令牌每个任务启动时必须请求到令牌。

// runRateLimitTask 限制每秒允许的最大协程数量限流器的思路
func runRateLimitTask(dataChan <-chan int) {
	// 初始化令牌池
	tokenPool := make(chan struct{})
	go func() {
		for {
			select {
			// 动态控制令牌生成速度
			case <-time.After(time.Second):
				tokenPool <- struct{}{}
			}
		}
	}()

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌如果被消费完则阻塞等待新令牌产生
		<-tokenPool

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

官方限流器实现

逻辑同上每个任务启动必须先获取令牌。

// runRateLimitTask2 限制每秒允许的最大协程数量使用官方限流器
func runRateLimitTask2(dataChan <-chan int) {
	// 初始化令牌池
	limit := rate.Every(time.Second) // 每秒一个
	limiter := rate.NewLimiter(limit, 10)

	var wg sync.WaitGroup

	for data := range dataChan {
		// 先获取令牌如果被消费完则阻塞等待新令牌产生
		_ = limiter.Wait(context.TODO())

		wg.Add(1)
		go func(data int) {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		}(data)
	}

	wg.Wait()
}

协程池并发处理任务

生产业务中针对复杂业务或者不想那么麻烦可以直接上协程池。
常用协程池https://github.com/panjf2000/ants如下实现。

代码上看起来简洁很多根本原理和动态个数协程控制思路差不多后续单开一篇文章讲讲协程池的实现。

// runGoroutinePoolTask 使用协程池动态管理协程数量
func runGoroutinePoolTask(dataChan <-chan int, maxTaskNum int) {
	p, _ := ants.NewPool(maxTaskNum)
	defer p.Release()

	var wg sync.WaitGroup

	for _ = range dataChan {
		wg.Add(1)

		// 提交任务协程池动态管理数量可以做更多的分配优化策略
		_ = p.Submit(func() {
			defer wg.Done()

			// do something
			time.Sleep(3 * time.Second)
		})

	}

	wg.Wait()
}

参考

演示代码 https://gitee.com/wenzhou1219/go-in-prod/tree/master/task_concurrency

https://zhuanlan.zhihu.com/p/568151296

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go