Go语言设计与实现 -- Channel

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

稍微需要注意一点的用法

类型断言

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	// 如果你这么写代码的话你虽然拿到了一条狗但是你拿不到它的任何属性和方法
	// dog1 := <-allChan
	dog1 := (<-allChan).(dog)
	fmt.Println(dog1.Name)
	fmt.Println(dog1.Color)
	fmt.Println(dog1)
}

for range时遇到的一些问题

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	allChan <- -1
	for val := range allChan {
		fmt.Println(val)
	}
}

运行结果如下

{lxy yellow}
-1
fatal error: all goroutines are asleep - deadlock!

原因分析

这个for循环只有在管道关闭了之后才会结束而管道没有被关闭因此永远不可能结束报出错误。

改成后的代码如下

type dog struct {
	Name  string
	Color string
}

func main() {
	allChan := make(chan any, 10)
	allChan <- dog{Name: "lxy", Color: "yellow"}
	allChan <- -1
	close(allChan)
	for val := range allChan {
		fmt.Println(val)
	}
}

原理剖析

Go语言中也能使用共享内存加互斥锁进行通信但是Go语言提供了一种不同的并发模型 – 通信顺序进程CSP。

GoroutineChannel分别对应CSP中实体和传递信息的媒介Goroutine之间会通过Channel传递数据。

今天来讲解Channel它的收发操作均遵循先进先出并且是线程安全的关于线程安全我们一般使用锁来解决。

锁是一种常见的并发控制技术我们一般将锁分成乐观锁悲观锁即乐观并发控制和悲观并发控制。无锁队列更准确的描述是使用乐观并发控制的队列。但是乐观锁和悲观锁是有本质区别的乐观锁不是真正的锁只是一种并发控制思想。

乐观并发控制本质上是基于验证的协议我们使用原子指令CAS在多线程间同步数据无锁队列也依赖这一个原子指令。

然而Channel目前还是有锁队列哈哈它在无锁化的过程中遇到了一点问题至今还有解决。

数据结构

type hchan struct {
   qcount   uint           // 元素个数
   dataqsiz uint           // 循环队列的长度
   buf      unsafe.Pointer // 缓冲区数据指针
   elemsize uint16 // 能够收发的元素大小
   closed   uint32
   elemtype *_type // 能够收发的元素类型
   sendx    uint   // 发送操作处理到的位置
   recvx    uint   // 接收操作处理到的位置
   recvq    waitq  // 由于缓冲区不足而阻塞的Goroutine读队列
   sendq    waitq  // 由于缓冲区不足而阻塞的Goroutine写队列

   // lock protects all fields in hchan, as well as several
   // fields in sudogs blocked on this channel.
   //
   // Do not change another G's status while holding this lock
   // (in particular, do not ready a G), as this can deadlock
   // with stack shrinking.
   lock mutex
}

这缓冲区是环形的

创建Channel

func makechan(t *chantype, size int) *hchan {
   elem := t.elem

   // compiler checks this but be safe.
   if elem.size >= 1<<16 {
      throw("makechan: invalid channel element type")
   }
   if hchanSize%maxAlign != 0 || elem.align > maxAlign {
      throw("makechan: bad alignment")
   }

   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError("makechan: size out of range"))
   }

   // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
   // buf points into the same allocation, elemtype is persistent.
   // SudoG's are referenced from their owning thread so they can't be collected.
   // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
   var c *hchan
   switch {
   case mem == 0:
      // Queue or element size is zero.
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      // Race detector uses this location for synchronization.
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:
      // Elements do not contain pointers.
      // Allocate hchan and buf in one call.
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:
      // Elements contain pointers.
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}

这个代码会根据Channel中收发元素的类型和缓冲区大小初始化runtime.hchan和缓冲区。

  • 如果当前Channel中不存在缓冲区那么只会为runtime.hchan分配一块内存空间
  • 如果当前Channel中存储的类型不是指针类型会为当前Channel和底层数组分配一块儿连续的空间
  • 默认情况下会单独为runtime.hchan和缓冲区分配内存
  • 最后统一更新各类字段

