溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang中sync.Mutex源碼分析

發(fā)布時間:2023-03-15 11:07:45 來源:億速云 閱讀:67 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Golang中sync.Mutex源碼分析”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

Mutex結(jié)構(gòu)

type Mutex struct {
	state int32
	sema  uint32
}
  • state 記錄鎖的狀態(tài),轉(zhuǎn)換為二進制前29位表示等待鎖的goroutine數(shù)量,后三位從左到右分別表示當前g 是否已獲得鎖、是否被喚醒、是否正饑餓

  • sema 充當臨界資源,其地址作為這個鎖在全局的唯一標識,所有等待這個鎖的goroutine都會在阻塞前把自己的sudog放到這個鎖的等待隊列上,然后等待被喚醒,sema的值就是可以被喚醒的goroutine的數(shù)目,只有0和1。

常量

const (
	mutexLocked = 1 << iota // mutex is locked  //值1,轉(zhuǎn)二進制后三位為001,表示鎖已被搶
	mutexWoken                                  //值2,轉(zhuǎn)二進制后三位為010,告訴即將釋放鎖的g現(xiàn)在已有g(shù)被喚醒
	mutexStarving                               //值4,轉(zhuǎn)二進制后三位為100,表示當前處在饑餓狀態(tài)
	mutexWaiterShift = iota                     //值3,表示mutex.state右移3位為等待鎖的goroutine數(shù)量
	starvationThresholdNs = 1e6                 //表示mutext切換到饑餓狀態(tài)所需等待時間的閾值,1ms。
)

Locker接口

type Locker interface {
	Lock()
	Unlock()
}

下面重點看這兩個方法。

加鎖Lock

Golang中sync.Mutex源碼分析

Lock()

func (m *Mutex) Lock() {
	// 第一種情況:快上鎖,即此刻無人來搶鎖
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {  //競爭檢測相關(guān),不用看
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// 第二種情況:慢上鎖,即此刻有競爭對手
	m.lockSlow()
}

CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool){},go的CAS操作,底層通過調(diào)用cpu指令集提供的CAS指令實現(xiàn),位置在src/runtime/internal/atomic/atomic_amd64.s/&middot;Cas(SB)。

  • 參數(shù)addr:變量地址

  • 參數(shù)old:舊值

  • 參數(shù)new:新值

  • 原理:如果addr和old相等,則將new賦值給addr,并且返回true,否則返回false

lockSlow()

// 注釋里的第一人稱“我”只當前g
func (m *Mutex) lockSlow() {
	var waitStartTime int64	// 等待開始的時間
	starving := false		// 我是否饑餓
	awoke := false			// 我是否被喚醒
	iter := 0				// 我的自旋次數(shù)
	old := m.state			// 這個鎖此時此刻所有的信息
	for {
		// 如果:鎖已經(jīng)被搶了 或著 正處在饑餓狀態(tài) 或者 允許我自旋 那么進行自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 如果:我沒有處在喚醒態(tài) 并且 當前無g處在喚醒態(tài) 并且
            // 有等待鎖的g 并且CAS嘗試將我置為喚醒態(tài)成功 則進行自旋
            // 之所以將我置為喚醒態(tài)是為了明示那些執(zhí)行完畢正在退出的g不用再去喚醒其它g了,因為只允許存在一個喚醒的g。
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()	// 我自旋一次
			iter++				// 我自旋次數(shù)加1
			old = m.state
			continue
		}
		new := old	// new只是個中間態(tài),后面的cas操作將會判斷是否將這個中間態(tài)落實
		// 如果不是處在饑餓模式就立即搶鎖
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
        // 如果鎖被搶了 或者 處在饑餓模式,那就去排隊
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift	// 等待鎖的goroutine數(shù)量加1
		}
		// 如果我現(xiàn)在饑渴難耐 而且 鎖也被搶走了,那就立即將鎖置為饑餓模式
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
            // 釋放我的喚醒態(tài)
            // 因為后面我要么搶到鎖要么被阻塞,都不是處在和喚醒態(tài)
			new &^= mutexWoken
		}
        //此處CAS操作嘗試將new這個中間態(tài)落實
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 搶鎖成功!
			}
			// queueLifo我之前有沒有排過隊
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
            //原語:如果我之前排過隊,這次就把我放到等待隊列隊首,否則把我放到隊尾,并將我掛起
			runtime_SemacquireMutex(&m.sema, queueLifo, 1) 
            // 剛被喚醒的我先判斷自己是不是饑餓了,如果我等待鎖的時間小于starvationThresholdNs(1ms),那就不餓
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			if old&mutexStarving != 0 {
				// 我一覺醒來發(fā)覺鎖正處在饑餓狀態(tài),蒼天有眼這個鎖屬于我了,因為饑餓狀態(tài)絕對沒有人跟我搶鎖
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
                // delta是一個中間狀態(tài),atomic.AddInt32方法將給鎖落實這個狀態(tài)
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
                    // 如果現(xiàn)在我不饑餓或者等待鎖的就我一個,那么就將鎖切換到正常狀態(tài)。
                    // 饑餓模式效率很低,而且一旦有兩個g把mutex切換為饑餓模式,那就會死鎖。
					delta -= mutexStarving
				}
                // 原語:給鎖落實delta的狀態(tài)。
				atomic.AddInt32(&m.state, delta)
                // 我拿到鎖啦
				break
			}
            // 把我的狀態(tài)置為喚醒,我將繼續(xù)去搶鎖
			awoke = true
            // 把我的自旋次數(shù)置0,我又可以自旋搶鎖啦
			iter = 0
		} else {
            // 繼續(xù)去搶鎖
			old = m.state
		}
	}

    // 競爭檢測的代碼,不管
	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

runtime_canSpin(iter) 判斷當前g可否自旋,已經(jīng)自旋過iter次

func sync_runtime_canSpin(i int) bool {
	// 可自旋的條件:
    // 1.多核cpu
    // 2.GOMAXPROCS > 1 且 至少有一個其他的p在運行 且 該p的本地runq為空
    // 3.iter小于最大自旋次數(shù)active_spin = 4
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  	return false
	}
  if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

runtime_doSpin()通過調(diào)用procyield(n int32)方法來實現(xiàn)空耗CPU,n乃空耗CPU的次數(shù)。

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

procyield(active_spin_cnt) 的底層通過執(zhí)行PAUSE指令來空耗30個CPU時鐘周期。

TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

runtime_SemacquireMutex(&m.sema, queueLifo, 1) 將當前g放到mutex的等待隊列中去

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) 若lifo為true,則把g放到等待隊列隊首,若lifo為false,則把g放到隊尾

atomic.AddInt32(int32_t *val, int32_t delta) 原語:給t加上t_delta

uint32_t
AddUint32 (uint32_t *val, uint32_t delta)
{
  return __atomic_add_fetch (val, delta, __ATOMIC_SEQ_CST);
}

解鎖Unlock

Golang中sync.Mutex源碼分析

Unlock

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// 如果沒有g(shù)在等待鎖則立即釋放鎖
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// 如果還有g(shù)在等待鎖,則在鎖釋放后需要做一點收尾工作。
		m.unlockSlow(new)
	}
}

unlockSlow

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
    // 如果鎖處在正常模式下
	if new&mutexStarving == 0 {
		old := new
		for {
			// 如果鎖正處在正常模式下,同時 沒有等待鎖的g 或者 已經(jīng)有g(shù)被喚醒了 或者 鎖已經(jīng)被搶了,就什么也不用做直接返回
			// 如果鎖正處在饑餓模式下,也是什么也不用做直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 給鎖的喚醒標志位置1,表示已經(jīng)有g(shù)被喚醒了,Mutex.state后三位010
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 喚醒鎖的等待隊列頭部的一個g
                // 并把g放到p的funq尾部
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// 鎖處在饑餓模式下,直接喚醒鎖的等待隊列頭部的一個g
		//因為在饑餓模式下沒人跟剛被喚醒的g搶鎖,所以不用設(shè)置鎖的喚醒標志位
		runtime_Semrelease(&m.sema, true, 1)
	}
}

runtime_Semrelease(&m.sema, false, 1) 用來釋放mutex等待隊列上的一個g

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

semrelease1(addr, handoff, skipframes) 參數(shù)handoff若為true,則讓被喚醒的g立刻繼承當前g的時間片繼續(xù)執(zhí)行。若handoff為false,則把剛被喚醒的g放到當前p的runq中。

“Golang中sync.Mutex源碼分析”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI