溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

crossbeam-epoch中基于epoch的無鎖垃圾收集原理是什么

發(fā)布時(shí)間:2021-12-27 09:26:15 來源:億速云 閱讀:235 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹crossbeam-epoch中基于epoch的無鎖垃圾收集原理是什么,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

crossbeam提供了一個(gè)基于epoch的“垃圾收集”(epoch based reclamation)庫。首先來簡單的說一下這一垃圾回收的原理。

基于epoch的“垃圾回收”

這一算法主要在Keir Fraser的  博士論文  中有涉及。我們從之前無鎖并發(fā)棧的簡單實(shí)現(xiàn)中看到,并發(fā)環(huán)境中的數(shù)據(jù)結(jié)構(gòu)的容器(比如包含了出棧元素的那個(gè)節(jié)點(diǎn)),會(huì)在一個(gè)線程中完成它的使命然后準(zhǔn)備被釋放。但是這個(gè)時(shí)候可能還有其他線程還持有其快照(主要是為了進(jìn)行CAS)。不過不會(huì)再有新的線程還會(huì)繼續(xù)取得其快照了。那么只要等到所有對(duì)該容器取得快照的線程都完成操作,就可以安心的釋放這個(gè)容器了。
這一算法需要:
  1. 一個(gè)全局的epoch計(jì)數(shù)器
  2. 每個(gè)線程有一個(gè)本地的epoch計(jì)數(shù)器
  3. 一個(gè)全局列表記錄每個(gè)epoch產(chǎn)生的垃圾
  4. 標(biāo)示線程是否活躍的標(biāo)示符
算法大概是這樣的:當(dāng)一個(gè)線程需要對(duì)某個(gè)數(shù)據(jù)結(jié)構(gòu)操作時(shí),它先將自己的標(biāo)識(shí)符設(shè)置為活躍,然后將本地epoch計(jì)數(shù)器更新為全局epoch計(jì)數(shù)器。而如果線程需要從數(shù)據(jù)結(jié)構(gòu)中刪除一個(gè)節(jié)點(diǎn),它會(huì)把該節(jié)點(diǎn)加入當(dāng)前epoch的垃圾列表。當(dāng)線程完成對(duì)數(shù)據(jù)結(jié)構(gòu)的操作時(shí),線程將自己標(biāo)記為不活躍。然后當(dāng)需要收集垃圾的時(shí)候,一個(gè)線程會(huì)遍歷所有線程并檢查是否所有活躍的線程的epoch都和當(dāng)前全局的epoch相同,如果是那么全局epoch計(jì)數(shù)將會(huì)+1,同時(shí)2個(gè)epoch前的垃圾可以被回收。
事實(shí)上,當(dāng)處于epoch N+2的時(shí)候,由于所有對(duì)數(shù)據(jù)進(jìn)行操作(活躍)的線程都在epoch N+1之后,可以安全的清理epoch N的垃圾列表。

crossbeam-epoch的API

Guard

使用  pin  可以產(chǎn)生  Guard  ,這將當(dāng)前線程標(biāo)識(shí)為活躍
    
  
  
  
use crossbeam_epoch as epoch;
let guard = &epoch::pin();
     
     
defer_destroy  則將數(shù)據(jù)放入垃圾列表,等待2個(gè)epoch之后就可以被清理了
    
  
  
  pub unsafe fn defer_destroy<T>(&self, ptr: Shared<T>)
然后是我們將看到三大指針類型:

Shared

我們剛剛在  defer_destroy  的函數(shù)簽名中看到了  Shared<T>  ,這是一個(gè)被Guard的生命周期  'g  所保護(hù)的指針類型,相當(dāng)于  &'g T  。它保證了Guard存在期間數(shù)據(jù)的可訪問。

Owned

這相當(dāng)于  Box<T>  ,是一個(gè)不會(huì)被其他線程接觸到的量。

Atomic

這是一個(gè)可以在線程間交換的原子指針??梢栽?  Guard  的保護(hù)下從中讀出  Shared<T>
    
  
  
  pub fn load(&self, ord: Ordering, &'g Guard) -> Shared<'g, T>
也可以將  Owned<T>  或者  Shared<T>  (即  Pointer<T>  )存入
    
  
  
  pub fn store<P>(&self, new: P, ord: Ordering)where    P: Pointer<T>,
也可以對(duì)其進(jìn)行CAS操作
    
  
  
  pub fn compare_and_set<O, P>(    &self,    current: Shared<T>,    new: P,    ord: O,    &'g Guard) -> Result<Shared<'g, T>, CompareAndSetError<'g, T, P>>where    O: CompareAndSetOrdering,    P: Pointer<T>,

實(shí)現(xiàn)并發(fā)棧

有了這些API,我們就可以實(shí)現(xiàn)一個(gè)無鎖的MPMC的Treiber棧了。
    
  
  
  
use std::mem::ManuallyDrop;use std::ptr;use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use crossbeam_epoch::{self as epoch, Atomic, Owned};
#[derive(Debug)]pub struct TreiberStack<T> {    head: Atomic<Node<T>>,}
#[derive(Debug)]struct Node<T> {    data: ManuallyDrop<T>, // 告訴編譯器該變量不需要自動(dòng)Drop    next: Atomic<Node<T>>,}
impl<T> TreiberStack<T> {    pub fn new() -> TreiberStack<T> {        TreiberStack {            head: Atomic::null(),        }    }
   pub fn push(&self, t: T) {        let mut n = Owned::new(Node {            data: ManuallyDrop::new(t),            next: Atomic::null(),        });
       let guard = epoch::pin(); // 標(biāo)記當(dāng)前線程為活躍
       loop {            let head = self.head.load(Relaxed, &guard);            n.next.store(head, Relaxed);
           match self.head.compare_and_set(head, n, Release, &guard) { // CAS                Ok(_) => break,                Err(e) => n = e.new,            }        }    }
   pub fn pop(&self) -> Option<T> {        let guard = epoch::pin(); // 標(biāo)記當(dāng)前線程為活躍        loop {            let head = self.head.load(Acquire, &guard);
           match unsafe { head.as_ref() } {                Some(h) => {                    let next = h.next.load(Relaxed, &guard);
                   if self                        .head                        .compare_and_set(head, next, Relaxed, &guard) // CAS                        .is_ok()                    {                        unsafe {                            guard.defer_destroy(head); // 將垃圾加入列表                            return Some(ManuallyDrop::into_inner(ptr::read(&(*h).data))); // 返回節(jié)點(diǎn)中的數(shù)據(jù)                        }                    }                }                None => return None,            }        }    }
   pub fn is_empty(&self) -> bool {        let guard = epoch::pin();        self.head.load(Acquire, &guard).is_null()    }}
impl<T> Drop for TreiberStack<T> {    fn drop(&mut self) {        while self.pop().is_some() {}    }}
   
     

關(guān)于crossbeam-epoch中基于epoch的無鎖垃圾收集原理是什么就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI