溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Golang鎖原理如何實(shí)現(xiàn)

發(fā)布時(shí)間:2023-03-15 14:20:49 來源:億速云 閱讀:119 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Golang鎖原理如何實(shí)現(xiàn)的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Golang鎖原理如何實(shí)現(xiàn)文章都會(huì)有所收獲,下面我們一起來看看吧。

什么是鎖

  • 鎖的本質(zhì),就是一種資源,是由操作系統(tǒng)維護(hù)的一種專門用于同步的資源

  • 比如說互斥鎖,說白了就是一種互斥的資源。只能有一個(gè)進(jìn)程(線程)占有。當(dāng)一個(gè)進(jìn)程(線程)通過競(jìng)爭(zhēng)獲得鎖的時(shí)候,其他進(jìn)程(或線程)將得不到這把鎖。這是內(nèi)核代碼決定的

  • 如果我們希望某種資源在多個(gè)進(jìn)程(線程/協(xié)程)之間共享,但是某一時(shí)刻最多有一個(gè)進(jìn)程占有,這不就是互斥鎖的概念嗎,也就是說,我們希望自己的資源也變成一種鎖

  • 最簡(jiǎn)單的辦法就是將自己的資源和操作系統(tǒng)定義好的鎖綁定到一起。也就是說,進(jìn)程要獲取我的資源之前,必須要獲得操作系統(tǒng)的鎖。進(jìn)一步說,得鎖得資源,失鎖失資源。這樣的話,我們的資源也變成了一把鎖

為什么使用鎖

并發(fā)編程中保證數(shù)據(jù)一致性和安全性的

Golang中的鎖

Golang鎖原理如何實(shí)現(xiàn)

Golang的提供的同步機(jī)制有sync模塊下的Mutex、WaitGroup以及語言自身提供的chan等。 這些同步的方法都是以runtime中實(shí)現(xiàn)的底層同步機(jī)制(cas、atomic、spinlock、sem)為基礎(chǔ)的

1. cas、atomic

cas(Compare And Swap)和原子運(yùn)算是其他同步機(jī)制的基礎(chǔ)

  • 原子操作:指那些不能夠被打斷的操作被稱為原子操作,當(dāng)有一個(gè)CPU在訪問這塊內(nèi)容addr時(shí),其他CPU就不能訪問

  • CAS:比較及交換,其實(shí)也屬于原子操作,但它是非阻塞的,所以在被操作值被頻繁變更的情況下,CAS操作并不那么容易成功,不得不利用for循環(huán)以進(jìn)行多次嘗試

2. 自旋鎖(spinlock)

自旋鎖是指當(dāng)一個(gè)線程在獲取鎖的時(shí)候,如果鎖已經(jīng)被其他線程獲取,那么該線程將循環(huán)等待,然后不斷地判斷是否能夠被成功獲取,知直到獲取到鎖才會(huì)退出循環(huán)。獲取鎖的線程一直處于活躍狀態(tài)
Golang中的自旋鎖用來實(shí)現(xiàn)其他類型的鎖,與互斥鎖類似,不同點(diǎn)在于,它不是通過休眠來使進(jìn)程阻塞,而是在獲得鎖之前一直處于活躍狀態(tài)(自旋)

3. 信號(hào)量

實(shí)現(xiàn)休眠和喚醒協(xié)程的一種方式

信號(hào)量有兩個(gè)操作P和V
P(S):分配一個(gè)資源
1. 資源數(shù)減1:S=S-1
2. 進(jìn)行以下判斷
    如果S<0,進(jìn)入阻塞隊(duì)列等待被釋放
    如果S>=0,直接返回

V(S):釋放一個(gè)資源
1. 資源數(shù)加1:S=S+1
2. 進(jìn)行如下判斷
    如果S>0,直接返回
    如果S<=0,表示還有進(jìn)程在請(qǐng)求資源,釋放阻塞隊(duì)列中的第一個(gè)等待進(jìn)程
    
golang中信號(hào)量操作:runtime/sema.go
P操作:runtime_Semacquire
V操作:runtime_Semrelease

mutex的使用

package main

import (
    "fmt"
    "sync"
)

var num int
var mtx sync.Mutex
var wg sync.WaitGroup

func add() {
    mtx.Lock()  //mutex實(shí)例無需實(shí)例化,聲明即可使用

    defer mtx.Unlock()
    defer wg.Done()

    num += 1
}

func main() {
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go add()
    }

    wg.Wait()

    fmt.Println("num:", num)
}

mutex的必要性

鎖在高度競(jìng)爭(zhēng)時(shí)會(huì)不斷掛起恢復(fù)線程從而讓出cpu資源,原子變量在高度競(jìng)爭(zhēng)時(shí)會(huì)一直占用cpu;原子操作時(shí)線程級(jí)別的,不支持協(xié)程

mutex演進(jìn)

1. 互斥鎖

type Mutex struct {
    state int32
    sema  uint32
}

const (
    mutexLocked = 1 << iota
    mutexWoken
    mutexWaiterShift = iota  //根據(jù) mutex.state >> mutexWaiterShift 得到當(dāng)前等待的 goroutine 數(shù)目
)

state表示當(dāng)前鎖的狀態(tài),是一個(gè)共用變量

state:  |32|31|....|3|2|1|
         \__________/ | |
               |      | |
               |      | 當(dāng)前mutex是否加鎖
               |      |
               |      當(dāng)前mutex是否被喚醒
               |
               等待隊(duì)列的goroutine協(xié)程數(shù)

Lock 方法申請(qǐng)對(duì) mutex 加鎖的時(shí)候分兩種情況

  • 無沖突 通過 CAS 操作把當(dāng)前狀態(tài)設(shè)置為加鎖狀態(tài)

  • 有沖突 通過調(diào)用 semacquire 函數(shù)來讓當(dāng)前 goroutine 進(jìn)入休眠狀態(tài),等待其他協(xié)程釋放鎖的時(shí)候喚醒

//如果已經(jīng)加鎖,那么當(dāng)前協(xié)程進(jìn)入休眠阻塞,等待喚醒
func (m *Mutex) Lock() {

    // 快速加鎖:CAS更新state為locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    awoke := false //當(dāng)前goroutine是否被喚醒
    for {
        old := m.state // 保存當(dāng)前state的狀態(tài)
        new := old | mutexLocked // 新值locked位設(shè)置為1
        // 如果當(dāng)前處于加鎖狀態(tài),新到來的goroutine進(jìn)入等待隊(duì)列
        if old&mutexLocked != 0 {
            new = old + 1<<mutexWaiterShift
        }
        if awoke {
            //如果被喚醒,新值需要重置woken位為 0
            new &^= mutexWoken
        }
        
        // 兩種情況會(huì)走到這里:1.休眠中被喚醒 2.加鎖失敗進(jìn)入等待隊(duì)列
        // CAS 更新,如果更新失敗,說明有別的協(xié)程搶先一步,那么重新發(fā)起競(jìng)爭(zhēng)。
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果更新成功,有兩種情況
            // 1.如果為 1,說明當(dāng)前 CAS 是為了更新 waiter 計(jì)數(shù)
            // 2.如果為 0,說明是搶鎖成功,那么直接 break 退出。
            if old&mutexLocked == 0 { 
                break
            }
            runtime_Semacquire(&m.sema) // 此時(shí)如果 sema <= 0 那么阻塞在這里等待喚醒,也就是 park 住。走到這里都是要休眠了。
            awoke = true  // 有人釋放了鎖,然后當(dāng)前 goroutine 被 runtime 喚醒了,設(shè)置 awoke true
        }
    }

    if raceenabled {
        raceAcquire(unsafe.Pointer(m))
    }
}

UnLock 解鎖分兩步

  • 解鎖,通過CAS操作把當(dāng)前狀態(tài)設(shè)置為解鎖狀態(tài)

  • 喚醒休眠協(xié)程,CAS操作把當(dāng)前狀態(tài)的waiter數(shù)減1,然后喚醒休眠goroutine

//鎖沒有和某個(gè)特定的協(xié)程關(guān)聯(lián),可以由一個(gè)協(xié)程lock,另一個(gè)協(xié)程unlock
func (m *Mutex) Unlock() {
    if raceenabled {
        _ = m.state
        raceRelease(unsafe.Pointer(m))
    }

    // CAS更新state的狀態(tài)為locked 注意:解鎖的瞬間可能會(huì)有新的協(xié)程到來并搶到鎖
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // 釋放了一個(gè)沒上鎖的鎖會(huì)panic:原先的lock位為0
    if (new+mutexLocked)&mutexLocked == 0 { 
        panic("sync: unlock of unlocked mutex")
    }
    
    //判斷是否需要釋放資源
    old := new
    for {
        /**
         * 不需要喚醒的情況
         * 1.等待隊(duì)列為0
         * 2.已經(jīng)有協(xié)程搶到鎖(上面的瞬間搶鎖)
         * 3.已經(jīng)有協(xié)程被喚醒
         */
        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
            return
        }
        //將waiter計(jì)數(shù)位減一,并設(shè)置state為woken(喚醒)
        //問:會(huì)同時(shí)有多個(gè)被喚醒的協(xié)程存在嗎
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema) // cas成功后,再做sema release操作,喚醒休眠的 goroutine
            return
        }
        old = m.state
    }
}

知識(shí)點(diǎn)

使用&來判斷位值,使用|來設(shè)置位值,使用&^來清空位置(內(nèi)存對(duì)齊)

一代互斥鎖的問題

處于休眠中的goroutine優(yōu)先級(jí)低于當(dāng)前活躍的,unlock解鎖的瞬間最新的goroutine會(huì)搶到鎖
大多數(shù)果鎖的時(shí)間很短,所有的goroutine都要休眠,增加runtime調(diào)度開銷

2. 自旋鎖

Lock 方法申請(qǐng)對(duì) mutex 加鎖的時(shí)候分三種情況

  • 無沖突 通過 CAS 操作把當(dāng)前狀態(tài)設(shè)置為加鎖狀態(tài)

  • 有沖突 開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時(shí)間內(nèi)釋放了該鎖,直接獲得該鎖;如果沒有釋放,進(jìn)入3

  • 有沖突 通過調(diào)用 semacquire 函數(shù)來讓當(dāng)前 goroutine 進(jìn)入等待狀態(tài),等待其他協(xié)程釋放鎖的時(shí)候喚醒

func (m *Mutex) Lock() {
    //快速加鎖,邏輯不變
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    awoke := false
    iter := 0
    for {
        old := m.state
        new := old | mutexLocked
        if old&mutexLocked != 0 { // 如果當(dāng)前己經(jīng)上鎖,那么判斷是否可以自旋
            //短暫的自旋過后如果無果,就只能通過信號(hào)量讓當(dāng)前goroutine進(jìn)入休眠等待了
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                /**
                 * 自旋的操作:設(shè)置state為woken,這樣在unlock的時(shí)候就不會(huì)喚醒其他協(xié)程.
                 * 自旋的條件:
                 * 1.當(dāng)前協(xié)程未被喚醒 !awoke
                 * 2.其他協(xié)程未被喚醒 old&mutexWoken == 0
                 * 3.等待隊(duì)列大于0
                 */
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                //進(jìn)行自旋操作
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShit
        }
        if awoke {
            //todo 為什么加這個(gè)判斷
            if new&mutexWoken == 0 {
                panic("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&mutexLocked == 0 {
                break
            }
            runtime_Semacquire(&m.sema)
            awoke = true
            iter = 0
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

path: runtime/proc.go

const (
        mutex_unlocked = 0
        mutex_locked   = 1
        mutex_sleeping = 2

        active_spin     = 4
        active_spin_cnt = 30
        passive_spin    = 1
)

/**
 * 有四種情況會(huì)返回false
 * 1.已經(jīng)執(zhí)行了很多次 iter >= active_spin 默認(rèn)為4。避免長(zhǎng)時(shí)間自旋浪費(fèi)CPU
 * 2.是單核CPU ncpu <= 1 || GOMAXPROCS < 1 保證除了當(dāng)前運(yùn)行的Goroutine之外,還有其他的Goroutine在運(yùn)行
 * 3.沒有其他正在運(yùn)行的p
 * 4 當(dāng)前P的G隊(duì)列為空 避免自旋鎖等待的條件是由當(dāng)前p的其他G來觸發(fā),這樣會(huì)導(dǎo)致再自旋變得沒有意義,因?yàn)闂l件永遠(yuǎn)無法觸發(fā)
 */
func sync_runtime_canSpin(i int) bool {
        // sync.Mutex is cooperative, so we are conservative with spinning.
        // Spin only few times and only if running on a multicore machine and
        // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
        // As opposed to runtime mutex we don't do passive spinning here,
        // because there can be work on global runq or on other Ps.
        if i >= active_spin || ncpu <= 1 || gomaxprocs <=
        int32(sched.npidle+sched.nmspinning)+1 {
                return false
        }
        if p := getg().m.p.ptr(); !runqempty(p) {
                return false
        }
        return true
}

// 自旋邏輯
// procyeld函數(shù)內(nèi)部循環(huán)調(diào)用PAUSE指令,PAUSE指令什么都不做,但是會(huì)消耗CPU時(shí)間
// 在這里會(huì)執(zhí)行30次PAUSE指令消耗CPU時(shí)間等待鎖的釋放;
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

問題:

  • 還是沒有解決休眠進(jìn)程優(yōu)先級(jí)低的問題

3. 公平鎖

基本邏輯

  • Mutex 兩種工作模式,normal 正常模式,starvation 饑餓模式。normal 情況下鎖的邏輯與老版相似,休眠的 goroutine 以 FIFO 鏈表形式保存在 sudog 中,被喚醒的 goroutine 與新到來活躍的 goroutine 競(jìng)解,但是很可能會(huì)失敗。如果一個(gè) goroutine 等待超過 1ms,那么 Mutex 進(jìn)入饑餓模式

  • 饑餓模式下,解鎖后,鎖直接交給 waiter FIFO 鏈表的第一個(gè),新來的活躍 goroutine 不參與競(jìng)爭(zhēng),并放到 FIFO 隊(duì)尾

  • 如果當(dāng)前獲得鎖的 goroutine 是 FIFO 隊(duì)尾,或是等待時(shí)長(zhǎng)小于 1ms,那么退出饑餓模式

  • normal 模式下性能是比較好的,但是 starvation 模式能減小長(zhǎng)尾 latency

LOCK流程:

  • 無沖突 通過 CAS 操作把當(dāng)前狀態(tài)設(shè)置為加鎖狀態(tài)

  • 有沖突 開始自旋 如果是饑餓模式禁止自旋,開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時(shí)間內(nèi)釋放了該鎖,直接獲得該鎖;如果沒有釋放,進(jìn)入3

  • 有沖突,且已經(jīng)過了自旋階段 通過調(diào)用 semacquire 函數(shù)來讓當(dāng)前 goroutine 進(jìn)入等待狀態(tài),等待其他協(xié)程釋放鎖的時(shí)候喚醒,休眠前:如果是饑餓模式,把當(dāng)前協(xié)程放到隊(duì)列最前面;喚醒后:如果是饑餓模式喚醒的,直接獲得鎖

type Mutex struct {
        state int32 
        sema  **uint32**
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
        Lock()
        Unlock()
}

//為什么使用位掩碼表達(dá)式
//第3位到第32位表示等待在mutex上協(xié)程數(shù)量
const (
        mutexLocked = 1 << iota // mutex is locked 
        mutexWoken                                  
        mutexStarving           //新增饑餓狀態(tài)
        mutexWaiterShift = iota                     
        starvationThresholdNs = 1e6 //饑餓狀態(tài)的閾值:等待時(shí)間超過1ms就會(huì)進(jìn)入饑餓狀態(tài)
)


func (m *Mutex) Lock() {
        //快速加鎖:邏輯不變
        if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
                if race.Enabled {
                        race.Acquire(unsafe.Pointer(m))
                }
                return
        }

        var waitStartTime int64 //等待時(shí)間
        starving := false   //饑餓標(biāo)記
        awoke := false      //喚醒標(biāo)記
        iter := 0           //循環(huán)計(jì)數(shù)器
        old := m.state      //保存當(dāng)前鎖狀態(tài)
        for {
            // 自旋的時(shí)候增加了一個(gè)判斷:如果處于饑餓狀態(tài)就不進(jìn)入自旋,因?yàn)轲囸I模式下,釋放的鎖會(huì)直接給等待隊(duì)列的第一個(gè),當(dāng)前協(xié)程直接進(jìn)入等待隊(duì)列
                if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
                        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                                awoke = true
                        }
                        runtime_doSpin()
                        iter++
                        old = m.state
                        continue
                }
                new := old
                // 當(dāng)mutex不處于饑餓狀態(tài)的時(shí)候,將new值設(shè)置為locked,也就是說如果是饑餓狀態(tài),新到來的goroutine直接排隊(duì)
                if old&mutexStarving == 0 {
                        new |= mutexLocked
                }
                // 當(dāng)mutex處于加鎖鎖或者饑餓狀態(tài)時(shí),新到來的goroutine進(jìn)入等待隊(duì)列
                if old&(mutexLocked|mutexStarving) != 0 {
                        new += 1 << mutexWaiterShift
                }
                // 當(dāng)?shù)却龝r(shí)間超過閾值,當(dāng)前goroutine切換mutex為饑餓模式,如果未加鎖,就不需要切換
                if starving && old&mutexLocked != 0 {
                        new |= mutexStarving
                }
                if awoke {
                        if new&mutexWoken == 0 {
                                throw("sync: inconsistent mutex state")
                        }
                        new &^= mutexWoken
                }
                if atomic.CompareAndSwapInt32(&m.state, old, new) {
                    // mutex 處于未加鎖,正常模式下,當(dāng)前 goroutine 獲得鎖
                        if old&(mutexLocked|mutexStarving) == 0 {
                                break // locked the mutex with CAS
                        }
                        // 如果已經(jīng)在排隊(duì)了,就排到隊(duì)伍的最前面
                        queueLifo := waitStartTime != 0
                        if waitStartTime == 0 {
                                waitStartTime = runtime_nanotime()
                        }
                        // queueLifo 為真的時(shí)候,當(dāng)前goroutine會(huì)被放到隊(duì)頭,
                        // 也就是說被喚醒卻沒搶到鎖的goroutine放到最前面
                        runtime_SemacquireMutex(&m.sema, queueLifo)
                        // 當(dāng)前goroutine等待時(shí)間超過閾值,切換為饑餓模式,starving設(shè)置為true
                        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                        old = m.state
                        //如果當(dāng)前是饑餓模式
                        if old&mutexStarving != 0 {
                                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                                        throw("sync: inconsistent mutex state")
                                }
                                // 如果切換為饑餓模式,等待隊(duì)列計(jì)數(shù)減1
                                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                                // 如果等待時(shí)間小于1ms或者自己是最后一個(gè)被喚醒的,退出饑餓模式
                                if !starving || old>>mutexWaiterShift == 1 {
                                        delta -= mutexStarving
                                }
                                atomic.AddInt32(&m.state, delta)
                                break
                        }
                        awoke = true
                        iter = 0
                } else {
                        old = m.state
                }
        }

        if race.Enabled {
                race.Acquire(unsafe.Pointer(m))
        }
}

UnLock 解鎖分兩步

  • 解鎖,通過CAS操作把當(dāng)前狀態(tài)設(shè)置為解鎖狀態(tài)

  • 喚醒休眠協(xié)程,CAS操作把當(dāng)前狀態(tài)的waiter數(shù)減1,然后喚醒休眠goroutine,如果是饑餓模式的話,喚醒等待隊(duì)列的第一個(gè)

func (m *Mutex) Unlock() {
        if race.Enabled {
                _ = m.state
                race.Release(unsafe.Pointer(m))
        }

        new := atomic.AddInt32(&m.state, -mutexLocked)
        if (new+mutexLocked)&mutexLocked == 0 {
                throw("sync: unlock of unlocked mutex")
        }

        if new&mutexStarving == 0 {
        // 正常模式
                old := new
                for {
                    /**
             * 不需要喚醒的情況
             * 1.等待隊(duì)列為0
             * 2.已經(jīng)有協(xié)程搶到鎖(上面的瞬間搶鎖)
             * 3.已經(jīng)有協(xié)程被喚醒
             * 4.處于饑餓模式 在饑餓模式獲取到鎖的協(xié)程仍然處于饑餓狀態(tài),新的goroutine無法獲取到鎖
             */
                        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                                return
                        }
                        // Grab the right to wake someone.
                        new = (old - 1<<mutexWaiterShift) | mutexWoken
                        if atomic.CompareAndSwapInt32(&m.state, old, new) {
                                runtime_Semrelease(&m.sema, false)
                                return
                        }
                        old = m.state
                }
        } else {
            // 饑餓模式
                runtime_Semrelease(&m.sema, true)
        }
}

關(guān)于“Golang鎖原理如何實(shí)現(xiàn)”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Golang鎖原理如何實(shí)現(xiàn)”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI