溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

更簡(jiǎn)的并發(fā)代碼有哪些

發(fā)布時(shí)間:2021-10-14 11:30:17 來源:億速云 閱讀:119 作者:iii 欄目:編程語言

本篇內(nèi)容介紹了“更簡(jiǎn)的并發(fā)代碼有哪些”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

name作用
AtomicBoolbool類型 原子類
AtomicDurationDuration有關(guān) 原子類
AtomicFloat64float64類型 原子類
Barrier欄柵【將加鎖解鎖包裝】
Cond條件變量
DoneChan優(yōu)雅通知關(guān)閉
ImmutableResource創(chuàng)建后不會(huì)修改的資源
Limit控制請(qǐng)求數(shù)
LockedCalls確保方法的串行調(diào)用
ManagedResource資源管理
Once提供 once func
OnceGuard一次性使用的資源管理
Poolpool,簡(jiǎn)單的池
RefResource引用計(jì)數(shù)的資源
ResourceManager資源管理器
SharedCalls類似 singflight 的功能
SpinLock自旋鎖:自旋+CAS
TimeoutLimitLimit + timeout 控制

下面開始對(duì)以上庫組件做分別介紹。

atomic

因?yàn)闆]有 泛型 支持,所以才會(huì)出現(xiàn)多種類型的原子類支持。以下采用 float64 作為例子:

func (f *AtomicFloat64) Add(val float64) float64 {
	for {
		old := f.Load()
		nv := old + val
		if f.CompareAndSwap(old, nv) {
			return nv
		}
	}
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
	return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
	return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
	atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val):如果 CAS 失敗,不斷for循環(huán)重試,獲取 old val,并set old+val;

  • CompareAndSwap(old, new):調(diào)用底層 atomicCAS

  • Load():調(diào)用 atomic.LoadUint64 ,然后轉(zhuǎn)換

  • Set(val):調(diào)用 atomic.StoreUint64

至于其他類型,開發(fā)者想自己擴(kuò)展自己想要的類型,可以依照上述,基本上調(diào)用原始 atomic 操作,然后轉(zhuǎn)換為需要的類型,比如:遇到 bool 可以借助 0, 1 來分辨對(duì)應(yīng)的 false, true。

Barrier

這里 Barrier 只是將業(yè)務(wù)函數(shù)操作封裝,作為閉包傳入,內(nèi)部將 lock 操作的加鎖解鎖自行解決了【防止開發(fā)者加鎖了忘記解鎖】

func (b *Barrier) Guard(fn func()) {
	b.lock.Lock()
	defer b.lock.Unlock()
  // 自己的業(yè)務(wù)邏輯
	fn()
}

Cond/Limit/TimeoutLimit

這個(gè)數(shù)據(jù)結(jié)構(gòu)和 Limit 一起組成了 TimeoutLimit ,這里將這3個(gè)一起講:

func NewTimeoutLimit(n int) TimeoutLimit {
	return TimeoutLimit{
		limit: NewLimit(n),
		cond:  NewCond(),
	}
}

func NewLimit(n int) Limit {
	return Limit{
		pool: make(chan lang.PlaceholderType, n),
	}
}
  • limit 這里是有緩沖的 channel;

  • cond 是無緩沖的;

所以這里結(jié)合名字來理解:因?yàn)?Limit 是限制某一種資源的使用,所以需要預(yù)先在資源池中放入預(yù)置數(shù)量的資源;Cond 類似閥門,需要兩邊都準(zhǔn)備好,才能進(jìn)行數(shù)據(jù)交換,所以使用無緩沖,同步控制。

這里我們看看 stores/mongo 中關(guān)于 session 的管理,來理解 資源控制:

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  // 選項(xiàng)參數(shù)注入
	...
  // 看 limit 中是否還能取出資源
	if err := cs.limit.Borrow(o.timeout); err != nil {
		return nil, err
	} else {
		return cs.Copy(), nil
	}
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
  // 1. 如果還有 limit 中還有資源,取出一個(gè),返回
	if l.TryBorrow() {
		return nil
	}
	// 2. 如果 limit 中資源已經(jīng)用完了
	var ok bool
	for {
    // 只有 cond 可以取出一個(gè)【無緩存,也只有 cond <- 此條才能通過】
		timeout, ok = l.cond.WaitWithTimeout(timeout)
    // 嘗試取出一個(gè)【上面 cond 通過時(shí),就有一個(gè)資源返回了】
    // 看 `Return()`
		if ok && l.TryBorrow() {
			return nil
		}
		// 超時(shí)控制
		if timeout <= 0 {
			return ErrTimeout
		}
	}
}

func (l TimeoutLimit) Return() error {
  // 返回去一個(gè)資源
	if err := l.limit.Return(); err != nil {
		return err
	}
	// 同步通知另一個(gè)需要資源的協(xié)程【實(shí)現(xiàn)了閥門,兩方交換】
	l.cond.Signal()
	return nil
}

資源管理

同文件夾中還有 ResourceManager,從名字上類似,這里將兩個(gè)組件放在一起講解。

先從結(jié)構(gòu)上:

type ManagedResource struct {
  // 資源
	resource interface{}
	lock     sync.RWMutex
  // 生成資源的邏輯,由開發(fā)者自己控制
	generate func() interface{}
  // 對(duì)比資源
	equals   func(a, b interface{}) bool
}

type ResourceManager struct {
  // 資源:這里看得出來是 I/O,
	resources   map[string]io.Closer
	sharedCalls SharedCalls
  // 對(duì)資源map互斥訪問
	lock        sync.RWMutex
}

然后來看獲取資源的方法簽名:

func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 獲取一個(gè)資源(有就直接獲取,沒有生成一個(gè))
func (mr *ManagedResource) Take() interface{}
// 判斷這個(gè)資源是否不符合傳入的判斷要求,不符合則重置
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager 使用 SharedCalls 做防重復(fù)請(qǐng)求,并將資源緩存在內(nèi)部的 sourMap;另外傳入的 create funcIO 操作有關(guān),常見用在網(wǎng)絡(luò)資源的緩存;

  2. ManagedResource 緩存資源沒有 map 而是單一的 interface ,說明只有一份,但是它提供了 Take() 和傳入 generate()說明可以讓開發(fā)者自行更新 resource

所以在用途上:

  • ResourceManager:用在網(wǎng)絡(luò)資源的管理。如:數(shù)據(jù)庫連接管理;

  • ManagedResource:用在一些變化資源,可以做資源前后對(duì)比,達(dá)到更新資源。如:token 管理和驗(yàn)證

RefResource

這個(gè)就和 GC 中引用計(jì)數(shù)類似:

  • Use() -> ref++

  • Clean() -> ref--; if ref == 0 -> ref clean

func (r *RefResource) Use() error {
  // 互斥訪問
	r.lock.Lock()
	defer r.lock.Unlock()
	// 清除標(biāo)記
	if r.cleaned {
		return ErrUseOfCleaned
	}
	// 引用 +1
	r.ref++
	return nil
}

SharedCalls

一句話形容:使用SharedCalls可以使得同時(shí)多個(gè)請(qǐng)求只需要發(fā)起一次拿結(jié)果的調(diào)用,其他請(qǐng)求"坐享其成",這種設(shè)計(jì)有效減少了資源服務(wù)的并發(fā)壓力,可以有效防止緩存擊穿。

這個(gè)組件被反復(fù)應(yīng)用在其他組件中,上面說的 ResourceManager。

類似當(dāng)需要高頻并發(fā)訪問一個(gè)資源時(shí),就可以使用 SharedCalls 緩存。

// 當(dāng)多個(gè)請(qǐng)求同時(shí)使用Do方法請(qǐng)求資源時(shí)
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  // 先申請(qǐng)加鎖
  g.lock.Lock()

  // 根據(jù)key,獲取對(duì)應(yīng)的call結(jié)果,并用變量c保存
  if c, ok := g.calls[key]; ok {
    // 拿到call以后,釋放鎖,此處call可能還沒有實(shí)際數(shù)據(jù),只是一個(gè)空的內(nèi)存占位
    g.lock.Unlock()
    // 調(diào)用wg.Wait,判斷是否有其他goroutine正在申請(qǐng)資源,如果阻塞,說明有其他goroutine正在獲取資源
    c.wg.Wait()
    // 當(dāng)wg.Wait不再阻塞,表示資源獲取已經(jīng)結(jié)束,可以直接返回結(jié)果
    return c.val, c.err
  }

  // 沒有拿到結(jié)果,則調(diào)用makeCall方法去獲取資源,注意此處仍然是鎖住的,可以保證只有一個(gè)goroutine可以調(diào)用makecall
  c := g.makeCall(key, fn)
  // 返回調(diào)用結(jié)果
  return c.val, c.err
}

“更簡(jiǎn)的并發(fā)代碼有哪些”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

go
AI