Go Source Code Analysis (5): Data Structures

This post covers the common data structures such as channel, slice, and map; by analyzing how they are implemented under the hood, we can use them more effectively.

1. Slices

The slice data structure

type slice struct {
    array unsafe.Pointer
    len   int
    cap   int
}
  • array points to the underlying array
  • len is the length of the slice
  • cap is the capacity of the underlying array

Three ways to initialize a slice

Initialization with make

func makeslice(et *_type, len, cap int) slice {
    // NOTE: The len > maxElements check here is not strictly necessary,
    // but it produces a 'len out of range' error instead of a 'cap out of range' error
    // when someone does make([]T, bignumber). 'cap out of range' is true too,
    // but since the cap is only being supplied implicitly, saying len is clearer.
    // See issue 4085.
    maxElements := maxSliceCap(et.size)
    if len < 0 || uintptr(len) > maxElements {
        panic(errorString("makeslice: len out of range"))
    }

    if cap < len || uintptr(cap) > maxElements {
        panic(errorString("makeslice: cap out of range"))
    }

    p := mallocgc(et.size*uintptr(cap), et, true)
    return slice{p, len, cap}
}

Calling make() ends up in makeslice(), which allocates memory for the slice's array pointer, so the slice is ready to use as soon as it is created.
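
As a quick illustration (a small sketch of my own, not runtime code), this is what make gives you:

package main

import "fmt"

func main() {
    // make allocates the backing array up front, so the slice is usable immediately.
    s := make([]int, 3, 8) // len=3, cap=8: makeslice allocates room for 8 ints
    s[0] = 1               // safe: indices 0..2 lie within len
    fmt.Println(len(s), cap(s)) // 3 8
}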

[]struct{}{}

A slice created with a composite literal can also be used immediately.

new([]struct{})

func main() {
    array := new([]int)
    (*array)[0] = 1 // panics at run time: *array is a nil slice, so this is an index out of range
    println(array)
}

new only allocates the slice header and returns a pointer to it (via runtime.newobject). The header itself is zero-valued, i.e. a nil slice with len and cap 0, so assigning to an element immediately triggers an index-out-of-range panic: a slice created this way cannot be used for element assignment until storage is allocated.
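
The following sketch (illustrative only) contrasts new with make for slices:

package main

func main() {
    // new([]int) allocates only the slice header; its array pointer is nil and
    // len/cap are 0, so indexing it panics. Appending through the pointer works,
    // because append allocates the backing array on demand.
    p := new([]int)
    // (*p)[0] = 1     // would panic: index out of range on a nil slice
    *p = append(*p, 1) // ok: append allocates storage
    println(len(*p), cap(*p)) // 1 1

    // make([]int, 1) allocates the backing array up front, so indexing is safe.
    s := make([]int, 1)
    s[0] = 1
    println(len(s), cap(s)) // 1 1
}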

Slice growth

Growth via append():

Appending one element to a slice with append() works as follows:

  1. If the slice still has spare capacity, the new element is written in place, Slice.len is incremented, and the original slice is returned.
  2. If the capacity is insufficient, the slice is grown first, producing a new slice.
  3. The new element is appended to the new slice, Slice.len is incremented, and the new slice is returned.
func growslice(et *_type, old slice, cap int) slice {

    // ... some checks omitted ...

    newcap := old.cap
    doublecap := newcap + newcap
    if cap > doublecap {
        newcap = cap
    } else {
        if old.len < 1024 {
            newcap = doublecap
        } else {
            // Check 0 < newcap to detect overflow
            // and prevent an infinite loop.
            for 0 < newcap && newcap < cap {
                newcap += newcap / 4
            }
            // Set newcap to the requested cap when
            // the newcap calculation overflowed.
            if newcap <= 0 {
                newcap = cap
            }
        }
    }
    // ... the rest of the function omitted ...
}
  • If the old capacity is less than 1024, the new capacity is double the old one;
  • If the old capacity is 1024 or more, the capacity grows by a factor of about 1.25, repeatedly, until it covers the requested capacity. The sketch below shows this growth from user code.
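
A small demonstration (illustrative; the exact capacities may differ slightly because growslice also rounds the new capacity up to a malloc size class):

package main

import "fmt"

func main() {
    var s []int
    prev := cap(s)
    for i := 0; i < 2000; i++ {
        s = append(s, i)
        if cap(s) != prev {
            // Capacity roughly doubles while it is below 1024, then grows by ~1.25x.
            fmt.Printf("len=%d: cap %d -> %d\n", len(s), prev, cap(s))
            prev = cap(s)
        }
    }
}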

Copying slices

Copying with copy()

func slicecopy(to, fm slice, width uintptr) int {
    if fm.len == 0 || to.len == 0 {
        return 0
    }

    n := fm.len
    if to.len < n {
        n = to.len
    }

    if width == 0 {
        return n
    }

    if raceenabled {
        callerpc := getcallerpc()
        pc := funcPC(slicecopy)
        racewriterangepc(to.array, uintptr(n*int(width)), callerpc, pc)
        racereadrangepc(fm.array, uintptr(n*int(width)), callerpc, pc)
    }
    if msanenabled {
        msanwrite(to.array, uintptr(n*int(width)))
        msanread(fm.array, uintptr(n*int(width)))
    }

    size := uintptr(n) * width
    if size == 1 { // common case worth about 2x to do here
        // TODO: is this still worth it with new memmove impl?
        *(*byte)(to.array) = *(*byte)(fm.array) // known to be a byte pointer
    } else {
        memmove(to.array, fm.array, size)
    }
    return n
}

When two slices are copied with the built-in copy(), memmove() copies the values of the source slice's backing array into the destination slice's backing array one by one; the number of elements copied is the minimum of the two slices' lengths.

Copying with slice[start:end]

This form copies only the slice header: the new slice's array pointer points into the same backing array as the source, len becomes end-start, and cap becomes the source's capacity minus start, so the two slices share storage.
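
A short sketch (illustrative only) of the difference between the two forms:

package main

import "fmt"

func main() {
    src := []int{1, 2, 3, 4, 5}

    // copy() moves values into dst's own backing array; the number of elements
    // copied is min(len(dst), len(src)).
    dst := make([]int, 3)
    n := copy(dst, src)
    dst[0] = 100
    fmt.Println(n, dst, src) // 3 [100 2 3] [1 2 3 4 5]

    // Re-slicing copies only the slice header: the array pointer is shared,
    // len becomes end-start, and cap runs to the end of the original backing array.
    view := src[1:3]
    view[0] = 200
    fmt.Println(len(view), cap(view), src) // 2 4 [1 200 3 4 5]
}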

2. Channels

The chan data structure

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock     mutex
}
  • qcount: number of elements currently in the queue
  • dataqsiz: length of the circular queue, i.e. how many elements it can hold
  • buf: pointer to the circular queue
  • elemsize: size of each element
  • closed: flag marking whether the channel is closed
  • elemtype: element type
  • sendx: queue index at which the next element will be written
  • recvx: queue index from which the next element will be read
  • recvq: queue of goroutines waiting to receive
  • sendq: queue of goroutines waiting to send
  • lock: mutex; a chan does not allow its internals to be read and written concurrently

hchan is essentially a ring buffer: buf points at the circular queue, and dataqsiz and qcount give the queue's capacity and current usage. recvq and sendq are linked lists whose nodes hold a goroutine together with that goroutine's data. Let's look at the waitq structure next.

type waitq struct {
    first *sudog
    last  *sudog
}

waitq is the linked-list type, holding a head node and a tail node. To see what each node actually stores, let's look at the sudog structure.

type sudog struct {
    // The following fields are protected by the hchan.lock of the
    // channel this sudog is blocking on. shrinkstack depends on
    // this for sudogs involved in channel ops.

    g *g

    // isSelect indicates g is participating in a select, so
    // g.selectDone must be CAS'd to win the wake-up race.
    isSelect bool
    next     *sudog
    prev     *sudog
    elem     unsafe.Pointer // data element (may point to stack)

    // The following fields are never accessed concurrently.
    // For channels, waitlink is only accessed by g.
    // For semaphores, all fields (including the ones above)
    // are only accessed when holding a semaRoot lock.

    acquiretime int64
    releasetime int64
    ticket      uint32
    parent      *sudog // semaRoot binary tree
    waitlink    *sudog // g.waiting list or semaRoot
    waittail    *sudog // semaRoot
    c           *hchan // channel
}

A sudog essentially binds a goroutine to the piece of data it is sending or receiving, somewhat like a ThreadLocal in Java. In particular:

  • g is the G of the G-M-P model; sudog wraps g so that, in the CSP model, one g can be blocked on several different channels at the same time.
  • elem stores the goroutine's data: on a receive, data is copied from hchan's queue into the sudog's elem field; on a send, data is copied from the sudog's elem field into hchan's queue.

(Diagram of the relationship between hchan, waitq, and sudog omitted.)

Creating a channel

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    // Sanity check: the element type must not be too large.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    // Sanity check: alignment constraints.
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }
    // The buffer (element size * count) plus the hchan header must not exceed the maximum allocatable memory.
    if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    // The element type contains no pointers (or the channel is unbuffered).
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // Allocate memory in a single call.
        // In this case hchan contains no pointers the GC cares about:
        // buf points into the same allocation, and elemtype is persistent.
        // SudoGs are referenced from their owning thread, so they cannot be collected.
        // TODO(dvyukov,rlh): rethink this once the collector can move allocated objects.
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // The race detector uses this location for synchronization.
            // It also keeps buf from pointing beyond the allocation (see issue 9401).
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // The element type contains pointers: allocate the hchan struct and buf separately.
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    // Record the element size.
    c.elemsize = uint16(elem.size)
    // Record the element type.
    c.elemtype = elem
    // Record the channel capacity.
    c.dataqsiz = uint(size)
    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
    }
    return c
}

Creating a channel therefore amounts to initializing an hchan. The element type and buffer length come from the make statement, and the size of buf is determined by the element size together with the buffer length.
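
From user code this is just make (a small illustrative sketch; for an element type without pointers, such as int, the header and buffer come from the single-allocation branch above):

package main

import "fmt"

func main() {
    // make(chan T, n) ends up in runtime.makechan with size = n,
    // so dataqsiz is 3 and buf holds room for 3 ints.
    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    fmt.Println(len(ch), cap(ch)) // 2 3: qcount and dataqsiz, respectively
}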

Sending to a channel

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}
...

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 1
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 2
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
  1. if c == nil: the channel is nil, so sending to it blocks forever
    1. if !block: in non-blocking mode, return false
    2. gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2): in blocking mode, gopark suspends the current goroutine; it can only be woken through unlockf, and since unlockf is nil here the goroutine sleeps forever
  2. Check for a failed non-blocking operation without taking the lock; if one is detected, return false immediately
    1. !block && c.closed == 0: non-blocking and the channel is not closed
      1. (c.dataqsiz == 0 && c.recvq.first == nil): an unbuffered channel with no receiver waiting, so the channel is not ready
      2. (c.dataqsiz > 0 && c.qcount == c.dataqsiz): a buffered channel whose buffer is already full, so nothing more can be buffered
  3. lock(&c.lock): lock the channel
  4. c.closed != 0: the channel was closed in the meantime, so unlock(&c.lock) and panic
  5. sg := c.recvq.dequeue(); sg != nil: a goroutine is already waiting on recvq, so skip the buffer and hand the value straight to the receiver with send(c, sg, ep, func() { unlock(&c.lock) }, 3); dequeue pops one sudog off the waiting-receiver list and goready() wakes the blocked goroutine
  6. c.qcount < c.dataqsiz: the buffer still has room, so copy the value into it, advance sendx, and increase the buffered count
    1. typedmemmove(c.elemtype, qp, ep): copy the data into buf
    2. c.sendx++: advance the send index
    3. c.sendx == c.dataqsiz: when the ring index reaches the end, c.sendx = 0 wraps it back to zero
    4. c.qcount++: one more buffered element
    5. unlock(&c.lock): unlock and return
  7. if !block: non-blocking, so unlock and return false
  8. gp := getg(): get a pointer to the current goroutine
  9. mysg := acquireSudog(): take a sudog from the sudog cache
  10. mysg is filled in (data pointer, goroutine, channel, ...)
  11. c.sendq.enqueue(mysg): join the queue of blocked senders
  12. goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3): park the current goroutine
  13. KeepAlive(ep): keep the data alive so the GC does not collect it while we sleep
  14. mysg != gp.waiting: we have been woken; if gp.waiting is not our mysg the waiting list is corrupted, so throw
  15. gp.waiting = nil: it is our sudog, so clear gp.waiting
  16. gp.param == nil: the wake-up carried no parameter, which means the channel was closed while we were blocked, so panic with "send on closed channel"
  17. mysg.c = nil: clear the sudog's channel pointer
  18. releaseSudog(mysg): return the sudog to the cache

From the code above, the logic of chansend() boils down to:

  • If the channel's recvq already holds a blocked goroutine waiting to receive, the value is handed to that goroutine directly and it is made the next goroutine to run.
  • If the channel has a buffer with spare capacity, the value is stored in the buffer at the slot indexed by sendx.
  • If neither of the above holds, a sudog is created, appended to the channel's sendq, and recorded in the goroutine's waiting field; the current goroutine then blocks until some other goroutine receives from the channel and wakes it. Sending can hand control to the scheduler at two points: first, when a waiting receiver is found, the receiver is placed in the processor's runnext slot (this does not trigger a reschedule immediately); second, when no receiver is found and the buffer is full, the goroutine enqueues itself on sendq and calls goparkunlock, giving up the processor. A user-level sketch of these cases follows below.
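
A short sketch of how these cases look from user code (illustrative only):

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 1 // buffer has room: the value is copied into buf at sendx and qcount is incremented

    // Non-blocking send: chansend is called with block=false, so with a full
    // buffer it returns false instead of parking the goroutine, and the
    // default branch runs.
    select {
    case ch <- 2:
        fmt.Println("sent")
    default:
        fmt.Println("buffer full, would block")
    }

    // On an unbuffered channel, once the receiver is parked on recvq the send
    // hands the value to it directly (send/sendDirect), bypassing any buffer;
    // otherwise the sender parks on sendq until a receiver arrives.
    done := make(chan struct{})
    unbuf := make(chan int)
    go func() {
        fmt.Println("received", <-unbuf)
        close(done)
    }()
    unbuf <- 42
    <-done
}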

What does handing the value directly to the receiver look like?

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if raceenabled {
        if c.dataqsiz == 0 {
            racesync(c, sg)
        } else {
            // Pretend we go through the buffer, even though
            // we copy directly. Note that we need to increment
            // the head/tail locations only when raceenabled.
            qp := chanbuf(c, c.recvx)
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
        }
    }
    // The receiver wants the value (its target address is not nil).
    if sg.elem != nil {
        // Copy the value straight into the receiver's memory.
        sendDirect(c.elemtype, sg, ep)
        // Clear the data pointer in the sudog.
        sg.elem = nil
    }
    // The receiving goroutine.
    gp := sg.g
    unlockf()
    // Pass the sudog back as the wake-up parameter, so param is non-nil.
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // Wake up the receiving goroutine.
    goready(gp, skip+1)
}

send() calls sendDirect() to copy the value into the target memory held by the receiver, takes gp := sg.g, stores the sudog back into gp.param, and then goready(gp, skip+1) marks the receiving goroutine runnable and sets it as the runnext of the sender's P; the next time that P schedules, the goroutine in runnext runs first.

Receiving from a channel

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.

if debugChan {
print("chanrecv: chan=", c, "\n")
}

if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}

if !block {
unlock(&c.lock)
return false, false
}

gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}

This function differs little from chansend(); two points are worth highlighting.

  • if sg := c.sendq.dequeue(); sg != nil: if a sender is blocked on sendq, recv(c, sg, ep, func() { unlock(&c.lock) }, 3) takes the data from that sender directly.
  • c.recvq.enqueue(mysg): otherwise the goroutine joins the queue of blocked receivers.

The overall logic is roughly:

  • If the channel's sendq holds a waiting goroutine, the data at index recvx is copied into the receiving variable's memory and the waiting goroutine's data is copied into the buffer slot that was just freed (for an unbuffered channel the sender's value is copied straight to the receiver).
  • If the channel's buffer contains data, the value at index recvx is read directly.
  • Otherwise the current goroutine is suspended: a sudog is added to recvq, the goroutine's waiting field is updated, and it sleeps until the scheduler wakes it. Receiving can hand control to the scheduler at two points: a nil channel can never deliver a value, so the goroutine is parked immediately; and when there is no buffered data and no waiting sender, the goroutine is parked to wait for one. A short sketch of the user-visible behavior follows below.
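
A user-level sketch of the receive paths (illustrative only):

package main

import "fmt"

func main() {
    ch := make(chan int, 2)
    ch <- 10
    ch <- 20

    // Data is buffered, so chanrecv copies the element at recvx and advances the index.
    fmt.Println(<-ch) // 10

    // The comma-ok form reports whether the value came from a real send.
    // After close, buffered values are still delivered with ok=true;
    // once drained, receives return the zero value with ok=false.
    close(ch)
    v, ok := <-ch
    fmt.Println(v, ok) // 20 true
    v, ok = <-ch
    fmt.Println(v, ok) // 0 false
}

The recv() function below is what performs the hand-off when a sender is already blocked on sendq.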
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // Unbuffered channel: take the value directly from the sender.
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        // A sender is blocked here, so read from buf first.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            // Copy the unread value at recvx to the receiver.
            typedmemmove(c.elemtype, ep, qp)
        }
        // Copy the blocked sender's value into the slot that was just freed.
        typedmemmove(c.elemtype, qp, sg.elem)
        // Advance the receive index.
        c.recvx++
        // Wrap around when the ring index reaches the end.
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // The queue is full, so the next send slot is the slot we just read.
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    // Clear the sudog's data pointer.
    sg.elem = nil
    // Take the goroutine out of the sudog and pass the sudog back via param.
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // Wake the goroutine from sendq that this sudog belongs to.
    goready(gp, skip+1)
}

Closing a channel

Closing a channel sets the chan's closed flag: closed = 1.

func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}

lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}

if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}

c.closed = 1

var glist gList

// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)

// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
  1. c == nil: closing a nil hchan panics immediately
  2. lock(&c.lock): acquire the lock
  3. if c.closed != 0: the hchan is already closed, so release the lock and panic
  4. c.closed = 1: set closed to 1
  5. sg := c.recvq.dequeue(): release all readers, stopping when sg == nil
  6. sg := c.sendq.dequeue(): release all writers (they will panic), stopping when sg == nil
  7. unlock(&c.lock): release the lock
  8. goready(gp, 3): wake every goroutine collected from the receive and send queues; a short example of the resulting behavior follows below
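
The observable effect of closechan (an illustrative sketch):

package main

import "fmt"

func main() {
    ch := make(chan int, 3)
    done := make(chan struct{})

    // A goroutine blocked in recvq is woken by closechan; a range loop simply
    // ends once the channel is closed and drained.
    go func() {
        for v := range ch {
            fmt.Println("got", v)
        }
        close(done)
    }()

    ch <- 1
    ch <- 2
    close(ch) // sets closed = 1; any further send would panic with "send on closed channel"
    <-done
}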

3. Maps

Go's map is implemented as a hash table with chaining. The basic operations are creation, insertion, lookup, and deletion; the hardest parts to understand are collision resolution, the growth mechanism, and the migration strategy.

The map data structure

// A header for a Go map.
type hmap struct {
    // Note: the format of the hmap is also encoded in cmd/compile/internal/gc/reflect.go.
    // Make sure this stays in sync with the compiler's definition.
    count     int    // # live cells == size of map. Must be first (used by len() builtin)
    flags     uint8
    B         uint8  // log_2 of # of buckets (can hold up to loadFactor * 2^B items)
    noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
    hash0     uint32 // hash seed

    buckets    unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
    oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
    nevacuate  uintptr        // progress counter for evacuation (buckets less than this have been evacuated)

    extra *mapextra // optional fields
}
  • count is the number of elements currently in the hash table.
  • B is the base-2 logarithm of the number of buckets, so there are 2^B buckets, holding up to loadFactor * 2^B items.
  • hash0 is the hash seed, which adds randomness to the hash function's output; it is chosen when the table is created and passed to the hash function on every call.
  • buckets points to the array of 2^B buckets.
  • oldbuckets holds the previous bucket array during growth; for doubling growth it is half the size of the current buckets array.
// mapextra holds fields that are not present on all maps.
type mapextra struct {
    // If both key and value do not contain pointers and are inline, then we mark bucket
    // type as containing no pointers. This avoids scanning such maps.
    // However, bmap.overflow is a pointer. In order to keep overflow buckets
    // alive, we store pointers to all overflow buckets in hmap.extra.overflow and hmap.extra.oldoverflow.
    // overflow and oldoverflow are only used if key and value do not contain pointers.
    // overflow contains overflow buckets for hmap.buckets.
    // oldoverflow contains overflow buckets for hmap.oldbuckets.
    // The indirection allows to store a pointer to the slice in hiter.
    overflow    *[]*bmap
    oldoverflow *[]*bmap

    // nextOverflow holds a pointer to a free overflow bucket.
    nextOverflow *bmap
}

// A bucket for a Go map.
type bmap struct {
    // tophash generally contains the top byte of the hash value
    // for each key in this bucket. If tophash[0] < minTopHash,
    // tophash[0] is a bucket evacuation state instead.
    tophash [bucketCnt]uint8
    // Followed by bucketCnt keys and then bucketCnt values.
    // NOTE: packing all the keys together and then all the values together makes the
    // code a bit more complicated than alternating key/value/key/value/... but it allows
    // us to eliminate padding which would be needed for, e.g., map[int64]int8.
    // Followed by an overflow pointer.
}
  • tophash is an array of length 8: keys whose hash values share the same low-order bits land in the same bucket, and the high byte of each key's hash is stored here to speed up later matching.
  • overflow points to the next bucket, chaining together all keys that collided into this bucket.
  • data: each bucket stores at most bucketCnt = 8 key/value pairs, laid out as key/key/.../value/value/... rather than interleaved, which avoids the padding that alignment would otherwise require (for example in map[int64]int8). If a bucket needs more than 8 entries, a new overflow bucket is allocated and linked to it.

Choosing the load factor

What load factor should trigger growth? Growing too often wastes space (low utilization); growing too rarely produces many overflow buckets and slows lookups. In Go the balance point is a constant (the old C runtime expressed it as #define LOAD 6.5; today it is loadFactorNum/loadFactorDen = 13/2): when the number of elements exceeds 6.5 times the number of buckets, a grow is triggered. Where does 6.5 come from? The runtime authors measured the following:

       LOAD    %overflow  bytes/entry     hitprobe    missprobe
4.00 2.13 20.77 3.00 4.00
4.50 4.05 17.30 3.25 4.50
5.00 6.85 14.77 3.50 5.00
5.50 10.55 12.94 3.75 5.50
6.00 15.27 11.67 4.00 6.00
6.50 20.90 10.79 4.25 6.50
7.00 27.14 10.15 4.50 7.00
7.50 34.03 9.73 4.75 7.50
8.00 41.10 9.40 5.00 8.00

%overflow = percentage of buckets which have an overflow bucket
bytes/entry = overhead bytes used per key/value pair
hitprobe = # of entries to check when looking up a present key
missprobe = # of entries to check when looking up an absent key

Collision resolution

As mentioned before, a hash function's input space is always far larger than its output space, so collisions are unavoidable when using a hash table; even a perfect hash function will eventually collide once enough keys are inserted.

Go's map resolves collisions with chaining (linked overflow buckets).

In a well-performing hash table each bucket should hold 0-1 elements, occasionally 2-3, and rarely more. Computing the hash, locating the bucket, and walking the chain are the main costs of hash-table reads and writes. A chained hash table also has a load factor: load factor := number of elements / number of buckets.
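
As a quick illustration of the 6.5 threshold (loadFactor here is a hypothetical helper of my own, not the runtime's code; the runtime does the comparison with integer arithmetic in overLoadFactor, shown later):

package main

import "fmt"

// loadFactor mirrors the definition above: live entries divided by the bucket count 2^B.
func loadFactor(count int, B uint8) float64 {
    return float64(count) / float64(uint(1)<<B)
}

func main() {
    // 1000 entries in 2^7 = 128 buckets gives ~7.8 > 6.5, so such a map would grow.
    fmt.Println(loadFactor(1000, 7))
}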

Creating a map

make(map[Type]Type)

When no size hint is given, makemap_small creates the hmap structure but does not initialize any buckets: it only generates the hash seed and returns.

func makemap_small() *hmap {
    h := new(hmap)
    h.hash0 = fastrand() /* generate the hash seed */
    return h
}

make(map[Type]Type, len)

When a size hint is given and the hint is no more than 8 (i.e. it fits in a single bucket without exceeding the 6.5 load factor), B stays 0 and the bucket pointer buckets is still left nil, to be initialized lazily on first use; otherwise B is chosen large enough for the hint and the buckets array is allocated up front (see the sketch after the code below).

func makemap(t *maptype, hint int, h *hmap) *hmap {
    mem, overflow := math.MulUintptr(uintptr(hint), t.bucket.size)
    if overflow || mem > maxAlloc {
        hint = 0
    }

    if h == nil {
        h = new(hmap) /* allocate the hmap structure */
    }
    h.hash0 = fastrand() /* generate the hash seed */

    B := uint8(0)
    for overLoadFactor(hint, B) { /* pick the smallest B that keeps the hint under the load factor */
        B++
    }
    h.B = B

    if h.B != 0 { /* when B != 0, initialize the bucket pointer */
        var nextOverflow *bmap
        h.buckets, nextOverflow = makeBucketArray(t, h.B, nil) /* allocate the bucket array */
        if nextOverflow != nil {
            h.extra = new(mapextra)
            h.extra.nextOverflow = nextOverflow /* stash the preallocated overflow buckets */
        }
    }
    return h
}
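
From user code the difference is just the size hint (an illustrative sketch; which runtime helper the compiler picks can vary by Go version and escape analysis):

package main

func main() {
    // Without a hint this typically lowers to makemap_small: only the header and
    // hash seed exist, and buckets are allocated lazily on the first write.
    small := make(map[string]int)
    small["a"] = 1

    // With a hint, makemap picks the smallest B that keeps 1000 entries under the
    // 6.5 load factor and allocates the 2^B buckets up front, avoiding rehashing
    // while the map fills up.
    big := make(map[string]int, 1000)
    big["a"] = 1
    println(len(small), len(big)) // 1 1
}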

Map lookup

mapaccess1 returns a pointer to h[key]. It never returns nil; if the key is not present in the map, it returns a reference to the zero object for the value type. NOTE: the returned pointer may keep the whole map alive, so don't hold on to it for very long.

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    if raceenabled && h != nil {
        callerpc := getcallerpc()
        pc := funcPC(mapaccess1)
        racereadpc(unsafe.Pointer(h), callerpc, pc)
        raceReadObjectPC(t.key, key, callerpc, pc)
    }
    if msanenabled && h != nil {
        msanread(key, t.key.size)
    }
    if h == nil || h.count == 0 { /* does the hash table contain any data? */
        if t.hashMightPanic() {
            t.key.alg.hash(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
    if h.flags&hashWriting != 0 { /* concurrent write in progress? */
        throw("concurrent map read and map write")
    }
    alg := t.key.alg
    hash := alg.hash(key, uintptr(h.hash0)) /* hash the key */
    m := bucketMask(h.B) /* 1<<h.B - 1, mask for the low B bits */
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize))) /* locate the bucket: hash&m is the bucket number */
    if c := h.oldbuckets; c != nil {
        if !h.sameSizeGrow() {
            m >>= 1
        }
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        if !evacuated(oldb) {
            b = oldb
        }
    }
    top := tophash(hash) /* compute the key's tophash */
bucketloop:
    for ; b != nil; b = b.overflow(t) { /* walk the bucket and its overflow buckets */
        for i := uintptr(0); i < bucketCnt; i++ { /* check each of the bucketCnt = 8 slots */
            if b.tophash[i] != top { /* only when tophash matches do we compare the i-th key */
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize)) /* address of the key */
            if t.indirectkey() {
                k = *((*unsafe.Pointer)(k))
            }
            if alg.equal(key, k) { /* keys are equal: we found the value */
                v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
                if t.indirectvalue() {
                    v = *((*unsafe.Pointer)(v))
                }
                return v
            }
        }
    }
    return unsafe.Pointer(&zeroVal[0])
}
  1. Hash the key with the hash algorithm for the key's type.
  2. Use the low-order bits of the hash (hash mod 2^B) to locate the bucket.
  3. Compare the high byte of the hash against the bucket's tophash[i]; only when they match, compare the bucket's i-th key with the given key.
    1. If they are equal, return the corresponding value.
    2. Otherwise, keep searching the overflow buckets in the same way.
  4. If the map is in the middle of being migrated, look in oldbuckets first (when the old bucket has not been evacuated yet). A usage sketch follows below.
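
A usage sketch (illustrative only) of the two lookup forms:

package main

import "fmt"

func main() {
    m := map[string]int{"a": 1}

    // m[k] compiles to a mapaccess1 call: a missing key yields the value type's zero value.
    fmt.Println(m["missing"]) // 0

    // The comma-ok form compiles to mapaccess2 and also reports whether the key was present.
    v, ok := m["a"]
    fmt.Println(v, ok) // 1 true
}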

Map insertion

func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
callerpc := getcallerpc()
pc := funcPC(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0)) // 1. hash the key

h.flags ^= hashWriting // mark the map as being written

if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}

again:
bucket := hash & bucketMask(h.B) // 2. the low B bits of the hash pick the bucket
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
top := tophash(hash) // take the high byte of the hash as tophash

var inserti *uint8
var insertk unsafe.Pointer
var val unsafe.Pointer
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ { /* scan the 8 slots in this bucket */
if b.tophash[i] != top {
if isEmpty(b.tophash[i]) && inserti == nil { // 4. remember the first empty slot as a candidate insertion point
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
}
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey() {
k = *((*unsafe.Pointer)(k))
}
if !alg.equal(key, k) {
continue
}
if t.needkeyupdate() { // 3. the key already exists: update it
typedmemmove(t.key, k, key)
}
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
goto done
}
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}

if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}

if inserti == nil {
newb := h.newoverflow(t, b) // 5. no free slot found: allocate a new overflow bucket
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
val = add(insertk, bucketCnt*uintptr(t.keysize))
}

if t.indirectkey() {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectvalue() {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(val) = vmem
}
typedmemmove(t.key, insertk, key)
*inserti = top
h.count++

done:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
if t.indirectvalue() {
val = *((*unsafe.Pointer)(val))
}
return val
}
  1. Hash the key.
  2. Use the low-order bits of the hash (hash mod 2^B) to locate the bucket.
  3. Use the high byte of the hash (tophash) to check whether the key already exists in that bucket; if it does, update its value.
  4. If the key is not found and there is an empty slot, store the new key in that slot.
  5. If the key is not found after scanning the bucket, walk its overflow buckets in the same way; if no free slot exists anywhere, allocate a new overflow bucket to hold the element. A usage sketch follows below.
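
A usage sketch (illustrative only):

package main

import "fmt"

func main() {
    // Assigning into a nil map panics inside mapassign ("assignment to entry in nil map"),
    // so the map has to be initialized first.
    var nilMap map[string]int
    _ = nilMap // nilMap["a"] = 1 would panic

    m := make(map[string]int)
    m["a"] = 1 // mapassign finds (or creates) a slot and returns its address; the compiler stores 1 there
    m["a"] = 2 // same key: tophash and key match, so the existing slot is updated in place
    fmt.Println(m["a"], len(m)) // 2 1
}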

Map deletion

func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := funcPC(mapdelete)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.key.alg.hash(key, 0) // see issue 23734
}
return
}
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}

alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))

// Set hashWriting after calling alg.hash, since alg.hash may panic,
// in which case we have not actually done a write (delete).
h.flags ^= hashWriting

bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
bOrig := b
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break search
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey() {
k2 = *((*unsafe.Pointer)(k2))
}
if !alg.equal(key, k2) {
continue
}
// Only clear key if there are pointers in it.
if t.indirectkey() {
*(*unsafe.Pointer)(k) = nil
} else if t.key.kind&kindNoPointers == 0 {
memclrHasPointers(k, t.key.size)
}
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
if t.indirectvalue() {
*(*unsafe.Pointer)(v) = nil
} else if t.elem.kind&kindNoPointers == 0 {
memclrHasPointers(v, t.elem.size)
} else {
memclrNoHeapPointers(v, t.elem.size)
}
b.tophash[i] = emptyOne
// If the bucket now ends in a bunch of emptyOne states,
// change those to emptyRest states.
// It would be nice to make this a separate function, but
// for loops are not currently inlineable.
if i == bucketCnt-1 {
if b.overflow(t) != nil && b.overflow(t).tophash[0] != emptyRest {
goto notLast
}
} else {
if b.tophash[i+1] != emptyRest {
goto notLast
}
}
for {
b.tophash[i] = emptyRest
if i == 0 {
if b == bOrig {
break // beginning of initial bucket, we're done.
}
// Find previous bucket, continue at its last entry.
c := b
for b = bOrig; b.overflow(t) != c; b = b.overflow(t) {
}
i = bucketCnt - 1
} else {
i--
}
if b.tophash[i] != emptyOne {
break
}
}
notLast:
h.count--
break search
}
}

if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
}

Deletion closely mirrors insertion, except that it is triggered by the delete built-in. If the map happens to be growing when the deletion occurs, the bucket about to be touched is migrated first; after that, the target element is located in the bucket and its key/value cells are cleared.
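
A usage sketch (illustrative only):

package main

import "fmt"

func main() {
    m := map[string]int{"a": 1, "b": 2}

    // delete(m, k) compiles to a mapdelete call: the key/value cells are cleared and
    // the slot's tophash is marked empty, but the bucket array itself never shrinks.
    delete(m, "a")
    fmt.Println(len(m), m["a"]) // 1 0

    // Deleting a key that is not present is a no-op, not an error.
    delete(m, "missing")
}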

Map growth

Conditions that trigger growth

  1. Doubling growth: the load factor exceeds 6.5, i.e. each bucket stores more than 6.5 key/value pairs on average.
// overLoadFactor reports whether count items placed in 1<<B buckets is over loadFactor.
func overLoadFactor(count int, B uint8) bool {
    return count > bucketCnt && uintptr(count) > loadFactorNum*(bucketShift(B)/loadFactorDen)
}
  2. Same-size growth: there are too many overflow buckets, roughly as many overflow buckets as regular buckets (with the regular-bucket count capped at 2^15 = 32768 for this comparison).
// tooManyOverflowBuckets reports whether noverflow buckets is too many for a map with 1<<B buckets.
// Note that most of these overflow buckets must be in sparse use;
// if use was dense, then we'd have already triggered regular map growth.
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
// If the threshold is too low, we do extraneous work.
// If the threshold is too high, maps that grow and shrink can hold on to lots of unused memory.
// "too many" means (approximately) as many overflow buckets as regular buckets.
// See incrnoverflow for more details.
if B > 15 {
B = 15
}
// The compiler doesn't see here that B < 16; mask B to generate shorter shift code.
return noverflow >= uint16(1)<<(B&15)
}
func hashGrow(t *maptype, h *hmap) {
    bigger := uint8(1)
    if !overLoadFactor(h.count+1, h.B) { /* decide between doubling and same-size growth */
        bigger = 0
        h.flags |= sameSizeGrow /* same-size growth: bigger = 0 */
    }
    oldbuckets := h.buckets
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil) /* allocate the new bucket array */

    flags := h.flags &^ (iterator | oldIterator) /* clear the iterator flags for buckets and oldbuckets */
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }

    h.B += bigger /* doubling growth: h.B+1; same-size growth: h.B unchanged */
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    h.nevacuate = 0 /* evacuation progress 0 means nothing has been migrated yet */
    h.noverflow = 0

    /* When keys/values contain no pointers, overflow buckets are tracked through
     * mapextra instead of through pointers in bmap: overflow holds the overflow
     * buckets of hmap.buckets, oldoverflow those of hmap.oldbuckets, and
     * nextOverflow points at preallocated overflow-bucket space.
     */
    if h.extra != nil && h.extra.overflow != nil {
        if h.extra.oldoverflow != nil {
            throw("oldoverflow is not nil")
        }
        h.extra.oldoverflow = h.extra.overflow
        h.extra.overflow = nil
    }
    if nextOverflow != nil {
        if h.extra == nil {
            h.extra = new(mapextra)
        }
        h.extra.nextOverflow = nextOverflow
    }
}

Map migration (evacuation)

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))) /* oldbucket is the index of the old bucket */
    newbit := h.noldbuckets() /* number of old buckets before growth */
    if !evacuated(b) {
        var xy [2]evacDst /* xy holds the x and y (low and high) evacuation destinations */
        x := &xy[0] /* x: same index as before (used for same-size growth and the low half of doubling growth) */
        x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        x.k = add(unsafe.Pointer(x.b), dataOffset) /* address of the keys */
        x.v = add(x.k, bucketCnt*uintptr(t.keysize)) /* address of the values */

        if !h.sameSizeGrow() { // not a same-size growth
            y := &xy[1] /* for doubling growth we also need y, the high half of the new array */
            y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize))) /* index of the high-half bucket */
            y.k = add(unsafe.Pointer(y.b), dataOffset)
            y.v = add(y.k, bucketCnt*uintptr(t.keysize))
        }

        for ; b != nil; b = b.overflow(t) { /* walk the old bucket and its overflow buckets */
            k := add(unsafe.Pointer(b), dataOffset) /* address of the keys */
            v := add(k, bucketCnt*uintptr(t.keysize)) /* address of the values */
            for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) { /* walk the slots in the bucket */
                top := b.tophash[i] /* the slot's tophash */
                if isEmpty(top) { /* empty slot: mark it as evacuated-empty */
                    b.tophash[i] = evacuatedEmpty
                    continue
                }
                if top < minTopHash { /* valid tophash values are >= minTopHash */
                    throw("bad map state")
                }
                k2 := k
                if t.indirectkey() {
                    k2 = *((*unsafe.Pointer)(k2))
                }
                var useY uint8 /* useY decides whether the entry lands in the low (x) or high (y) destination */
                if !h.sameSizeGrow() { /* doubling growth: h.B grew by 1, so the bucket may change */
                    hash := t.key.alg.hash(k2, uintptr(h.hash0))
                    if h.flags&iterator != 0 && !t.reflexivekey() && !t.key.alg.equal(k2, k2) {
                        useY = top & 1
                        top = tophash(hash)
                    } else {
                        if hash&newbit != 0 {
                            useY = 1
                        }
                    }
                }

                /* evacuatedY = evacuatedX + 1 */
                if evacuatedX+1 != evacuatedY || evacuatedX^1 != evacuatedY {
                    throw("bad evacuatedN")
                }

                b.tophash[i] = evacuatedX + useY /* record whether the slot moved to X or Y */
                dst := &xy[useY] /* useY == 0 means the low half, otherwise the high half */

                if dst.i == bucketCnt { /* the destination bucket already holds 8 entries: chain an overflow bucket */
                    dst.b = h.newoverflow(t, dst.b)
                    dst.i = 0
                    dst.k = add(unsafe.Pointer(dst.b), dataOffset)
                    dst.v = add(dst.k, bucketCnt*uintptr(t.keysize))
                }
                dst.b.tophash[dst.i&(bucketCnt-1)] = top
                if t.indirectkey() {
                    *(*unsafe.Pointer)(dst.k) = k2 /* key stored indirectly: copy the pointer */
                } else {
                    typedmemmove(t.key, dst.k, k)
                }
                if t.indirectvalue() {
                    *(*unsafe.Pointer)(dst.v) = *(*unsafe.Pointer)(v) /* value stored indirectly: copy the pointer */
                } else {
                    typedmemmove(t.elem, dst.v, v)
                }
                /* move on to the next entry */
                dst.i++
                dst.k = add(dst.k, uintptr(t.keysize))
                dst.v = add(dst.v, uintptr(t.valuesize))
            }
        }

        /* After walking the bucket, clear its key/value area if no iterator is still using the old bucket */
        if h.flags&oldIterator == 0 && t.bucket.kind&kindNoPointers == 0 {
            b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
            ptr := add(b, dataOffset)
            n := uintptr(t.bucketsize) - dataOffset
            memclrHasPointers(ptr, n)
        }
    }

    if oldbucket == h.nevacuate {
        advanceEvacuationMark(h, t, newbit)
    }
}

/* Track evacuation progress and finish up once every bucket has been migrated */
func advanceEvacuationMark(h *hmap, t *maptype, newbit uintptr) {
    h.nevacuate++
    stop := h.nevacuate + 1024
    if stop > newbit {
        stop = newbit
    }
    for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) { /* skip over buckets that are already evacuated */
        h.nevacuate++
    }
    if h.nevacuate == newbit {
        h.oldbuckets = nil /* migration finished: drop oldbuckets */
        if h.extra != nil {
            h.extra.oldoverflow = nil /* and drop the old overflow buckets */
        }
        h.flags &^= sameSizeGrow /* clear the same-size-growth flag */
    }
}
