您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Golang中sync.Mutex源碼分析”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
type Mutex struct { state int32 sema uint32 }
state 記錄鎖的狀態(tài),轉(zhuǎn)換為二進制前29位表示等待鎖的goroutine數(shù)量,后三位從左到右分別表示當前g 是否已獲得鎖、是否被喚醒、是否正饑餓
sema 充當臨界資源,其地址作為這個鎖在全局的唯一標識,所有等待這個鎖的goroutine都會在阻塞前把自己的sudog放到這個鎖的等待隊列上,然后等待被喚醒,sema的值就是可以被喚醒的goroutine的數(shù)目,只有0和1。
const ( mutexLocked = 1 << iota // mutex is locked //值1,轉(zhuǎn)二進制后三位為001,表示鎖已被搶 mutexWoken //值2,轉(zhuǎn)二進制后三位為010,告訴即將釋放鎖的g現(xiàn)在已有g(shù)被喚醒 mutexStarving //值4,轉(zhuǎn)二進制后三位為100,表示當前處在饑餓狀態(tài) mutexWaiterShift = iota //值3,表示mutex.state右移3位為等待鎖的goroutine數(shù)量 starvationThresholdNs = 1e6 //表示mutext切換到饑餓狀態(tài)所需等待時間的閾值,1ms。 )
type Locker interface { Lock() Unlock() }
下面重點看這兩個方法。
func (m *Mutex) Lock() { // 第一種情況:快上鎖,即此刻無人來搶鎖 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { //競爭檢測相關(guān),不用看 race.Acquire(unsafe.Pointer(m)) } return } // 第二種情況:慢上鎖,即此刻有競爭對手 m.lockSlow() }
CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool){},go的CAS操作,底層通過調(diào)用cpu指令集提供的CAS指令實現(xiàn),位置在src/runtime/internal/atomic/atomic_amd64.s/·Cas(SB)。
參數(shù)addr:變量地址
參數(shù)old:舊值
參數(shù)new:新值
原理:如果addr和old相等,則將new賦值給addr,并且返回true,否則返回false
// 注釋里的第一人稱“我”只當前g func (m *Mutex) lockSlow() { var waitStartTime int64 // 等待開始的時間 starving := false // 我是否饑餓 awoke := false // 我是否被喚醒 iter := 0 // 我的自旋次數(shù) old := m.state // 這個鎖此時此刻所有的信息 for { // 如果:鎖已經(jīng)被搶了 或著 正處在饑餓狀態(tài) 或者 允許我自旋 那么進行自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 如果:我沒有處在喚醒態(tài) 并且 當前無g處在喚醒態(tài) 并且 // 有等待鎖的g 并且CAS嘗試將我置為喚醒態(tài)成功 則進行自旋 // 之所以將我置為喚醒態(tài)是為了明示那些執(zhí)行完畢正在退出的g不用再去喚醒其它g了,因為只允許存在一個喚醒的g。 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() // 我自旋一次 iter++ // 我自旋次數(shù)加1 old = m.state continue } new := old // new只是個中間態(tài),后面的cas操作將會判斷是否將這個中間態(tài)落實 // 如果不是處在饑餓模式就立即搶鎖 if old&mutexStarving == 0 { new |= mutexLocked } // 如果鎖被搶了 或者 處在饑餓模式,那就去排隊 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift // 等待鎖的goroutine數(shù)量加1 } // 如果我現(xiàn)在饑渴難耐 而且 鎖也被搶走了,那就立即將鎖置為饑餓模式 if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 釋放我的喚醒態(tài) // 因為后面我要么搶到鎖要么被阻塞,都不是處在和喚醒態(tài) new &^= mutexWoken } //此處CAS操作嘗試將new這個中間態(tài)落實 if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // 搶鎖成功! } // queueLifo我之前有沒有排過隊 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } //原語:如果我之前排過隊,這次就把我放到等待隊列隊首,否則把我放到隊尾,并將我掛起 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 剛被喚醒的我先判斷自己是不是饑餓了,如果我等待鎖的時間小于starvationThresholdNs(1ms),那就不餓 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // 我一覺醒來發(fā)覺鎖正處在饑餓狀態(tài),蒼天有眼這個鎖屬于我了,因為饑餓狀態(tài)絕對沒有人跟我搶鎖 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // delta是一個中間狀態(tài),atomic.AddInt32方法將給鎖落實這個狀態(tài) delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 如果現(xiàn)在我不饑餓或者等待鎖的就我一個,那么就將鎖切換到正常狀態(tài)。 // 饑餓模式效率很低,而且一旦有兩個g把mutex切換為饑餓模式,那就會死鎖。 delta -= mutexStarving } // 原語:給鎖落實delta的狀態(tài)。 atomic.AddInt32(&m.state, delta) // 我拿到鎖啦 break } // 把我的狀態(tài)置為喚醒,我將繼續(xù)去搶鎖 awoke = true // 把我的自旋次數(shù)置0,我又可以自旋搶鎖啦 iter = 0 } else { // 繼續(xù)去搶鎖 old = m.state } } // 競爭檢測的代碼,不管 if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
runtime_canSpin(iter) 判斷當前g可否自旋,已經(jīng)自旋過iter次
func sync_runtime_canSpin(i int) bool { // 可自旋的條件: // 1.多核cpu // 2.GOMAXPROCS > 1 且 至少有一個其他的p在運行 且 該p的本地runq為空 // 3.iter小于最大自旋次數(shù)active_spin = 4 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
runtime_doSpin()通過調(diào)用procyield(n int32)方法來實現(xiàn)空耗CPU,n乃空耗CPU的次數(shù)。
//go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
procyield(active_spin_cnt) 的底層通過執(zhí)行PAUSE指令來空耗30個CPU時鐘周期。
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
runtime_SemacquireMutex(&m.sema, queueLifo, 1) 將當前g放到mutex的等待隊列中去
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) }
semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) 若lifo為true,則把g放到等待隊列隊首,若lifo為false,則把g放到隊尾
atomic.AddInt32(int32_t *val, int32_t delta) 原語:給t加上t_delta
uint32_t AddUint32 (uint32_t *val, uint32_t delta) { return __atomic_add_fetch (val, delta, __ATOMIC_SEQ_CST); }
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // 如果沒有g(shù)在等待鎖則立即釋放鎖 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 如果還有g(shù)在等待鎖,則在鎖釋放后需要做一點收尾工作。 m.unlockSlow(new) } }
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果鎖處在正常模式下 if new&mutexStarving == 0 { old := new for { // 如果鎖正處在正常模式下,同時 沒有等待鎖的g 或者 已經(jīng)有g(shù)被喚醒了 或者 鎖已經(jīng)被搶了,就什么也不用做直接返回 // 如果鎖正處在饑餓模式下,也是什么也不用做直接返回 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 給鎖的喚醒標志位置1,表示已經(jīng)有g(shù)被喚醒了,Mutex.state后三位010 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 喚醒鎖的等待隊列頭部的一個g // 并把g放到p的funq尾部 runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 鎖處在饑餓模式下,直接喚醒鎖的等待隊列頭部的一個g //因為在饑餓模式下沒人跟剛被喚醒的g搶鎖,所以不用設(shè)置鎖的喚醒標志位 runtime_Semrelease(&m.sema, true, 1) } }
runtime_Semrelease(&m.sema, false, 1) 用來釋放mutex等待隊列上的一個g
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { semrelease1(addr, handoff, skipframes) }
semrelease1(addr, handoff, skipframes) 參數(shù)handoff若為true,則讓被喚醒的g立刻繼承當前g的時間片繼續(xù)執(zhí)行。若handoff為false,則把剛被喚醒的g放到當前p的runq中。
“Golang中sync.Mutex源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。