Go语言 | 协程池的应用(可能是全网最适合小白的教程)

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

前言

为什么说这是全网最适合小白的呢因为我就是一个第一次写多线程使用协程池的小白自己明白这里面的入门不易特此分享给大家

需求分析

在我们的服务中有这么一个功能需要函数先去遍历一个文件夹这个文件夹可能有很多的子目录然后读取目录下的所有json文件并对其进行schema验证。

原始代码是将上述需求通过串行的方式进行实现也就是遍历读到一个json文件后就对其进行schema校验

在业务量小即json文件少的情况下这种方法一点问题没有但是当json文件激增到五六百个时程序可能就要耗费七八秒的时间这是不能接受的

目前改造计划通过多线程的方式进行功能的划分并发的执行读取任务利用Go语言高并发的机制加速程序服务的运行

设计功能

  • 分离文件遍历与文件读取校验的功能
  • 设计协程池文件遍历作为主协程文件读取校验作为worker协程
  • 通过chan通道传递文件名与校验结果

此前我主要使用C语言编程对于多线程的使用可以说一点没有能规避的就规避目前使用Go语言多线程是学习和使用是躲不过了这次还有这么一个协程池的概念

协程池和多线程相关概念

首先介绍协程池是什么为什么不直接用多线程。

go协程池goroutine

Go语言中的goroutine虽然相对于系统线程来说比较轻量级初始栈大小仅2KB但是在高并发量下的goroutine频繁创建和销毁对于性能损耗以及GC来说压力也不小

很多情况下我们需要考虑如下问题

  1. 限制并发的goroutine数量
  2. 复用goroutine减轻runtime调度压力提升程序性能
  3. 规避过多的goroutine侵占系统资源CPU&内存。

go协程池

如果无休止的开辟Goroutine依然会出现高频率的调度Groutine那么依然会浪费很多上下文切换的资源导致做无用功。

所以设计一个Goroutine池限制Goroutine的开辟个数在大型并发场景还是必要的。

package main
 
import (
	"fmt"
	"time"
)
 
/* 有关Task任务相关定义及操作 */
//定义任务Task类型,每一个任务Task都可以抽象成一个函数
type Task struct {
	f func() error //一个无参的函数类型
}
 
//通过NewTask来创建一个Task
func NewTask(f func() error) *Task {
	t := Task{
		f: f,
	}
	return &t
}
 
//执行Task任务的方法
func (t *Task) Execute() {
	t.f() //调用任务所绑定的函数
}
 
/* 有关协程池的定义及操作 */
//定义池类型
type Pool struct {
	EntryChannel chan *Task //对外接收Task的入口
	worker_num   int        //协程池最大worker数量,限定Goroutine的个数
	JobsChannel  chan *Task //协程池内部的任务就绪队列
}
 
//创建一个协程池
func NewPool(cap int) *Pool {
	p := Pool{
		EntryChannel: make(chan *Task),
		worker_num:   cap,
		JobsChannel:  make(chan *Task),
	}
	return &p
}
 
//协程池创建一个worker并且开始工作
func (p *Pool) worker(work_ID int) {
	//worker不断的从JobsChannel内部任务队列中拿任务
	for task := range p.JobsChannel {
		//如果拿到任务,则执行task任务
		task.Execute()
		fmt.Println("worker ID ", work_ID, " 执行完毕任务")
	}
}
 
//让协程池Pool开始工作
func (p *Pool) Run() {
	//1,首先根据协程池的worker数量限定,开启固定数量的Worker,
	//  每一个Worker用一个Goroutine承载
	for i := 0; i < p.worker_num; i++ {
		fmt.Println("开启固定数量的Worker:", i)
		go p.worker(i)
	}
 
	//2, 从EntryChannel协程池入口取外界传递过来的任务
	//   并且将任务送进JobsChannel中
	for task := range p.EntryChannel {
		p.JobsChannel <- task
	}
 
	//3, 执行完毕需要关闭JobsChannel
	close(p.JobsChannel)
	fmt.Println("执行完毕需要关闭JobsChannel")
 
	//4, 执行完毕需要关闭EntryChannel
	close(p.EntryChannel)
	fmt.Println("执行完毕需要关闭EntryChannel")
}
 
//主函数
func main() {
	//创建一个Task
	t := NewTask(func() error {
		fmt.Println("创建一个Task:", time.Now().Format("2006-01-02 15:04:05"))
		return nil
	})
 
	//创建一个协程池,最大开启3个协程worker
	p := NewPool(3)
 
	//开一个协程 不断的向 Pool 输送打印一条时间的task任务
	go func() {
		for {
			p.EntryChannel <- t
		}
	}()
 
	//启动协程池p
	p.Run()
}

上述代码通过一个简单的例子说明了协程池的基本工作原理但是如果上述框架要应用在实际工程中还有许多的不足因此go协程池也是有库函数的存在

ants库

ants是一个受fasthttp启发的高性能协程池fasthttp号称是比go原生的net/http快10倍其原因之一就是采用了各种池化技术 ants相比之前两种协程池其模型更像是之前接触到的数据库连接池需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建而当pool的容量达到上线之后剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。

ants在内存的管理上做得很好除了定期清除过期worker(一定时间内没有分配到任务的worker)ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定避免了调用方不停的创建更加节省内存。

ants Git仓库地址

如果可以看懂这个库的使用还是推荐通过第三方维护的库来进行实现可以规避很多初次使用遇到的问题。

package main
 
import (
	"fmt"
	"github.com/panjf2000/ants"
	"sync"
	"time"
)
 
//任务
func sendMail(i int, wg *sync.WaitGroup) func() {
	var cnt int
	return func() {
		for {
			time.Sleep(time.Second * 2)
			fmt.Println("send mail to ", i)
			cnt++
			if cnt > 5 && i == 1 {
				fmt.Println("退出协程ID:", i)
				break
			}
		}
		wg.Done()
	}
}
 
func main() {
	wg := sync.WaitGroup{}
 
	//申请一个协程池对象
	pool, _ := ants.NewPool(2)
 
	//关闭协程池
	defer pool.Release()
 
	// 向pool提交任务
	for i := 1; i <= 5; i++ {
		pool.Submit(sendMail(i, &wg))
		wg.Add(1)
	}
	wg.Wait()
}

源码中提到 ants的吞吐量能够比原生groutine高出N倍内存节省10到20倍。

实现

在我的实际设计中因为各种原因我没有使用ants库而是通过其原理进行了设计

因为牵扯到具体的功能直接看可能会比较懵但我想说明的就是我们需要关注的点

func loadConfigData(paths []string, schema map[string]*gojsonschema.Schema,
	loadSchemaCheck bool, poolNumber int) {

	var wg sync.WaitGroup
	var mapGuard sync.Mutex
	fileinfochan := make(chan fileInfoChan, 100)

	for i := 0; i < poolNumber; i++ {
		go func() {
			for fileinfo := range fileinfochan {
				switch fileinfo.jsonType {
				case hub.OTHER_TYPE_TEMPALTE:
					_ = loadOtherDataTempParse(fileinfo, &mapGuard)
					wg.Done()
				case hub.OTHER_TYPE_PLUGIN:
					_ = loadOtherDataPluginParse(fileinfo, &mapGuard)
					wg.Done()
				case hub.OTHER_TYPE_YAML:
					_ = loadOtherDataYamlParse(fileinfo, &mapGuard)
					wg.Done()
				default:
					loadJsonDefDataParse(fileinfo.jsonType, fileinfo.fileName, fileinfo.fileNamePath,
						schema, loadSchemaCheck, &mapGuard)
					wg.Done()
				}
			}
		}()
	}

	logger.LogS().Debugln("加载API def文件...")
	for i := hub.JSON_TYPE_PRIVATE; i <= hub.JSON_TYPE_SCHEDULE; i++ {
		loadJsonDefData(i, paths[i], true, schema, loadSchemaCheck, fileinfochan, &wg)
	}

	wg.Wait()
	close(fileinfochan)
}
... ...
// 遍历tml目录及子目录
func loadOtherDataTemp(path string, fileinfochan chan fileInfoChan, jsonType int, wg *sync.WaitGroup) error {
	logger.LogS().Debugln("加载template(*.tml)文件: ")
	err := filepath.Walk(path, func(fileNamePath string, f os.FileInfo, err error) error {
		if strings.Contains(fileNamePath, ".tmpl") {
			fileinfo := fileInfoChan{
				fileNamePath: fileNamePath,
				jsonType:     jsonType,
			}
			wg.Add(1)
			fileinfochan <- fileinfo
		}
		return nil
	})
	if err != nil {
		logger.LogS().Errorln(err.Error())
		return err
	}
	logger.LogS().Debugln("加载(*.tml)文件完成\r\n")
	return nil
}

主要关注如下代码和变量

sync.WaitGroup

使用等待组进行多个任务的同步等待组可以保证在并发环境中完成指定数量的任务

在 sync.WaitGroup等待组类型中每个 sync.WaitGroup 值在内部维护着一个计数此计数的初始默认值为零。

var wg sync.WaitGroup

在使用中我们每往通道里发送一个数据就是用wg.Add(1)进行计数加1每在通道里消费一个数据就使用wg.Done()进行计数减1。在主函数结束时通过wg.Wait()让程序等待通道中不再有发送且已经消费完数据后再退出即Wait() 会阻塞代码的运行直到计数器地值减为0。

sync.Mutex

var mapGuard sync.Mutex

对于需要在多个线程里互斥访问的资源可以使用sync.Mutex来操作对应的是Lock和Unlock两个方法就是加锁和解锁。

Go语言中对于map的操作必须通过互斥锁

var mutex sync.Mutex                //互斥锁
func printer(str string){
    mutex.Lock()                //加锁
    defer mutex.Unlock()        //在defer语句里解锁这样就可以保证在函数退出时释放。
    for _,ch:=range str{
        fmt.Printf("%c",ch)
        time.Sleep(time.Millisecond*100)
    }
}

chan

创建chan的结构体根据实际情况创建结构体变量

初始化过程中创建了100个缓存通道在结束时要使用close对chan进行关闭

type fileInfoChan struct {
	jsonType     int
	fileName     string
	fileNamePath string
}

fileinfochan := make(chan fileInfoChan, 100)

close(fileinfochan)

pool

for i := 0; i < poolNumber; i++ {
		go func() {
		}
}

pool就是我们常说的协程池这里我们来存放具体的任务在设计中即解析json文件并验证

我们要在主函数中提前设置好建立的pool数量也就是协程池数

假设我们poolNumber = 20也就是协程池数为20个我们创建二十个线程作为任务处理的线程

后续任务在chan排队进入线程池运行消费chan中的任务

chan <-

信息如何进入通道排队这里我们对文件夹进行遍历每遍历到一个文件信息我们就通过 <-的方式传递到chan中进行排队等待线程池的消耗

if strings.Contains(fileNamePath, ".tmpl") {
			fileinfo := fileInfoChan{
				fileNamePath: fileNamePath,
				jsonType:     jsonType,
			}
			wg.Add(1)
			fileinfochan <- fileinfo
		}

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go

“Go语言 | 协程池的应用(可能是全网最适合小白的教程)” 的相关文章