Go并发之Fan out / in

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

fan out

通常用来处理计算密集型(computationally intensive的任务开启多个goroutine和每个goroutine自己的channel一起同时读取上流的inStream每个goroutine处理计算任务以此利用计算机的多核性能更快速地处理完pipeline中的这个计算任务繁重的stage。

// establish several goroutines to receive values from one channel
func fanOut(
	done <-chan interface{},
	inStream <-chan interface{},
	outStreamsNum int,
	judgeFn func(val interface{}) bool) []<-chan interface{} {
	outStreams := make([]chan interface{}, outStreamsNum)
	for i := 0; i < outStreamsNum; i++ {
		outStreams[i] = make(chan interface{})
		go func(index int) {
			defer close(outStreams[index])
			for val := range inStream {
				if judgeFn(val) {
					select {
					case <-done:
						return
					case outStreams[index] <- val:
					}
				}
			}
		}(i)
	}
	readOnlyOutStreams := make([]<-chan interface{}, outStreamsNum)
	for i := 0; i < outStreamsNum; i++ {
		readOnlyOutStreams[i] = (<-chan interface{})(outStreams[i])
	}
	return readOnlyOutStreams
}

本例中所谓计算任务即判断函数judgeFn例如判断某个很大的数是否是素数等。

fan in

将多个channels汇聚到一个输出的channel。具体做法开启多个goroutine每个goroutine读取一个上流channel的值, 并且所有goroutine都往同一个下流channel即outStream输出读取的值。

// establish several goroutines to combine multiple channels into one channel
func fanIn(done <-chan interface{}, inStreams ...<-chan interface{}) <-chan interface{} {
	wg := sync.WaitGroup{}
	outStream := make(chan interface{})
	for _, inStream := range inStreams {
		wg.Add(1)
		go func(inStream <-chan interface{}) {
			defer wg.Done()
			for val := range inStream {
				select {
				case <-done:
					return
				case outStream <- val:
				}
			}
		}(inStream)
	}
	go func() {
		wg.Wait()
		close(outStream)
	}()
	return outStream
}

将两个函数集成为fanOutIn函数形成一个简单的接口

func fanOutFanIn(
	done <-chan interface{},
	inStream <-chan interface{},
	outStreamsNum int,
	judgeFn func(val interface{}) bool) <-chan interface{} {
	outStreams := fanOut(done, inStream, outStreamsNum, judgeFn)
	return fanIn(done, outStreams...)
}
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go

“Go并发之Fan out / in” 的相关文章