This article explains how cooperative and preemptive scheduling of goroutines are implemented in Go.
Before introducing the two kinds of preemptive scheduling, let's first look at the runtime.Gosched function:
```go
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {
	checkTimeouts()
	mcall(gosched_m)
}
```
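As its comment says, runtime.Gosched voluntarily gives up the current processor so that other goroutines can run. It does not suspend the calling goroutine, though: it only yields its turn and relies on the scheduler to run it again later. To do this, it uses mcall to switch to the g0 stack and run gosched_m: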
```go
// Gosched continuation on g0.
func gosched_m(gp *g) {
	if trace.enabled {
		traceGoSched()
	}
	goschedImpl(gp)
}
```
gosched_m calls goschedImpl, which detaches goroutine gp from the current M and puts gp on the global run queue, where it waits to be scheduled again:
```go
func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable)
	dropg() // detach gp (the M's curg) from the current M
	lock(&sched.lock)
	globrunqput(gp) // put gp on the global run queue to wait to be scheduled
	unlock(&sched.lock)

	schedule()
}
```
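To see this cooperative yield from user code, here is a minimal sketch (waitWithGosched is a made-up helper, not a library function). It restricts the program to a single P, starts a goroutine that sets a flag, and spins until the flag becomes visible, calling runtime.Gosched on every iteration so the other goroutine gets a turn on the only P. It assumes nothing beyond the standard runtime and sync/atomic packages.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
)

// waitWithGosched (hypothetical helper) forces a single P, starts a
// goroutine that sets a flag, and spins until the flag is observed.
// Each iteration calls runtime.Gosched so the other goroutine can run
// on the single P. It returns the number of yields performed.
func waitWithGosched() int {
	prev := runtime.GOMAXPROCS(1) // single P, as in the article's examples
	defer runtime.GOMAXPROCS(prev)

	var ready int32
	go func() { atomic.StoreInt32(&ready, 1) }()

	yields := 0
	for atomic.LoadInt32(&ready) == 0 {
		// Give up the P: this goroutine goes back on a run queue and
		// resumes automatically once the scheduler picks it again.
		runtime.Gosched()
		yields++
	}
	return yields
}

func main() {
	fmt.Println("flag observed after", waitWithGosched(), "yield(s)")
}
```

The explicit yield is exactly the burden on the programmer that the next paragraph complains about: drop the Gosched call and the loop has to rely on the runtime's own preemption instead.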
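Although runtime.Gosched lets a goroutine give up the CPU voluntarily, it demands a lot from the programmer, who has to place the calls by hand, so it is not user-friendly.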
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

var once = sync.Once{}

func f() {
	once.Do(func() {
		fmt.Println("I am go routine 1!")
	})
}

func main() {
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
	go func() {
		for {
			f()
		}
	}()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("I am main goroutine!")
}
```
Consider the code above. We first set the number of Ps to 1, then start a goroutine that loops forever, calling f on every iteration. Without preemptive scheduling, this goroutine would hold the only P, and therefore the CPU, forever, and the program would never reach the line fmt.Println("I am main goroutine!"). Let's look at how cooperative preemption avoids this problem.
```shell
$ go tool compile -N -l main.go
$ go tool objdump main.o >> main.i
```
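With the commands above we obtain the assembly for the program (-N disables optimizations and -l disables inlining, so f stays a separate function with its own prologue). The assembly for f is: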
```
TEXT "".f(SB) gofile../home/chenyiguo/smb_share/go_routine_test/main.go
  main.go:12		0x151a			493b6610		CMPQ 0x10(R14), SP
  main.go:12		0x151e			762b			JBE 0x154b
  main.go:12		0x1520			4883ec18		SUBQ $0x18, SP
  main.go:12		0x1524			48896c2410		MOVQ BP, 0x10(SP)
  main.go:12		0x1529			488d6c2410		LEAQ 0x10(SP), BP
  main.go:13		0x152e			488d0500000000		LEAQ 0(IP), AX		[3:7]R_PCREL:"".once
  main.go:13		0x1535			488d1d00000000		LEAQ 0(IP), BX		[3:7]R_PCREL:"".f.func1·f
  main.go:13		0x153c			e800000000		CALL 0x1541		[1:5]R_CALL:sync.(*Once).Do
  main.go:16		0x1541			488b6c2410		MOVQ 0x10(SP), BP
  main.go:16		0x1546			4883c418		ADDQ $0x18, SP
  main.go:16		0x154a			c3			RET
  main.go:12		0x154b			e800000000		CALL 0x1550		[1:5]R_CALL:runtime.morestack_noctxt
  main.go:12		0x1550			ebc8			JMP "".f(SB)
```
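The first instruction, CMPQ 0x10(R14), SP, compares SP with 0x10(R14), which is the current goroutine's stackguard0 field (on amd64, R14 holds the current g under the register-based calling convention introduced in Go 1.17). Mind the operand order of CMP in this AT&T-style listing: the flags reflect SP - 0x10(R14), so when SP is less than or equal to 0x10(R14), JBE jumps to address 0x154b, which calls runtime.morestack_noctxt and triggers stack growth. If you look closely, you will find this check in the prologue (the very beginning) of essentially every function; the exceptions are functions marked //go:nosplit and small leaf functions, for which the compiler omits it.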
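The check can be modeled in plain Go. The sketch below is a simplified, hypothetical model (gModel and prologue are invented names, not runtime code) that assumes a small frame, so SP is compared directly with stackguard0 as in the listing above. It also folds in the decision newstack makes on the slow path, discussed below; stackPreempt uses the same bit pattern as the runtime constant (uintptrMask & -1314).

```go
package main

import "fmt"

// Simplified, hypothetical model of a function prologue's stack check.
// Names mirror the runtime for readability; this is not runtime code.

// stackPreempt: same bit pattern as the runtime's uintptrMask & -1314
// (0x...fade), which is larger than any real stack pointer.
const stackPreempt = ^uintptr(1313)

type gModel struct {
	stackguard0 uintptr // normally stack.lo plus a guard area
}

// prologue mimics "CMPQ 0x10(R14), SP; JBE morestack" for a small frame:
// the slow path is taken when SP <= stackguard0 (unsigned comparison).
// On the slow path it mimics newstack: preempt if stackguard0 holds
// stackPreempt, otherwise the stack really has to grow.
func prologue(gp *gModel, sp uintptr) string {
	if sp > gp.stackguard0 {
		return "run body"
	}
	if gp.stackguard0 == stackPreempt {
		return "preempt"
	}
	return "grow stack"
}

func main() {
	gp := &gModel{stackguard0: 0x1380}
	fmt.Println(prologue(gp, 0x2f00)) // plenty of stack left
	fmt.Println(prologue(gp, 0x1300)) // below the guard

	gp.stackguard0 = stackPreempt     // what preemptone does
	fmt.Println(prologue(gp, 0x2f00)) // any SP now fails the check
}
```

The third call shows why a single store to stackguard0 is enough to request a preemption: the ordinary overflow check does the rest at the next call.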
Next, we answer two questions to connect the whole chain:

1. How does stack growth lead to rescheduling, so that the goroutine gives up the CPU?
2. When is the stack-growth flag set?
```
// morestack but not preserving ctxt.
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
	MOVL	$0, DX
	JMP	runtime·morestack(SB)

TEXT runtime·morestack(SB),NOSPLIT,$0-0
	...
	// Set g->sched to context in f.
	MOVQ	0(SP), AX // f's PC
	MOVQ	AX, (g_sched+gobuf_pc)(SI)
	LEAQ	8(SP), AX // f's SP
	MOVQ	AX, (g_sched+gobuf_sp)(SI)
	MOVQ	BP, (g_sched+gobuf_bp)(SI)
	MOVQ	DX, (g_sched+gobuf_ctxt)(SI)
	...
	CALL	runtime·newstack(SB)
	CALL	runtime·abort(SB)	// crash if newstack returns
	RET
```
Here runtime·morestack_noctxt jumps to runtime·morestack, which first records the goroutine's PC and SP in g.sched, then switches to the g0 stack and calls runtime.newstack:
```go
func newstack() {
	...
	gp := thisg.m.curg
	...
	stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
	...
	preempt := stackguard0 == stackPreempt
	...
	if preempt {
		if gp == thisg.m.g0 {
			throw("runtime: preempt g0")
		}
		if thisg.m.p == 0 && thisg.m.locks == 0 {
			throw("runtime: g is running but p is not")
		}

		if gp.preemptShrink {
			// We're at a synchronous safe point now, so
			// do the pending stack shrink.
			gp.preemptShrink = false
			shrinkstack(gp)
		}

		if gp.preemptStop {
			preemptPark(gp) // never returns
		}

		// Act like goroutine called runtime.Gosched.
		gopreempt_m(gp) // never return
	}
	...
}
```
Simplified, runtime.newstack reads the stackguard0 field of the goroutine that was running (thisg.m.curg) to decide whether a preemption was requested. If so, it calls gopreempt_m(gp):
```go
func gopreempt_m(gp *g) {
	if trace.enabled {
		traceGoPreempt()
	}
	goschedImpl(gp)
}
```
gopreempt_m, just like gosched_m from the Gosched discussion above, calls goschedImpl, which detaches gp from the current M and puts it on the global run queue to wait to be scheduled.
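Since both the voluntary path (gosched_m) and the preemption path (gopreempt_m) end in goschedImpl, a toy model of that function helps. This is a hypothetical sketch, not runtime code: it has a single M/P and only a global FIFO run queue (the real schedule() also consults the P's local run queue and much more), but it mirrors the status change, dropg and globrunqput steps.

```go
package main

import "fmt"

// Toy model of goschedImpl (hypothetical, not runtime code).

type gStatus int

const (
	gRunnable gStatus = iota
	gRunning
)

type G struct {
	id     int
	status gStatus
}

// Sched models one M/P with only a global FIFO run queue.
type Sched struct {
	cur    *G   // G currently running on the M (curg)
	global []*G // global run queue
}

// goschedImpl mirrors the runtime steps: check status, mark the G runnable,
// detach it from the M, queue it globally, then schedule the next G.
func (s *Sched) goschedImpl() {
	gp := s.cur
	if gp == nil || gp.status != gRunning {
		panic("bad g status")
	}
	gp.status = gRunnable           // casgstatus(gp, _Grunning, _Grunnable)
	s.cur = nil                     // dropg()
	s.global = append(s.global, gp) // globrunqput(gp)
	s.schedule()
}

// schedule picks the oldest G from the global queue and runs it.
func (s *Sched) schedule() {
	if len(s.global) == 0 {
		return
	}
	next := s.global[0]
	s.global = s.global[1:]
	next.status = gRunning
	s.cur = next
}

func main() {
	g1 := &G{id: 1, status: gRunning}
	g2 := &G{id: 2, status: gRunnable}
	s := &Sched{cur: g1, global: []*G{g2}}

	s.goschedImpl() // g1 yields (Gosched) or is preempted (gopreempt_m)
	fmt.Println("running:", s.cur.id, "queued:", s.global[0].id)
}
```

When a G is alone in the model it is picked again right away, which matches the Gosched comment that execution "resumes automatically".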
So whenever a goroutine enters the stack-growth path, it may give up its execution rights and be rescheduled. When, then, is this path triggered for a goroutine that does not actually need more stack?
The runtime sets stackguard0 to stackPreempt in quite a few places, but the one that matches our scenario is retake, which the background monitor thread sysmon calls in its loop to take Ps back from goroutines that are stuck in system calls or have been running for too long:
```go
func sysmon() {
	...
	for {
		...
		// retake P's blocked in syscalls
		// and preempt long running G's
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		...
	}
}
```
```go
func retake(now int64) uint32 {
	...
	for i := 0; i < len(allp); i++ {
		...
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now { // forcePreemptNS = 10ms
				preemptone(_p_) // the stack-growth (preemption) flag is set here
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		...
	}
	unlock(&allpLock)
	return uint32(n)
}
```
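The flag itself is set in preemptone. stackPreempt is larger than any valid stack pointer, so once stackguard0 holds it, the prologue check SP <= stackguard0 is bound to succeed at the goroutine's next function call, which sends it into morestack and newstack: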
```go
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	gp.preempt = true

	// Every call in a goroutine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt // set the stack-growth (preemption) flag

	// Request an async preemption of this P.
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}

	return true
}
```
Putting this together, cooperative preemption of goroutines works as follows:

1. The background monitor thread sysmon sets the stack-growth flag on any goroutine that has been running for too long (≥10ms, forcePreemptNS);
2. Whenever the goroutine enters a function prologue, it checks this flag first;
3. If the check sends it down the stack-growth path, the runtime takes the goroutine's execution rights away there, which achieves preemptive scheduling.
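The time-based part of step 1 can be sketched as follows. This is a hypothetical, simplified model of the schedtick/schedwhen bookkeeping in retake (shouldPreempt is an invented name): sysmon remembers the last schedtick it saw for a P and when it first saw it, and if the tick has not changed for forcePreemptNS, the P has not started a new scheduling round in that time, so the running goroutine is flagged.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical model of the long-running check in retake; not runtime code.

const forcePreemptNS = int64(10 * time.Millisecond)

// sysmontick mirrors the per-P bookkeeping sysmon keeps.
type sysmontick struct {
	schedtick uint32 // last schedtick observed
	schedwhen int64  // when that schedtick was first observed (ns)
}

// shouldPreempt updates pd with the P's current schedtick at time now (ns)
// and reports whether the P has kept the same tick for >= forcePreemptNS.
func shouldPreempt(pd *sysmontick, schedtick uint32, now int64) bool {
	if pd.schedtick != schedtick {
		// The P scheduled something since the last check: restart the clock.
		pd.schedtick = schedtick
		pd.schedwhen = now
		return false
	}
	return pd.schedwhen+forcePreemptNS <= now
}

func main() {
	ms := int64(time.Millisecond)
	pd := &sysmontick{}
	fmt.Println(shouldPreempt(pd, 7, 1*ms))  // false: new tick observed
	fmt.Println(shouldPreempt(pd, 7, 5*ms))  // false: only 4ms so far
	fmt.Println(shouldPreempt(pd, 7, 11*ms)) // true: 10ms without rescheduling
	fmt.Println(shouldPreempt(pd, 8, 12*ms)) // false: the P rescheduled
}
```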
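This analysis reveals a fatal flaw in the mechanism: preemption only takes effect when the goroutine reaches a function prologue, so it cannot take the CPU away from a goroutine like the one below. Before Go 1.14, the following code never prints its last line, "I am main goroutine!":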
```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

var once = sync.Once{}

func main() {
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
	go func() {
		for {
			once.Do(func() {
				fmt.Println("I am go routine 1!")
			})
		}
	}()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("I am main goroutine!")
}
```
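The for loop in this goroutine never ends and never passes through a stack-growth check (in an optimized build the compiler can inline the fast path of once.Do, leaving no function call in the loop at all), so the goroutine never gives up its execution rights.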
To solve this, Go 1.14 introduced signal-based asynchronous preemption. Note the following part of the preemptone code shown earlier:
```go
if preemptMSupported && debug.asyncpreemptoff == 0 {
	_p_.preempt = true
	preemptM(mp)
}
```
preemptM sends the _SIGURG signal (sigPreempt) to the thread (M) that should be preempted:
```go
const sigPreempt = _SIGURG

func preemptM(mp *m) {
	// On Darwin, don't try to preempt threads during exec.
	// Issue #41702.
	if GOOS == "darwin" || GOOS == "ios" {
		execLock.rlock()
	}

	if atomic.Cas(&mp.signalPending, 0, 1) {
		if GOOS == "darwin" || GOOS == "ios" {
			atomic.Xadd(&pendingPreemptSignals, 1)
		}

		// If multiple threads are preempting the same M, it may send many
		// signals to the same M such that it hardly make progress, causing
		// live-lock problem. Apparently this could happen on darwin. See
		// issue #37741.
		// Only send a signal if there isn't already one pending.
		signalM(mp, sigPreempt)
	}

	if GOOS == "darwin" || GOOS == "ios" {
		execLock.runlock()
	}
}
```
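The atomic.Cas on signalPending is what keeps many preemption requests against the same M from turning into a flood of signals. Below is a minimal sketch of that de-duplication using sync/atomic; mModel, sendPreempt, ack and burst are hypothetical names, the counter stands in for signalM, and ack plays the role of the atomic.Store(&gp.m.signalPending, 0) that the signal handler performs when it acknowledges the preemption.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Hypothetical model of preemptM's de-duplication; not runtime code.
type mModel struct {
	signalPending uint32 // 1 while a preemption signal is in flight
	signalsSent   int32  // stands in for calls to signalM(mp, sigPreempt)
}

// sendPreempt only "sends" when signalPending flips from 0 to 1.
func sendPreempt(mp *mModel) {
	if atomic.CompareAndSwapUint32(&mp.signalPending, 0, 1) {
		atomic.AddInt32(&mp.signalsSent, 1)
	}
}

// ack models the handler acknowledging the preemption.
func ack(mp *mModel) {
	atomic.StoreUint32(&mp.signalPending, 0)
}

// burst issues n concurrent preemption requests against the same M.
func burst(mp *mModel, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sendPreempt(mp)
		}()
	}
	wg.Wait()
}

func main() {
	mp := &mModel{}
	burst(mp, 100)
	fmt.Println("signals after 100 requests:", atomic.LoadInt32(&mp.signalsSent))
	ack(mp)
	burst(mp, 100)
	fmt.Println("signals after ack and 100 more:", atomic.LoadInt32(&mp.signalsSent))
}
```

However many requests race, only one "signal" goes out until the pending one is acknowledged.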
To see how the signal is handled, we have to go back to the very beginning. On the mstart call chain of the first thread, m0, the runtime calls mstartm0, which in turn calls initsig:
```go
func initsig(preinit bool) {
	...
	for i := uint32(0); i < _NSIG; i++ {
		...
		handlingSig[i] = 1
		setsig(i, abi.FuncPCABIInternal(sighandler))
	}
}
```
This installs sighandler as the signal handler:
```go
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	...
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Might be a preemption signal.
		doSigPreempt(gp, c)
		// Even if this was definitely a preemption signal, it
		// may have been coalesced with another signal, so we
		// still let it through to the application.
	}
	...
}
```
When a sigPreempt signal arrives, sighandler passes it to doSigPreempt, which handles it as follows:
```go
func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check if this G wants to be preempted and is safe to
	// preempt.
	if wantAsyncPreempt(gp) {
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the PC and inject a call to asyncPreempt.
			ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc) // inject the preemption call
		}
	}

	// Acknowledge the preemption.
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)

	if GOOS == "darwin" || GOOS == "ios" {
		atomic.Xadd(&pendingPreemptSignals, -1)
	}
}
```
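doSigPreempt rewrites the signal context so that, when the handler returns, the goroutine resumes in asyncPreempt, which saves the registers and calls asyncPreempt2. The full chain is doSigPreempt → asyncPreempt → asyncPreempt2: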
```go
func asyncPreempt2() {
	gp := getg()
	gp.asyncSafePoint = true
	if gp.preemptStop {
		mcall(preemptPark)
	} else {
		mcall(gopreempt_m)
	}
	gp.asyncSafePoint = false
}
```
And we are back at the familiar gopreempt_m function, so there is no need to go over it again.
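To summarize signal-based preemption:

1. M1 (the thread requesting the preemption) sends the _SIGURG signal;
2. M2 receives the signal and handles it in the signal handler;
3. M2 modifies the execution context and resumes at the modified location (asyncPreempt);
4. The goroutine re-enters the scheduling loop, and other goroutines get scheduled.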
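You can observe signal-based preemption from user code. The sketch below (mainGetsCPU is a hypothetical helper) recreates the article's scenario on a single P: a goroutine spins in a loop with no function calls while the caller sleeps. It assumes Go 1.14 or later on a platform that supports asynchronous preemption (for example linux/amd64), with preemption not disabled via GODEBUG=asyncpreemptoff=1. Under those assumptions the spinner is preempted and the caller gets the CPU back; without asynchronous preemption the call would never return.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// mainGetsCPU (hypothetical helper) runs a call-free spin loop on the only P
// while the caller sleeps for sleepMS milliseconds. The caller can only get
// past time.Sleep if the spinning goroutine is preempted, so returning at
// all shows that preemption happened. It reports the elapsed time.
func mainGetsCPU(sleepMS int) time.Duration {
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)

	var stop int32
	go func() {
		// No explicit function calls here; sync/atomic loads are normally
		// compiled to inline instructions, so no prologue check runs.
		for atomic.LoadInt32(&stop) == 0 {
		}
	}()

	start := time.Now()
	time.Sleep(time.Duration(sleepMS) * time.Millisecond)
	atomic.StoreInt32(&stop, 1) // let the spinner exit
	return time.Since(start)
}

func main() {
	fmt.Println("main goroutine resumed after", mainGetsCPU(10))
}
```

The spinner gets a stop flag so that it does not keep burning the single P after the demonstration.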