协程抢占

协作式抢占

morestack:执行抢占

目前go实现是的协作抢占:在每个函数开头插入morestack检查,除了检查是否需要扩张栈,同时还检查是否当前协程需要抢占。那么怎么判断一个协程是否需要抢占呢?后台线程会定时扫描当前运行中的协程,如果发现一个协程运行比较久,会将其标记为抢占状态。

我们首先看一下一个main函数的汇编代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
0x0000 00000 (morestack.go:5)	TEXT	"".main(SB), $64-0
0x0000 00000 (morestack.go:5) MOVQ TLS, CX
0x0009 00009 (morestack.go:5) MOVQ (CX)(TLS*2), CX // 获取当前g
// 比较当前SP和g.stackguard0,如果小于则需要触发morestarck
0x0010 00016 (morestack.go:5) CMPQ SP, 16(CX)
0x0014 00020 (morestack.go:5) JLS 110
0x0016 00022 (morestack.go:5) SUBQ $64, SP // SP-64,相当于设置栈帧大小64字节
0x001a 00026 (morestack.go:5) MOVQ BP, 56(SP) // 保存caller的BP,可以看到BP是保存到当前函数的栈帧中的,如果一个函数栈帧大小为0,则不需要保存BP
0x001f 00031 (morestack.go:5) LEAQ 56(SP), BP // 设置当前BP
...
0x0064 00100 (morestack.go:7) MOVQ 56(SP), BP // 还原BP
0x0069 00105 (morestack.go:7) ADDQ $64, SP // SP+64,相当于销毁栈帧
0x006d 00109 (morestack.go:7) RET
0x006e 00110 (morestack.go:7) NOP
0x006e 00110 (morestack.go:5) CALL runtime.morestack_noctxt(SB)
0x0073 00115 (morestack.go:5) JMP 0

我们可以看到,进入函数之后,首先会检查当前函数的SP寄存器是否已经达到g.stackguard0,如果是的话,则需要先调用runtime.morestack_noctxt方法扩张当前函数栈(现在的实现是重新分配一个更大的函数栈,然后把旧的函数栈内容拷贝过去),然后再根据栈帧大小设置SPBP指针,而在函数返回前需要先恢复BPSP指针。上面的BP和SP寄存器的相关设置是在morestack之后,也就是在执行morestack的时候,0(SP)为函数返回地址

我们接着来看一下runtime·morestack_noctxt的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// morestack but not preserving ctxt.
// 这里noctxt表示调用方法没有context,即没有闭包或者receiver
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
MOVL $0, DX // 清空DX的低32位,DX寄存器用于保存函数上下文
JMP runtime·morestack(SB) // 跳转到morestack方法,这里用的是JMP,不是CALL

/*
* support for morestack
*/

// Called during function prolog when more stack is needed.
//
// The traceback routines see morestack on a g0 as being
// the top of a stack (for example, morestack calling newstack
// calling the scheduler calling newm calling gc), so we must
// record an argument size. For that purpose, it has no arguments.
TEXT runtime·morestack(SB),NOSPLIT,$0-0
// Cannot grow scheduler stack (m->g0).
get_tls(CX) // 这里get_tls(r)是一个宏:MOVQ TLS, r
MOVQ g(CX), BX // 保存当前的g到BX
MOVQ g_m(BX), BX // 保存m到BX
MOVQ m_g0(BX), SI // 保存g0到SI
CMPQ g(CX), SI // 如果当前处于g0栈
JNE 3(PC) // PC+3
CALL runtime·badmorestackg0(SB) // g0栈不允许扩张
CALL runtime·abort(SB)

// Cannot grow signal stack (m->gsignal).
MOVQ m_gsignal(BX), SI // gsignal用于处理信号量的栈
CMPQ g(CX), SI
JNE 3(PC)
CALL runtime·badmorestackgsignal(SB) // gsignal栈不允许扩张
CALL runtime·abort(SB)

// Called from f:把调用morestack的函数记为f
// 保存 f's caller的信息到m.morebuf中
// 8(SP)保存f的返回地址,即f's caller的PC
MOVQ 8(SP), AX
MOVQ AX, (m_morebuf+gobuf_pc)(BX) // 设置m.morebuf.pc为f's caller的PC
// 16(SP)的地址为f's caller的SP
LEAQ 16(SP), AX // f's caller's SP
MOVQ AX, (m_morebuf+gobuf_sp)(BX)
get_tls(CX)
MOVQ g(CX), SI
MOVQ SI, (m_morebuf+gobuf_g)(BX)

