您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)Go語言中怎么調(diào)度循環(huán)源碼,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
提到"調(diào)度",我們首先想到的就是操作系統(tǒng)對進程、線程的調(diào)度。操作系統(tǒng)調(diào)度器會將系統(tǒng)中的多個線程按照一定算法調(diào)度到物理CPU上去運行。雖然線程比較輕量,但是在調(diào)度時也有比較大的額外開銷。每個線程會都占用 1M 以上的內(nèi)存空間,線程切換和恢復(fù)寄存器中的內(nèi)容也需要向系統(tǒng)申請資源。
Go 語言的 Goroutine 可以看作對 thread 加的一層抽象,它更輕量級,不僅減少了上下文切換帶來的額外開銷,Goroutine 占用的資源也會更少。如創(chuàng)建一個 Goroutine 的棧內(nèi)存消耗為 2 KB,而 thread 占用 1M 以上空間;thread 創(chuàng)建和銷毀是內(nèi)核級的,所以都會有巨大的消耗,而 Goroutine 由 Go runtime 負責(zé)管理的,創(chuàng)建和銷毀的消耗非常??;Goroutine 的切換成本也比 thread 要小得多。
Go 的調(diào)度器使用三個結(jié)構(gòu)體來實現(xiàn) Goroutine 的調(diào)度:G M P。
G:代表一個 Goroutine,每個 Goroutine 都有自己獨立的棧存放當(dāng)前的運行內(nèi)存及狀態(tài)??梢园岩粋€ G 當(dāng)做一個任務(wù),當(dāng) Goroutine 被調(diào)離 CPU 時,調(diào)度器代碼負責(zé)把 CPU 寄存器的值保存在 G 對象的成員變量之中,當(dāng) Goroutine 被調(diào)度起來運行時,調(diào)度器代碼又負責(zé)把 G 對象的成員變量所保存的寄存器的值恢復(fù)到 CPU 的寄存器。
M:表示內(nèi)核線程,它本身就與一個內(nèi)核線程進行綁定,每個工作線程都有唯一的一個 M 結(jié)構(gòu)體的實例對象與之對應(yīng)。M 結(jié)構(gòu)體對象除了記錄著工作線程的諸如棧的起止位置、當(dāng)前正在執(zhí)行的Goroutine 以及是否空閑等等狀態(tài)信息之外,還通過指針維持著與 P 結(jié)構(gòu)體的實例對象之間的綁定關(guān)系。
P:代表一個虛擬的 Processor 處理器,它維護一個局部 Goroutine 可運行 G 隊列,工作線程優(yōu)先使用自己的局部運行隊列,只有必要時才會去訪問全局運行隊列,這大大減少了鎖沖突,提高了工作線程的并發(fā)性。每個 G 要想真正運行起來,首先需要被分配一個 P。
除了上面三個結(jié)構(gòu)體以外,還有一個存放所有Runnable 可運行 Goroutine 的容器 schedt。每個Go程序中schedt結(jié)構(gòu)體只有一個實例對象,在代碼中是一個共享的全局變量,每個工作線程都可以訪問它以及它所擁有的 Goroutine 運行隊列。
下面是G、P、M以及schedt中的全局隊列的關(guān)系:
從上圖可以看出,每個 m 都綁定了一個 P,每個 P 都有一個私有的本地 Goroutine 隊列,m對應(yīng)的線程從本地和全局 Goroutine 隊列中獲取 Goroutine 并運行,綠色的 G 代表正在運行的 G。
在默認情況下,運行時會將 GOMAXPROCS
設(shè)置成當(dāng)前機器的核數(shù),假設(shè)一個四核機器會創(chuàng)建四個活躍的操作系統(tǒng)線程,每一個線程都對應(yīng)一個運行時中的 M。
G M P 結(jié)構(gòu)體定義于src/runtime/runtime2.go
type g struct { // 當(dāng)前 Goroutine 的棧內(nèi)存范圍 [stack.lo, stack.hi) stack stack // 用于調(diào)度器搶占式調(diào)度 stackguard0 uintptr _panic *_panic _defer *_defer // 當(dāng)前 Goroutine 占用的線程 m *m // 存儲 Goroutine 的調(diào)度相關(guān)的數(shù)據(jù) sched gobuf // Goroutine 的狀態(tài) atomicstatus uint32 // 搶占信號 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt // 搶占時將狀態(tài)修改成 `_Gpreempted` preemptStop bool // transition to _Gpreempted on preemption; otherwise, just deschedule // 在同步安全點收縮棧 preemptShrink bool // shrink stack at synchronous safe point ... }
下面看看gobuf結(jié)構(gòu)體,主要在調(diào)度器保存或者恢復(fù)上下文的時候用到:
type gobuf struct { // 棧指針 sp uintptr // 程序計數(shù)器 pc uintptr // gobuf對應(yīng)的Goroutine g guintptr // 系統(tǒng)調(diào)用的返回值 ret sys.Uintreg ... }
在執(zhí)行過程中,G可能處于以下幾種狀態(tài):
const ( // 剛剛被分配并且還沒有被初始化 _Gidle = iota // 0 // 沒有執(zhí)行代碼,沒有棧的所有權(quán),存儲在運行隊列中 _Grunnable // 1 // 可以執(zhí)行代碼,擁有棧的所有權(quán),被賦予了內(nèi)核線程 M 和處理器 P _Grunning // 2 // 正在執(zhí)行系統(tǒng)調(diào)用,擁有棧的所有權(quán),沒有執(zhí)行用戶代碼, // 被賦予了內(nèi)核線程 M 但是不在運行隊列上 _Gsyscall // 3 // 由于運行時而被阻塞,沒有執(zhí)行用戶代碼并且不在運行隊列上, // 但是可能存在于 Channel 的等待隊列上 _Gwaiting // 4 // 表示當(dāng)前goroutine沒有被使用,沒有執(zhí)行代碼,可能有分配的棧 _Gdead // 6 // 棧正在被拷貝,沒有執(zhí)行代碼,不在運行隊列上 _Gcopystack // 8 // 由于搶占而被阻塞,沒有執(zhí)行用戶代碼并且不在運行隊列上,等待喚醒 _Gpreempted // 9 // GC 正在掃描??臻g,沒有執(zhí)行代碼,可以與其他狀態(tài)同時存在 _Gscan = 0x1000 ... )
上面的狀態(tài)看起來很多,但是實際上只需要關(guān)注下面幾種就好了:
等待中:_ Gwaiting、_Gsyscall 和 _Gpreempted,這幾個狀態(tài)表示G沒有在執(zhí)行;
可運行:_Grunnable,表示G已經(jīng)準備就緒,可以在線程運行;
運行中:_Grunning,表示G正在運行;
Go 語言并發(fā)模型中的 M 是操作系統(tǒng)線程,最多只會有 GOMAXPROCS
個活躍線程能夠正常運行。
type m struct { // 持有調(diào)度棧的 Goroutine g0 *g // 處理 signal 的 G gsignal *g // 線程本地存儲 thread-local tls [6]uintptr // thread-local storage (for x86 extern register) // 當(dāng)前運行的G curg *g // current running goroutine caughtsig guintptr // goroutine running during fatal signal // 正在運行代碼的P p puintptr // attached p for executing go code (nil if not executing go code) nextp puintptr // 之前使用的P oldp puintptr ... }
調(diào)度器中的處理器 P 是線程 M 和 G 的中間層,用于調(diào)度 G 在 M 上執(zhí)行。
type p struct { id int32 // p 的狀態(tài) status uint32 // 調(diào)度器調(diào)用會+1 schedtick uint32 // incremented on every scheduler call // 系統(tǒng)調(diào)用會+1 syscalltick uint32 // incremented on every system call // 對應(yīng)關(guān)聯(lián)的 M m muintptr mcache *mcache pcache pageCache // defer 結(jié)構(gòu)池 deferpool [5][]*_defer deferpoolbuf [5][32]*_defer // 可運行的 Goroutine 隊列,可無鎖訪問 runqhead uint32 runqtail uint32 runq [256]guintptr // 緩存可立即執(zhí)行的 G runnext guintptr // 可用的 G 列表,G 狀態(tài)等于 Gdead gFree struct { gList n int32 } ... }
下面看看P的幾個狀態(tài):
const ( // 表示P沒有運行用戶代碼或者調(diào)度器 _Pidle = iota // 被線程 M 持有,并且正在執(zhí)行用戶代碼或者調(diào)度器 _Prunning // 沒有執(zhí)行用戶代碼,當(dāng)前線程陷入系統(tǒng)調(diào)用 _Psyscall // 被線程 M 持有,當(dāng)前處理器由于垃圾回收 STW 被停止 _Pgcstop // 當(dāng)前處理器已經(jīng)不被使用 _Pdead )
sched 我們在上面也提到了,主要存放了調(diào)度器持有的全局資源,如空閑的 P 鏈表、 G 的全局隊列等。
type schedt struct { ... lock mutex // 空閑的 M 列表 midle muintptr // 空閑的 M 列表數(shù)量 nmidle int32 // 下一個被創(chuàng)建的 M 的 id mnext int64 // 能擁有的最大數(shù)量的 M maxmcount int32 // 空閑 p 鏈表 pidle puintptr // idle p's // 空閑 p 數(shù)量 npidle uint32 // 處于 spinning 狀態(tài)的 M 的數(shù)量 nmspinning uint32 // 全局 runnable G 隊列 runq gQueue runqsize int32 // 有效 dead G 的全局緩存. gFree struct { lock mutex stack gList // Gs with stacks noStack gList // Gs without stacks n int32 } // sudog 結(jié)構(gòu)的集中緩存 sudoglock mutex sudogcache *sudog // defer 結(jié)構(gòu)的池 deferlock mutex deferpool [5]*_defer ... }
這里還是借助dlv來進行調(diào)試。有關(guān) dlv 如何斷點匯編的內(nèi)容我在這一篇:https://www.luozhiyun.com/archives/434 《詳解Go中內(nèi)存分配源碼實現(xiàn)》已經(jīng)有很詳細的介紹了,感興趣的可以去看看。需要注意的是這里有個坑,下面的例子是在Linux中進行的。
首先我們寫一個非常簡單的例子:
package main import "fmt" func main() { fmt.Println("hello world") }
然后進行構(gòu)建:
go build main.go dlv exec ./main
開打程序后按步驟輸入下面的命令:
(dlv) r Process restarted with PID 33191 (dlv) list > _rt0_amd64_linux() /usr/local/go/src/runtime/rt0_linux_amd64.s:8 (PC: 0x4648c0) Warning: debugging optimized function Warning: listing may not match stale executable 3: // license that can be found in the LICENSE file. 4: 5: #include "textflag.h" 6: 7: TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8 => 8: JMP _rt0_amd64(SB) 9: 10: TEXT _rt0_amd64_linux_lib(SB),NOSPLIT,$0 11: JMP _rt0_amd64_lib(SB) (dlv) si > _rt0_amd64() /usr/local/go/src/runtime/asm_amd64.s:15 (PC: 0x4613e0) Warning: debugging optimized function Warning: listing may not match stale executable 10: // _rt0_amd64 is common startup code for most amd64 systems when using 11: // internal linking. This is the entry point for the program from the 12: // kernel for an ordinary -buildmode=exe program. The stack holds the 13: // number of arguments and the C-style argv. 14: TEXT _rt0_amd64(SB),NOSPLIT,$-8 => 15: MOVQ 0(SP), DI // argc 16: LEAQ 8(SP), SI // argv 17: JMP runtime·rt0_go(SB) 18: 19: // main is common startup code for most amd64 systems when using 20: // external linking. The C startup code will call the symbol "main" (dlv)
通過上面的斷點可以知道在linux amd64系統(tǒng)的啟動函數(shù)是在asm_amd64.s的runtime·rt0_go函數(shù)中。當(dāng)然,不同的平臺有不同的程序入口,感興趣的同學(xué)可以自行去了解。
下面我們看看runtime·rt0_go
:
TEXT runtime·rt0_go(SB),NOSPLIT,$0 ... // 初始化執(zhí)行文件的絕對路徑 CALL runtime·args(SB) // 初始化 CPU 個數(shù)和內(nèi)存頁大小 CALL runtime·osinit(SB) // 調(diào)度器初始化 CALL runtime·schedinit(SB) // 創(chuàng)建一個新的 goroutine 來啟動程序 MOVQ $runtime·mainPC(SB), AX // entry // 新建一個 goroutine,該 goroutine 綁定 runtime.main CALL runtime·newproc(SB) // 啟動M,開始調(diào)度goroutine CALL runtime·mstart(SB) ...
上面的CALL方法中:
schedinit進行各種運行時組件初始化工作,這包括我們的調(diào)度器與內(nèi)存分配器、回收器的初始化;
newproc負責(zé)根據(jù)主 G 入口地址創(chuàng)建可被運行時調(diào)度的執(zhí)行單元;
mstart開始啟動調(diào)度器的調(diào)度循環(huán);
func schedinit() { ... _g_ := getg() ... // 最大線程數(shù)10000 sched.maxmcount = 10000 // M0 初始化 mcommoninit(_g_.m, -1) ... // 垃圾回收器初始化 gcinit() sched.lastpoll = uint64(nanotime()) // 通過 CPU 核心數(shù)和 GOMAXPROCS 環(huán)境變量確定 P 的數(shù)量 procs := ncpu if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 { procs = n } // P 初始化 if procresize(procs) != nil { throw("unknown runnable goroutine during bootstrap") } ... }
schedinit函數(shù)會將 maxmcount 設(shè)置成10000,這也就是一個 Go 語言程序能夠創(chuàng)建的最大線程數(shù)。然后調(diào)用 mcommoninit 對 M0 進行初始化,通過 CPU 核心數(shù)和 GOMAXPROCS 環(huán)境變量確定 P 的數(shù)量之后就會調(diào)用 procresize 函數(shù)對 P 進行初始化。
func mcommoninit(mp *m, id int64) { _g_ := getg() ... lock(&sched.lock) // 如果傳入id小于0,那么id則從mReserveID獲取,初次從mReserveID獲取id為0 if id >= 0 { mp.id = id } else { mp.id = mReserveID() } //random初始化,用于竊取 G mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed)) mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed)) if mp.fastrand[0]|mp.fastrand[1] == 0 { mp.fastrand[1] = 1 } // 創(chuàng)建用于信號處理的gsignal,只是簡單的從堆上分配一個g結(jié)構(gòu)體對象,然后把棧設(shè)置好就返回了 mpreinit(mp) if mp.gsignal != nil { mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard } // 把 M 掛入全局鏈表allm之中 mp.alllink = allm ... }
這里傳入的 id 是-1,初次調(diào)用會將 id 設(shè)置為 0,這里并未對m0做什么關(guān)于調(diào)度相關(guān)的初始化,所以可以簡單的認為這個函數(shù)只是把m0放入全局鏈表allm之中就返回了。
runtime.procresize
var allp []*p func procresize(nprocs int32) *p { // 獲取先前的 P 個數(shù) old := gomaxprocs // 更新統(tǒng)計信息 now := nanotime() if sched.procresizetime != 0 { sched.totaltime += int64(old) * (now - sched.procresizetime) } sched.procresizetime = now // 根據(jù) runtime.MAXGOPROCS 調(diào)整 p 的數(shù)量,因為 runtime.MAXGOPROCS 用戶可以自行設(shè)定 if nprocs > int32(len(allp)) { lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { nallp := make([]*p, nprocs) copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // 初始化新的 P for i := old; i < nprocs; i++ { pp := allp[i] // 為空,則申請新的 P 對象 if pp == nil { pp = new(p) } pp.init(i) atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } _g_ := getg() // P 不為空,并且 id 小于 nprocs ,那么可以繼續(xù)使用當(dāng)前 P if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // continue to use the current P _g_.m.p.ptr().status = _Prunning _g_.m.p.ptr().mcache.prepareForSweep() } else { // 釋放當(dāng)前 P,因為已失效 if _g_.m.p != 0 { _g_.m.p.ptr().m = 0 } _g_.m.p = 0 p := allp[0] p.m = 0 p.status = _Pidle // P0 綁定到當(dāng)前的 M0 acquirep(p) } // 從未使用的 P 釋放資源 for i := nprocs; i < old; i++ { p := allp[i] p.destroy() // 不能釋放 p 本身,因為他可能在 m 進入系統(tǒng)調(diào)用時被引用 } // 釋放完 P 之后重置allp的長度 if int32(len(allp)) != nprocs { lock(&allpLock) allp = allp[:nprocs] unlock(&allpLock) } var runnablePs *p // 將沒有本地任務(wù)的 P 放到空閑鏈表中 for i := nprocs - 1; i >= 0; i-- { p := allp[i] // 當(dāng)前正在使用的 P 略過 if _g_.m.p.ptr() == p { continue } // 設(shè)置狀態(tài)為 _Pidle p.status = _Pidle // P 的任務(wù)列表是否為空 if runqempty(p) { // 放入到空閑列表中 pidleput(p) } else { // 獲取空閑 M 綁定到 P 上 p.m.set(mget()) // p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) return runnablePs }
procresize方法的執(zhí)行過程如下:
allp 是全局變量 P 的資源池,如果 allp 的切片中的處理器數(shù)量少于期望數(shù)量,會對切片進行擴容;
擴容的時候會使用 new 申請一個新的 P ,然后使用 init 初始化,需要注意的是初始化的 P 的 id 就是傳入的 i 的值,狀態(tài)為 _Pgcstop;
然后通過 _g_.m.p
獲取 M0,如果 M0 已與有效的 P 綁定上,則將 被綁定的 P 的狀態(tài)修改為 _Prunning。否則獲取 allp[0]
作為 P0 調(diào)用 runtime.acquirep
與 M0 進行綁定;
超過處理器個數(shù)的 P 通過p.destroy
釋放資源,p.destroy
會將與 P 相關(guān)的資源釋放,并將 P 狀態(tài)設(shè)置為 _Pdead;
通過截斷改變?nèi)肿兞?allp 的長度保證與期望處理器數(shù)量相等;
遍歷 allp 檢查 P 的是否處于空閑狀態(tài),是的話放入到空閑列表中;
P.init
func (pp *p) init(id int32) { // 設(shè)置id pp.id = id // 設(shè)置狀態(tài)為 _Pgcstop pp.status = _Pgcstop // 與 sudog 相關(guān) pp.sudogcache = pp.sudogbuf[:0] for i := range pp.deferpool { pp.deferpool[i] = pp.deferpoolbuf[i][:0] } pp.wbBuf.reset() // mcache 初始化 if pp.mcache == nil { if id == 0 { if mcache0 == nil { throw("missing mcache?") } pp.mcache = mcache0 } else { pp.mcache = allocmcache() } } ... lockInit(&pp.timersLock, lockRankTimers) }
這里會初始化一些 P 的字段值,如設(shè)置 id、status、sudogcache、mcache、lock相關(guān) 。
初始化 sudogcache 這個字段存的是 sudog 的集合與 Channel 相關(guān),可以看這里:多圖詳解Go中的Channel源碼 https://www.luozhiyun.com/archives/427。
每個 P 中會保存相應(yīng)的 mcache ,能快速的進行分配微對象和小對象的分配,具體的可以看這里:詳解Go中內(nèi)存分配源碼實現(xiàn) https://www.luozhiyun.com/archives/434。
下面再來看看 runtime.acquirep
是如何將 P 與 M 綁定的:
runtime.acquirep
func acquirep(_p_ *p) { wirep(_p_) ... } func wirep(_p_ *p) { _g_ := getg() ... // 將 P 與 M 相互綁定 _g_.m.p.set(_p_) _p_.m.set(_g_.m) // 設(shè)置 P 狀態(tài)為 _Prunning _p_.status = _Prunning }
這個方法十分簡單,就不解釋了。下面再看看 runtime.pidleput
將 P 放入空閑列表:
func pidleput(_p_ *p) { // 如果 P 運行隊列不為空,那么不能放入空閑列表 if !runqempty(_p_) { throw("pidleput: P has non-empty run queue") } // 將 P 與 pidle 列表關(guān)聯(lián) _p_.link = sched.pidle sched.pidle.set(_p_) atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic }
從匯編可以知道執(zhí)行完runtime·schedinit
后就會執(zhí)行 runtime.newproc
是創(chuàng)建G的入口。
runtime.newproc
func newproc(siz int32, fn *funcval) { argp := add(unsafe.Pointer(&fn), sys.PtrSize) // 獲取當(dāng)前的 G gp := getg() // 獲取調(diào)用者的程序計數(shù)器 PC pc := getcallerpc() systemstack(func() { // 獲取新的 G 結(jié)構(gòu)體 newg := newproc1(fn, argp, siz, gp, pc) _p_ := getg().m.p.ptr() // 將 G 加入到 P 的運行隊列 runqput(_p_, newg, true) // mainStarted 為 True 表示主M已經(jīng)啟動 if mainStarted { // 喚醒新的 P 執(zhí)行 G wakep() } }) }
runtime.newproc
會獲取 當(dāng)前 G 以及調(diào)用方的程序計數(shù)器,然后調(diào)用 newproc1 獲取新的 G 結(jié)構(gòu)體;然后將 G 放入到 P 的 runnext 字段中。
runtime.newproc1
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g { _g_ := getg() if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } // 加鎖,禁止 G 的 M 被搶占 acquirem() // disable preemption because it can be holding p in a local var siz := narg siz = (siz + 7) &^ 7 _p_ := _g_.m.p.ptr() // 從 P 的空閑列表 gFree 查找空閑 G newg := gfget(_p_) if newg == nil { // 創(chuàng)建一個棧大小為 2K 大小的 G newg = malg(_StackMin) // CAS 改變 G 狀態(tài)為 _Gdead casgstatus(newg, _Gidle, _Gdead) // 將 G 加入到全局 allgs 列表中 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. } ... // 計算運行空間大小 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign sp := newg.stack.hi - totalSize spArg := sp ... if narg > 0 { // 從 argp 參數(shù)開始的位置,復(fù)制 narg 個字節(jié)到 spArg(參數(shù)拷貝) memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) ... } // 清理、創(chuàng)建并初始化的 G memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn if _g_.m.curg != nil { newg.labels = _g_.m.curg.labels } if isSystemGoroutine(newg, false) { atomic.Xadd(&sched.ngsys, +1) } // 將 G 狀態(tài)CAS為 _Grunnable 狀態(tài) casgstatus(newg, _Gdead, _Grunnable) newg.goid = int64(_p_.goidcache) _p_.goidcache++ ... // 釋放鎖,對應(yīng)上面 acquirem releasem(_g_.m) return newg }
newproc1函數(shù)比較長,下面總結(jié)一下主要做了哪幾件事:
從 P 的空閑列表 gFree 查找空閑 G;
如果獲取不到 G ,那么調(diào)用 malg 創(chuàng)建創(chuàng)建一個新的 G ,需要注意的是 _StackMin 為2048,表示創(chuàng)建的 G 的棧上內(nèi)存占用為2K。然后 CAS 改變 G 狀態(tài)為 _Gdead,并加入到全局 allgs 列表中;
根據(jù)要執(zhí)行函數(shù)的入口地址和參數(shù),初始化執(zhí)行棧的 SP 和參數(shù)的入棧位置,調(diào)用 memmove 進行參數(shù)拷貝;
清理、創(chuàng)建并初始化的 G,將 G 狀態(tài)CAS為 _Grunnable 狀態(tài),返回;
下面看看 runtime.gfget
是如何查找 G:
runtime.gfget
func gfget(_p_ *p) *g { retry: // 如果 P 的空閑列表 gFree 為空,sched 的的空閑列表 gFree 不為空 if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) { lock(&sched.gFree.lock) // 從sched 的 gFree 列表中移動 32 個到 P 的 gFree 中 for _p_.gFree.n < 32 { gp := sched.gFree.stack.pop() if gp == nil { gp = sched.gFree.noStack.pop() if gp == nil { break } } sched.gFree.n-- _p_.gFree.push(gp) _p_.gFree.n++ } unlock(&sched.gFree.lock) goto retry } // 此時如果 gFree 列表還是為空,返回空 gp := _p_.gFree.pop() if gp == nil { return nil } ... return gp }
當(dāng) P 的空閑列表 gFree 為空時會從 sched 持有的空閑列表 gFree 轉(zhuǎn)移32個 G 到當(dāng)前的 P 的空閑列表上;
然后從 P 的 gFree 列表頭返回一個 G;
當(dāng) newproc 運行完 newproc1 后會調(diào)用 runtime.runqput
將 G 放入到運行列表中:
runtime.runqput
func runqput(_p_ *p, gp *g, next bool) { if randomizeScheduler && next && fastrand()%2 == 0 { next = false } if next { retryNext: // 將 G 放入到 runnext 中作為下一個處理器執(zhí)行的任務(wù) oldnext := _p_.runnext if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return } // 將原來 runnext 的 G 放入到運行隊列中 gp = oldnext.ptr() } retry: h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail // 放入到 P 本地運行隊列中 if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.StoreRel(&_p_.runqtail, t+1) return } // P 本地隊列放不下了,放入到全局的運行隊列中 if runqputslow(_p_, gp, h, t) { return } goto retry }
runtime.runqput
會根據(jù) next 來判斷是否要將 G 放入到 runnext 中;
next 為 false 的時候會將傳入的 G 嘗試放入到本地隊列中,本地隊列時一個大小為256的環(huán)形鏈表,如果放不下了則調(diào)用 runqputslow
函數(shù)將 G 放入到全局隊列的 runq
中。
我們繼續(xù)回到runtime·rt0_go
中,在初始化工作完成后,會調(diào)用runtime·mstart
開始調(diào)度 G
TEXT runtime·rt0_go(SB),NOSPLIT,$0 ... // 初始化執(zhí)行文件的絕對路徑 CALL runtime·args(SB) // 初始化 CPU 個數(shù)和內(nèi)存頁大小 CALL runtime·osinit(SB) // 調(diào)度器初始化 CALL runtime·schedinit(SB) // 創(chuàng)建一個新的 goroutine 來啟動程序 MOVQ $runtime·mainPC(SB), AX // entry // 新建一個 goroutine,該 goroutine 綁定 runtime.main CALL runtime·newproc(SB) // 啟動M,開始調(diào)度goroutine CALL runtime·mstart(SB) ...
runtime·mstart
會調(diào)用到runtime·mstart1
會初始化 M0 并調(diào)用runtime.schedule
進入調(diào)度循環(huán)。
func mstart1() { _g_ := getg() if _g_ != _g_.m.g0 { throw("bad runtime·mstart") } // 一旦調(diào)用 schedule 就不會返回,所以需要保存一下棧幀 save(getcallerpc(), getcallersp()) asminit() minit() // 設(shè)置信號 handler if _g_.m == &m0 { mstartm0() } // 執(zhí)行啟動函數(shù) if fn := _g_.m.mstartfn; fn != nil { fn() } // 如果當(dāng)前 m 并非 m0,則要求綁定 p if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } // 開始調(diào)度 schedule() }
mstart1保存調(diào)度信息后,會調(diào)用schedule進入調(diào)度循環(huán),尋找一個可執(zhí)行的 G 并執(zhí)行。下面看看schedule執(zhí)行函數(shù)。
func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } ... top: pp := _g_.m.p.ptr() pp.preempt = false // GC 等待 if sched.gcwaiting != 0 { gcstopm() goto top } // 不等于0,說明在安全點 if pp.runSafePointFn != 0 { runSafePointFn() } // 如果在 spinning ,那么運行隊列應(yīng)該為空, if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) { throw("schedule: spinning with local work") } // 運行 P 上準備就緒的 Timer checkTimers(pp, 0) var gp *g var inheritTime bool ... if gp == nil { // 為了公平,每調(diào)用 schedule 函數(shù) 61 次就要從全局可運行 G 隊列中獲取 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) // 從全局隊列獲取1個 G gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } // 從 P 本地獲取 G 任務(wù) if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) } // 運行到這里表示從本地運行隊列和全局運行隊列都沒有找到需要運行的 G if gp == nil { // 阻塞地查找可用 G gp, inheritTime = findrunnable() // blocks until work is available } ... // 執(zhí)行 G 任務(wù)函數(shù) execute(gp, inheritTime) }
在這個函數(shù)中,我們只關(guān)注調(diào)度有關(guān)的代碼。從上面的代碼可以知道主要是從下面幾個方向去尋找可用的 G:
為了保證公平,當(dāng)全局運行隊列中有待執(zhí)行的 G 時,通過對 schedtick 取模 61 ,表示調(diào)度器每調(diào)度 61 次的時候,都會嘗試從全局隊列里取出待運行的 G 來運行;
調(diào)用 runqget 從 P 本地的運行隊列中查找待執(zhí)行的 G;
如果前兩種方法都沒有找到 G ,會通過 findrunnable 函數(shù)去其他 P 里面去“偷”一些 G 來執(zhí)行,如果“偷”不到,就阻塞直到有可運行的 G;
func globrunqget(_p_ *p, max int32) *g { // 如果全局隊列中沒有 G 直接返回 if sched.runqsize == 0 { return nil } // 計算 n 的個數(shù) n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } // n 的最大個數(shù) if max > 0 && n > max { n = max } if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } sched.runqsize -= n // 拿到全局隊列隊頭 G gp := sched.runq.pop() n-- // 將其余 n-1 個 G 從全局隊列放入本地隊列 for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp }
globrunqget 會從全局 runq 隊列中獲取 n 個 G ,其中第一個 G 用于執(zhí)行,n-1 個 G 從全局隊列放入本地隊列。
func runqget(_p_ *p) (gp *g, inheritTime bool) { // 如果 runnext 不為空,直接獲取返回 for { next := _p_.runnext if next == 0 { break } if _p_.runnext.cas(next, 0) { return next.ptr(), true } } // 從本地隊列頭指針遍歷本地隊列 for { h := atomic.LoadAcq(&_p_.runqhead) t := _p_.runqtail // 表示本地隊列為空 if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }
本地隊列的獲取會先從 P 的 runnext 字段中獲取,如果不為空則直接返回。如果 runnext 為空,那么從本地隊列頭指針遍歷本地隊列,本地隊列是一個環(huán)形隊列,方便復(fù)用。
任務(wù)竊取方法 findrunnable 非常的復(fù)雜,足足有300行之多,我們慢慢來分析:
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() // 如果在 GC,則休眠當(dāng)前 M,直到復(fù)始后回到 top if sched.gcwaiting != 0 { gcstopm() goto top } // 運行到安全點 if _p_.runSafePointFn != 0 { runSafePointFn() } now, pollUntil, _ := checkTimers(_p_, 0) ... // 從本地 P 的可運行隊列獲取 G if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // 從全局的可運行隊列獲取 G if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 從I/O輪詢器獲取 G if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { // 嘗試從netpoller獲取Glist if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() //將其余隊列放入 P 的可運行G隊列 injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } ... if !_g_.m.spinning { // 設(shè)置 spinning ,表示正在竊取 G _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 開始竊取 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } // 如果 i>2 表示如果其他 P 運行隊列中沒有 G ,將要從其他隊列的 runnext 中獲取 stealRunNextG := i > 2 // first look for ready queues with more than 1 g // 隨機獲取一個 P p2 := allp[enum.position()] if _p_ == p2 { continue } // 從其他 P 的運行隊列中獲取一般的 G 到當(dāng)前隊列中 if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } // 如果運行隊列中沒有 G,那么從 timers 中獲取可執(zhí)行的定時器 if i > 2 || (i > 1 && shouldStealTimers(p2)) { tnow, w, ran := checkTimers(p2, now) now = tnow if w != 0 && (pollUntil == 0 || w < pollUntil) { pollUntil = w } if ran { if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } ranTimer = true } } } } if ranTimer { goto top } stop: // 處于 GC 階段的話,獲取執(zhí)行GC標(biāo)記任務(wù)的G if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := _p_.gcBgMarkWorker.ptr() //將本地 P 的 GC 標(biāo)記專用 G 職位 Grunnable casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } ... // 放棄當(dāng)前的 P 之前,對 allp 做一個快照 allpSnapshot := allp // return P and block lock(&sched.lock) // 進入了 gc,回到頂部并阻塞 if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } // 全局隊列中又發(fā)現(xiàn)了任務(wù) if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } // 將 p 放入 idle 空閑鏈表 pidleput(_p_) unlock(&sched.lock) wasSpinning := _g_.m.spinning if _g_.m.spinning { // M 即將睡眠,狀態(tài)不再是 spinning _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // 休眠之前再次檢查全局 P 列表 //遍歷全局 P 列表的 P,并檢查他們的可運行G隊列 for _, _p_ := range allpSnapshot { // 如果這時本地隊列不空 if !runqempty(_p_) { lock(&sched.lock) // 重新獲取 P _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { // M 綁定 P acquirep(_p_) if wasSpinning { // spinning 重新切換為 true _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 這時候是有 work 的,回到頂部尋找 G goto top } break } } // 休眠前再次檢查 GC work if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) { lock(&sched.lock) _p_ = pidleget() if _p_ != nil && _p_.gcBgMarkWorker == 0 { pidleput(_p_) _p_ = nil } unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // Go back to idle GC check. goto stop } } // poll network // 休眠前再次檢查 poll 網(wǎng)絡(luò) if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 { ... lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ == nil { injectglist(&list) } else { acquirep(_p_) if !list.empty() { gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } } else if pollUntil != 0 && netpollinited() { pollerPollUntil := int64(atomic.Load64(&sched.pollUntil)) if pollerPollUntil == 0 || pollerPollUntil > pollUntil { netpollBreak() } } // 休眠當(dāng)前 M stopm() goto top }
這個函數(shù)需要注意一下,工作線程M的自旋狀態(tài)(spinning)。工作線程在從其它工作線程的本地運行隊列中盜取 G 時的狀態(tài)稱為自旋狀態(tài)。有關(guān)netpoller的知識可以到這里看:詳解Go語言I/O多路復(fù)用netpoller模型 https://www.luozhiyun.com/archives/439。
下面我們看一下 findrunnable 做了什么:
首先檢查是是否正在進行 GC,如果是則暫止當(dāng)前的 M 并阻塞休眠;
從本地運行隊列、全局運行隊列中查找 G;
從網(wǎng)絡(luò)輪詢器中查找是否有 G 等待運行;
將 spinning 設(shè)置為 true 表示開始竊取 G。竊取過程用了兩個嵌套for循環(huán),內(nèi)層循環(huán)遍歷 allp 中的所有 P ,查看其運行隊列是否有 G,如果有,則取其一半到當(dāng)前工作線程的運行隊列,然后從 findrunnable 返回,如果沒有則繼續(xù)遍歷下一個 P 。需要注意的是,遍歷 allp 時是從隨機位置上的 P 開始,防止每次遍歷時使用同樣的順序訪問allp中的元素;
所有的可能性都嘗試過了,在準備休眠 M 之前,還要進行額外的檢查;
首先檢查此時是否是 GC mark 階段,如果是,則直接返回 mark 階段的 G;
休眠之前再次檢查全局 P 列表,遍歷全局 P 列表的 P,并檢查他們的可運行G隊列;
還需要再檢查是否有 GC mark 的 G 出現(xiàn),如果有,獲取 P 并回到第一步,重新執(zhí)行偷取工作;
再檢查是否存在 poll 網(wǎng)絡(luò)的 G,如果有,則直接返回;
什么都沒找到,那么休眠當(dāng)前的 M ;
schedule
運行到到這里表示終于找到了可以運行的 G:
func execute(gp *g, inheritTime bool) { _g_ := getg() // 將 G 綁定到當(dāng)前 M 上 _g_.m.curg = gp gp.m = _g_.m // 將 g 正式切換為 _Grunning 狀態(tài) casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 // 搶占信號 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { // 調(diào)度器調(diào)度次數(shù)增加 1 _g_.m.p.ptr().schedtick++ } ... // gogo 完成從 g0 到 gp 真正的切換 gogo(&gp.sched) }
當(dāng)開始執(zhí)行 execute
后,G 會被切換到 _Grunning
狀態(tài),并將 M 和 G 進行綁定,最終調(diào)用 runtime.gogo
開始執(zhí)行。
runtime.gogo
中會從 runtime.gobuf
中取出 runtime.goexit
的程序計數(shù)器和待執(zhí)行函數(shù)的程序計數(shù)器,然后跳轉(zhuǎn)到 runtime.goexit
中并執(zhí)行:
TEXT runtime·goexit(SB),NOSPLIT,$0-0 CALL runtime·goexit1(SB) func goexit1() { // 調(diào)用goexit0函數(shù) mcall(goexit0) }
goexit1
通過 mcall
完成 goexit0
的調(diào)用 :
func goexit0(gp *g) { _g_ := getg() // 設(shè)置當(dāng)前 G 狀態(tài)為 _Gdead casgstatus(gp, _Grunning, _Gdead) // 清理 G gp.m = nil ... gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil // 解綁 M 和 G dropg() ... // 將 G 扔進 gfree 鏈表中等待復(fù)用 gfput(_g_.m.p.ptr(), gp) // 再次進行調(diào)度 schedule() }
goexit0
會對 G 進行復(fù)位操作,解綁 M 和 G 的關(guān)聯(lián)關(guān)系,將其 放入 gfree 鏈表中等待其他的 go 語句創(chuàng)建新的 g。在最后,goexit0
會重新調(diào)用 schedule
觸發(fā)新一輪的調(diào)度。
下面用一張圖大致總結(jié)一下調(diào)度過程:
以上就是Go語言中怎么調(diào)度循環(huán)源碼,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。