发送数据

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block {
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }

   if debugChan {
      print("chansend: chan=", c, "\n")
   }

   if raceenabled {
      racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
   }

   // Fast path: check for failed non-blocking operation without acquiring the lock.
   //
   // After observing that the channel is not closed, we observe that the channel is
   // not ready for sending. Each of these observations is a single word-sized read
   // (first c.closed and second full()).
   // Because a closed channel cannot transition from 'ready for sending' to
   // 'not ready for sending', even if the channel is closed between the two observations,
   // they imply a moment between the two when the channel was both not yet closed
   // and not ready for sending. We behave as if we observed the channel at that moment,
   // and report that the send cannot proceed.
   //
   // It is okay if the reads are reordered here: if we observe that the channel is not
   // ready for sending and then observe that it is not closed, that implies that the
   // channel wasn't closed during the first observation. However, nothing here
   // guarantees forward progress. We rely on the side effects of lock release in
   // chanrecv() and closechan() to update this thread's view of c.closed and full().
   if !block && c.closed == 0 && full(c) {
      return false
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   lock(&c.lock)

   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz {
      // Space is available in the channel buffer. Enqueue the element to send.
      qp := chanbuf(c, c.sendx)
      if raceenabled {
         racenotify(c, c.sendx, nil)
      }
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }

   if !block {
      unlock(&c.lock)
      return false
   }

   // Block on the channel. Some receiver will complete our operation for us.
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // No stack splits between assigning elem and enqueuing mysg
   // on gp.waiting where copystack can find it.
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   c.sendq.enqueue(mysg)
   // Signal to anyone trying to shrink our stack that we're about
   // to park on a channel. The window between when this G's status
   // changes and when we set gp.activeStackChans is not safe for
   // stack shrinking.
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   // Ensure the value being sent is kept alive until the
   // receiver copies it out. The sudog has a pointer to the
   // stack object, but sudogs aren't considered as roots of the
   // stack tracer.
   KeepAlive(ep)

   // someone woke us up.
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   if closed {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   return true
}

这个函数比较复杂所以我们简单介绍一下

  • 在发送逻辑执行之前会为当前Channel加锁防止多个线程并发修改数据
  • 当存在等待的接收者时将发送的数据写入Channel的缓冲区
  • 当不存在缓冲区或者缓冲区已经满的时候等待其他GoroutineChannel接收数据

直接发送

如果目标Channel没有被关闭并且已经有处于读等待的Goroutine那么runtime.chansend会从接收队列recvq中取出最先陷入等待的Goroutine并直接向它发送数据。如图所示

channel-direct-send

该图来自于面向信仰编程

我们之前讲解过了recvpwaitq类型的然后我们来看一下waitq类型的结构体。

type waitq struct {
   first *sudog
   last  *sudog
}

runtime.sudog表示一个在等待列表中的Goroution该结构中存储了两个分别指向前后runtime.sudog指针以构成链表。

发送数据的时候会调用runtime.send

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if raceenabled {
      if c.dataqsiz == 0 {
         racesync(c, sg)
      } else {
         // Pretend we go through the buffer, even though
         // we copy directly. Note that we need to increment
         // the head/tail locations only when raceenabled.
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
         c.recvx++
         if c.recvx == c.dataqsiz {
            c.recvx = 0
         }
         c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
   }
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}

这个函数的执行可以分成两个部分

  • 调用runtime.sendDirect将发送的数据直接复制到x = <-c表达式中变量x所在的内存地址上。
  • 调用runtime.goready将等待接收数据的Goroutine标记成可运行状态Grunnable并该Goroutine放到发送方所在处理器的runnext上等待执行该处理器在下次调度的时候会立即唤醒数据的接收方。发送数据的过程只是将接收方的Gouroutine放到了处理器的runnext中并没有立即执行

缓冲区

如果创建的Channel包含缓冲区并且Channel中的数据没有装满。

首先我们会使用runtime.chanbuf计算出下一个可以存储数据的位置然后通过runtime.typememmove将发送的数据复制到缓冲区中并增加sendx索引和qcount计数器。

if c.qcount < c.dataqsiz {
   // Space is available in the channel buffer. Enqueue the element to send.
   qp := chanbuf(c, c.sendx)
   if raceenabled {
      racenotify(c, c.sendx, nil)
   }
   typedmemmove(c.elemtype, qp, ep)
   c.sendx++
   if c.sendx == c.dataqsiz {
      c.sendx = 0
   }
   c.qcount++
   unlock(&c.lock)
   return true
}

channel-buffer-send

图片来自于面向信仰编程

如果当前Channel的缓冲区未满向Channel发送的数据会存储在Channelsendx索引所在的位置并将sendx索引加一。因为这里的buf是一个循环数组所以当sendx等于dataqsiz时会重新回到数组开始的位置。

阻塞发送

Channel没有接收能够处理数据时向Channel发送数据会被下游阻塞。

if !block {
   unlock(&c.lock)
   return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
   mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
   throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
   blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
   if c.closed == 0 {
      throw("chansend: spurious wakeup")
   }
   panic(plainError("send on closed channel"))
}
return true

该函数的调用过程如下

  • 调用 runtime.getg 获取发送数据使用的 Goroutine
  • 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
  • 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列并设置到当前 Goroutine 的 waiting 上表示 Goroutine 正在等待该 sudog 准备就绪
  • 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒
  • 被调度器唤醒后会执行一些收尾工作将一些属性置零并且释放 runtime.sudog 结构体

函数在最后会返回 true 表示这次我们已经成功向 Channel 发送了数据。

接收数据

我们接下来继续介绍 Channel 操作的另一方接收数据。Go 语言中可以使用两种不同的方式去接收 Channel 中的数据

i <- ch
i, ok <- ch

Go

这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点后者会在类型检查阶段被转换成 OAS2RECV 类型。数据的接收操作遵循以下的路线图

channel-receive-node

图片来自于面向信仰编程

图 6-22 Channel 接收操作的路线图

虽然不同的接收方式会被转换成 runtime.chanrecv1runtime.chanrecv2 两种不同函数的调用但是这两个函数最终还是会调用 runtime.chanrecv

接收数据包含3种场景

  • 当存在等待的发送者时通过 runtime.recv 从阻塞的发送者或者缓冲区中获取数据
  • 当缓冲区存在数据时从 Channel 的缓冲区中接收数据
  • 当缓冲区中不存在数据时等待其他 Goroutine 向 Channel 发送数据

直接接收

当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时该函数会取出队列头等待的 Goroutine处理的逻辑和发送时相差无几只是发送数据时调用的是 runtime.send 函数而接收数据时使用 runtime.recv

我们来看一下源码

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
      }
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}
  • 如果Channel不存在缓冲区

    Channel发送队列中的Goroutine存储的数据复制到目标内存地址中

  • 如果存在缓冲区

    • 将队列中的数据复制到接收方的内存地址中
    • 将发送队列头的数据复制到缓冲区中释放一个阻塞的发送方

channel-receive-from-sendq

图 6-23 从发送队列中获取数据

上图展示了 Channel 在缓冲区已经没有空间并且发送队列中存在等待的 Goroutine 时运行 <-ch 的执行过程。发送队列头的 runtime.sudog 中的元素会替换接收索引 recvx 所在位置的元素原有的元素会被拷贝到接收数据的变量对应的内存空间上。

缓冲区

当缓冲区中已经包含数据时从Channel中接收数据会直接从缓冲区中recvx的索引位置取出数据进行处理

if c.qcount > 0 {
   // Receive directly from queue
   qp := chanbuf(c, c.recvx)
   if raceenabled {
      racenotify(c, c.recvx, nil)
   }
   if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
   }
   typedmemclr(c.elemtype, qp)
   c.recvx++
   if c.recvx == c.dataqsiz {
      c.recvx = 0
   }
   c.qcount--
   unlock(&c.lock)
   return true, true
}

如果接收数据的内存地址不为空那么会使用typedmemmove将缓冲区中的数据复制到内存中清除队列中的数据并完成收尾工作。

channel-buffer-receive

图 6-24 从缓冲区中接接收数据

收尾工作包括递增 recvx一旦发现索引超过了 Channel 的容量时会将它归零重置循环队列的索引除此之外该函数还会减少 qcount 计数器并释放持有 Channel 的锁。

阻塞接收

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时从管道中接收数据的操作会变成阻塞的然而不是所有的接收操作都是阻塞的与 select 语句结合使用时就可能会使用到非阻塞的接收操作

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

在正常的接收场景中我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。

完成入队之后上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度让出处理器的使用权并等待调度器的调度。

关闭Channel

recvqsendq两个队列中的数据加入Goroutine列表gList中于此同时清除runtime.sudog上所有未被处理的元素。

参考文章《Go语言设计与实现 – Channel》

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: go