Go并发之Fan out / in
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
fan out
通常用来处理计算密集型(computationally intensive的任务开启多个goroutine和每个goroutine自己的channel一起同时读取上流的inStream每个goroutine处理计算任务以此利用计算机的多核性能更快速地处理完pipeline中的这个计算任务繁重的stage。
// establish several goroutines to receive values from one channel
func fanOut(
done <-chan interface{},
inStream <-chan interface{},
outStreamsNum int,
judgeFn func(val interface{}) bool) []<-chan interface{} {
outStreams := make([]chan interface{}, outStreamsNum)
for i := 0; i < outStreamsNum; i++ {
outStreams[i] = make(chan interface{})
go func(index int) {
defer close(outStreams[index])
for val := range inStream {
if judgeFn(val) {
select {
case <-done:
return
case outStreams[index] <- val:
}
}
}
}(i)
}
readOnlyOutStreams := make([]<-chan interface{}, outStreamsNum)
for i := 0; i < outStreamsNum; i++ {
readOnlyOutStreams[i] = (<-chan interface{})(outStreams[i])
}
return readOnlyOutStreams
}
本例中所谓计算任务即判断函数judgeFn例如判断某个很大的数是否是素数等。
fan in
将多个channels汇聚到一个输出的channel。具体做法开启多个goroutine每个goroutine读取一个上流channel的值, 并且所有goroutine都往同一个下流channel即outStream输出读取的值。
// establish several goroutines to combine multiple channels into one channel
func fanIn(done <-chan interface{}, inStreams ...<-chan interface{}) <-chan interface{} {
wg := sync.WaitGroup{}
outStream := make(chan interface{})
for _, inStream := range inStreams {
wg.Add(1)
go func(inStream <-chan interface{}) {
defer wg.Done()
for val := range inStream {
select {
case <-done:
return
case outStream <- val:
}
}
}(inStream)
}
go func() {
wg.Wait()
close(outStream)
}()
return outStream
}
将两个函数集成为fanOutIn函数形成一个简单的接口
func fanOutFanIn(
done <-chan interface{},
inStream <-chan interface{},
outStreamsNum int,
judgeFn func(val interface{}) bool) <-chan interface{} {
outStreams := fanOut(done, inStream, outStreamsNum, judgeFn)
return fanIn(done, outStreams...)
}