Channel 作为 Go 中最常见的多线程通信和同步方式,使用场景非常广泛。对于带 buffer 的 Channel 内部通过一个环形队列存储,对于无 buffer 的 Channel 则不分配内存空间,直接将数据从 sender 拷贝到 receiver。

数据结构

Channel 在 Go 的运行时由 runtime.hchan 结构表示,其内部数据字段如下所示:

type hchan struct {
	qcount   uint           // channel 中元素的总量
	elemsize uint16
	closed   uint32
	elemtype *_type // 元素类型

	dataqsiz uint
	buf      unsafe.Pointer
	sendx    uint
	recvx    uint
	recvq    waitq  // 等待接受列表
	sendq    waitq  // 等待发送列表

	lock mutex
}

rutnime.hchan 中用 dataqsiz,buf,sendx,recvx 四个字段构建环形队列:

  • dataqsiz - 环形队列的长度
  • buf - 环形队列的内存地址
  • sendx - 发送方数据存储的下标
  • recvx - 接收方数据获取的下标

如下图所示,元素在发送时会放入 sendx 位置,在接受时会从 recvx 位置获取元素。当 recvxsendxdataqsiz 相等时被重置为0,构成环形队列。

初始化

通过 asm 我们能发现, make(chan int) 是通过调用 runtime.makechan 实现的,其返回一个 hchan 的指针。

  • 无buffer Channel 不会生成环形队列,直接使用 buf
  • 非指针类型的 Channel 会将 hchan 和环形队列的内存一并申请,作为一个整体向 GC Collector 进行注册
  • 指针类型的 Channel 会先申请一个 hchan 的内存,再单独申请环形队列的内存,以标记指针数据的 GC
func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	...
	// 计算元素所占用的空间
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	...
	var c *hchan
	switch {
	case mem == 0: // 无 buffer Channel
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0: // 元素类型为非指针 Channel
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default: // 其他
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	...
	lockInit(&c.lock, lockRankHchan)

	return c
}

写入

对 Channel 的写入是 runtime.chansend 实现的,并通过 Lock 对 hchan 中的数据进行保护。

  • 如果当前 recvq 已有等待接收的 goroutine 则直接将数据拷贝到接收者,并唤醒接收者
  • 如果环形队列中还有可用空间 c.qcount < c.dataqsiz,则直接将元素拷贝到队列中,并移动 sendx
  • 如果队列已满则需要 block 当前 goroutine,将 goroutine 放入 sendq,并等待接收者唤醒
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		qp := chanbuf(c, c.sendx)
		...
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	gp := getg()
	mysg := acquireSudog()
	...
	c.sendq.enqueue(mysg)
	...
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	KeepAlive(ep) // 防止当前 goroutine 暂停期间 ep 被回收

	...
	// 被唤醒,处理收尾工作
	releaseSudog(mysg)
	...
	return true
}

读取

从 Channel 中读取数据是 runtime.chanrecv 实现的,内部逻辑和 runtime.chansend 基本一致

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	lock(&c.lock)
	...
	// 存在等待 sender goroutine
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// 从队列中获取数据
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	...
	gp := getg()
	mysg := acquireSudog()
	...
	c.recvq.enqueue(mysg)
	...
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	...
	// 被唤醒
	releaseSudog(mysg)
	return true, success
}

参考