【Golang进阶】Channel 实现
文章目录
Channel 作为 Go 中最常见的多线程通信和同步方式,使用场景非常广泛。对于带 buffer 的 Channel 内部通过一个环形队列存储,对于无 buffer 的 Channel 则不分配内存空间,直接将数据从 sender 拷贝到 receiver。
数据结构
Channel 在 Go 的运行时由 runtime.hchan
结构表示,其内部数据字段如下所示:
type hchan struct {
qcount uint // channel 中元素的总量
elemsize uint16
closed uint32
elemtype *_type // 元素类型
dataqsiz uint
buf unsafe.Pointer
sendx uint
recvx uint
recvq waitq // 等待接受列表
sendq waitq // 等待发送列表
lock mutex
}
rutnime.hchan
中用 dataqsiz
,buf
,sendx
,recvx
四个字段构建环形队列:
dataqsiz
- 环形队列的长度buf
- 环形队列的内存地址sendx
- 发送方数据存储的下标recvx
- 接收方数据获取的下标
如下图所示,元素在发送时会放入 sendx
位置,在接受时会从 recvx
位置获取元素。当 recvx
、sendx
与 dataqsiz
相等时被重置为0,构成环形队列。
初始化
通过 asm 我们能发现, make(chan int)
是通过调用 runtime.makechan
实现的,其返回一个 hchan
的指针。
- 无buffer Channel 不会生成环形队列,直接使用 buf
- 非指针类型的 Channel 会将
hchan
和环形队列的内存一并申请,作为一个整体向 GC Collector 进行注册 - 指针类型的 Channel 会先申请一个
hchan
的内存,再单独申请环形队列的内存,以标记指针数据的 GC
func makechan(t *chantype, size int) *hchan {
elem := t.elem
...
// 计算元素所占用的空间
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
...
var c *hchan
switch {
case mem == 0: // 无 buffer Channel
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 元素类型为非指针 Channel
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 其他
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
...
lockInit(&c.lock, lockRankHchan)
return c
}
写入
对 Channel 的写入是 runtime.chansend
实现的,并通过 Lock 对 hchan 中的数据进行保护。
- 如果当前
recvq
已有等待接收的 goroutine 则直接将数据拷贝到接收者,并唤醒接收者 - 如果环形队列中还有可用空间
c.qcount < c.dataqsiz
,则直接将元素拷贝到队列中,并移动sendx
- 如果队列已满则需要 block 当前 goroutine,将 goroutine 放入
sendq
,并等待接收者唤醒
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
...
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
gp := getg()
mysg := acquireSudog()
...
c.sendq.enqueue(mysg)
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
KeepAlive(ep) // 防止当前 goroutine 暂停期间 ep 被回收
...
// 被唤醒,处理收尾工作
releaseSudog(mysg)
...
return true
}
读取
从 Channel 中读取数据是 runtime.chanrecv
实现的,内部逻辑和 runtime.chansend
基本一致
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock)
...
// 存在等待 sender goroutine
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 从队列中获取数据
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
gp := getg()
mysg := acquireSudog()
...
c.recvq.enqueue(mysg)
...
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
// 被唤醒
releaseSudog(mysg)
return true, success
}