// Set g->sched to context in f.
MOVQ 0(SP), AX // f's PC,morestack的frameSize为0,此时0(SP)为f的返回地址
MOVQ AX, (g_sched+gobuf_pc)(SI) // 设置g.sched.pc为f的PC
MOVQ SI, (g_sched+gobuf_g)(SI)
// 8(SP)的地址即为f的SP
LEAQ 8(SP), AX // f's SP
MOVQ AX, (g_sched+gobuf_sp)(SI) // 保存f的SP
MOVQ BP, (g_sched+gobuf_bp)(SI) // 保存f的BP
MOVQ DX, (g_sched+gobuf_ctxt)(SI) // 保存f的DX

// Call newstack on m->g0's stack.
MOVQ m_g0(BX), BX // 获取g0
MOVQ BX, g(CX) // 设置当前g为g0
MOVQ (g_sched+gobuf_sp)(BX), SP // 设置SP寄存器为g0.sched.sp
CALL runtime·newstack(SB) // 调用newstack,该方法不会返回
CALL runtime·abort(SB) // crash if newstack returns
RET

我们接着来看一下runtime.newstack这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
func newstack() {
thisg := getg() // 这里获取的是当前执行的g,实际就是g0
...
// 这里的curg是触发了morestack的g,不是g0
gp := thisg.m.curg
...
morebuf := thisg.m.morebuf
thisg.m.morebuf.pc = 0
thisg.m.morebuf.lr = 0
thisg.m.morebuf.sp = 0
thisg.m.morebuf.g = 0

// 检查是否需要抢占,当发现一个协程需要被抢占时,会将其g.stackguard0设置成stackPreempt,从而触发morestack的执行
preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
// 触发了抢占
if preempt {
// We are interested in preempting user Go code, not runtime code.
// If we're holding locks, mallocing, or preemption is disabled, don't
// preempt.
if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
// 还原gp.stackguard0
// gp->preempt is set, so it will be preempted next time.
gp.stackguard0 = gp.stack.lo + _StackGuard
// restore state from Gobuf; longjmp
gogo(&gp.sched) // never return
}
}

sp := gp.sched.sp
if sys.ArchFamily == sys.AMD64 || sys.ArchFamily == sys.I386 || sys.ArchFamily == sys.WASM {
// The call to morestack cost a word.
sp -= sys.PtrSize
}

// 再次检查抢占
if preempt {
// g0不允许被抢占
if gp == thisg.m.g0 {
throw("runtime: preempt g0")
}
if thisg.m.p == 0 && thisg.m.locks == 0 {
throw("runtime: g is running but p is not")
}
// Synchronize with scang.
// 更新状态为_Gwaiting
casgstatus(gp, _Grunning, _Gwaiting)
// gc相关,抢占g扫描
if gp.preemptscan {
for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
// Likely to be racing with the GC as
// it sees a _Gwaiting and does the
// stack scan. If so, gcworkdone will
// be set and gcphasework will simply
// return.
}
if !gp.gcscandone {
// gcw is safe because we're on the
// system stack.
gcw := &gp.m.p.ptr().gcw
// 扫描gp的栈
scanstack(gp, gcw)
if gcBlackenPromptly {
gcw.dispose()
}
gp.gcscandone = true
}
gp.preemptscan = false
gp.preempt = false
casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
// This clears gcscanvalid.
casgstatus(gp, _Gwaiting, _Grunning)
gp.stackguard0 = gp.stack.lo + _StackGuard
gogo(&gp.sched) // never return
}

// Act like goroutine called runtime.Gosched.
casgstatus(gp, _Gwaiting, _Grunning)
// 这里执行抢占,实际上就是调用schedule方法,该方法不会返回
gopreempt_m(gp) // never return
}

// 如果不是由于抢占而执行morestack,那么就是真的因为栈不够用了,需要扩容栈
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2 // 新的栈是原来的两倍
if newsize > maxstacksize { // 栈是有限制的哦:1<<20
print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")
throw("stack overflow") // 栈溢出了
}

// The goroutine must be executing in order to call newstack,
// so it must be Grunning (or Gscanrunning).
// 设置g的状态
casgstatus(gp, _Grunning, _Gcopystack)

// The concurrent GC will not scan the stack while we are doing the copy since
// the gp is in a Gcopystack status.
// coypstack会创建一个新的栈,然后把旧的栈的内容拷到新的栈中
copystack(gp, newsize, true)
if stackDebug >= 1 {
print("stack grow done\n")
}
// 可以开始跑了
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched) // gogo开始跑了
}
sysmon:抢占标记

那么,我们的协程是什么时候被标记为可抢占的呢?当然是后台线程sysmon的功劳了,sysmon的工作之一就是找出持续运行很久的协程,然后把他标记为可抢占:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Always runs without a P, so write barriers are not allowed.
//
//go:nowritebarrierrec
func sysmon() {
lock(&sched.lock)
sched.nmsys++
checkdead()
unlock(&sched.lock)

// If a heap span goes unused for 5 minutes after a garbage collection,
// we hand it back to the operating system.
scavengelimit := int64(5 * 60 * 1e9)

if debug.scavenge > 0 {
// Scavenge-a-lot for testing.
forcegcperiod = 10 * 1e6
scavengelimit = 20 * 1e6
}

lastscavenge := nanotime()
nscavenge := 0

lasttrace := int64(0)
idle := 0 // how many cycles in succession we had not wokeup somebody
delay := uint32(0)
for {
if idle == 0 { // start with 20us sleep...
delay = 20
} else if idle > 50 { // start doubling the sleep after 1ms...
delay *= 2
}
if delay > 10*1000 { // up to 10ms
delay = 10 * 1000
}
usleep(delay)
...
// 注释很清楚了
// retake P's blocked in syscalls
// and preempt long running G's
if retake(now) != 0 {
idle = 0
} else {
idle++
}
...
}
}

func retake(now int64) uint32 {
n := 0
// 加锁
lock(&allpLock)
// 遍历p列表
for i := 0; i < len(allp); i++ {
_p_ := allp[i]
if _p_ == nil {
// This can happen if procresize has grown
// allp but not yet created new Ps.
continue
}
pd := &_p_.sysmontick
s := _p_.status
if s == _Psyscall {
// 这里是处理系统调用时的P
// 如果系统调用阻塞到一定时长,并且当前有其他g可执行时,考虑将_Psyscall的p夺回过来,标记为_Pidle
...
} else if s == _Prunning {
// 如果当前P已经在同一个G上运行很久了,标记抢占
// Preempt G if it's running for too long.
t := int64(_p_.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
continue
}
if pd.schedwhen+forcePreemptNS > now {
continue
}
// 抢占
preemptone(_p_)
}
}
unlock(&allpLock)
return uint32(n)
}


func preemptone(_p_ *p) bool {
mp := _p_.m.ptr()
if mp == nil || mp == getg().m {
return false
}
gp := mp.curg
if gp == nil || gp == mp.g0 {
return false
}

// 设置抢占标志位
gp.preempt = true

// Every call in a go routine checks for stack overflow by
// comparing the current stack pointer to gp->stackguard0.
// Setting gp->stackguard0 to StackPreempt folds
// preemption into the normal stack overflow check.
// 设置gp.stackguard0 = stackPreempt,从而能够触发morestack
gp.stackguard0 = stackPreempt
return true
}

至此,我们对go中的协作式抢占机制已经很明了了,但是,抢占只是标记一下,你说任你说,我不一定照做啊。

缺陷

之所以在函数调用的时候执行检查是因为函数切换的时候是一个safe point,这时候的通用寄存器是空的,切换协程不需要保存这些寄存器,而且栈中的root pointer是确定的,能够精确执行gc扫描。

然而,如果有一个协程没有发生函数调用,比如下面这段代码:

1
2
3
4
i:=0
for {
i++
}

这个协程就没有机会被抢占,也就不会让出cpu

开始执行gc的时候先暂停所有协程的执行,然后再执行stop the world开启写屏障。这时候假如我们有一个协程跑的是类似上面的代码,那么我们的程序就会直接卡死:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
go func() {
i := 0
// 该代码导致当前协程不会让出cpu
for {
i++
}
}()
// sleep 1s,确保子协程开始执行
time.Sleep(time.Second)
log.Println("开始触发gc")
// 触发gc
runtime.GC()
log.Println("手动gc完成")
}

因为其他协程已经被暂停,而最后这个协程由于没有发生函数调用,无法执行抢占操作,从而垃圾收集器会一直处于等待,gc也一直不会被触发。当然我们平时的业务代码一般不会有这种情况发生,但是如果有一个函数需要执行比较久才会被抢占,那么就会导致gc的延时,同时也会严重影响程序的吞吐量。

go:nosplit

TODO

非协作式抢占

因为协作式抢占存在很明显的缺点:抢占不及时,有一种解决方案是在函数中插入一些细粒度更小的抢占检查点,但是这会对性能产生影响,比如每次循环都需要额外执行一次分支判断。

因此有人提出来非协作式抢占来弥补这个缺陷,能够在任意指令触发抢占。非协作式抢占的主要难点在于如何满足gc安全,抢占应该发生在safe point