溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Envoy源碼是如何分析Dispatcher的

發(fā)布時(shí)間:2021-12-28 15:55:48 來源:億速云 閱讀:143 作者:柒染 欄目:云計(jì)算

Envoy源碼是如何分析Dispatcher的,相信很多沒有經(jīng)驗(yàn)的人對(duì)此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個(gè)問題。

Dispatcher

在Envoy的代碼中Dispatcher是隨處可見的,可以說在Envoy中有著舉足輕重的地位,一個(gè)Dispatcher就是一個(gè)EventLoop,其承擔(dān)了任務(wù)隊(duì)列、網(wǎng)絡(luò)事件處理、定時(shí)器、信號(hào)處理等核心功能。在Envoy threading model這篇文章所提到的EventLoop(Each worker thread runs a “non-blocking” event loop)指的就是這個(gè)Dispatcher對(duì)象。這個(gè)部分的代碼相對(duì)較獨(dú)立,和其他模塊耦合也比較少,但重要性卻不言而喻。下面是與Dispatcher相關(guān)的類圖,在接下來會(huì)對(duì)其中的關(guān)鍵概念進(jìn)行介紹。

Envoy源碼是如何分析Dispatcher的cdn.nlark.com/lark/0/2018/png/2826/1538970258265-1c4ce7cc-9283-48c4-81d2-ed4c595afb69.png">

Dispatcher 和 Libevent

Dispatcher本質(zhì)上就是一個(gè)EventLoop,Envoy并沒有重新實(shí)現(xiàn),而是復(fù)用了Libevent中的event_base,在Libevent的基礎(chǔ)上進(jìn)行了二次封裝并抽象出一些事件類,比如FileEventSignalEvent、Timer等。Libevent是一個(gè)C庫,而Envoy是C++,為了避免手動(dòng)管理這些C結(jié)構(gòu)的內(nèi)存,Envoy通過繼承unique_ptr的方式重新封裝了這些libevent暴露出來的C結(jié)構(gòu)。

template <class T, void (*deleter)(T*)>
class CSmartPtr : public std::unique_ptr<T, void (*)(T*)> {
public:
  CSmartPtr() : std::unique_ptr<T, void (*)(T*)>(nullptr, deleter) {}
  CSmartPtr(T* object) : std::unique_ptr<T, void (*)(T*)>(object, deleter) {}
};

通過CSmartPtr就可以將Libevent中的一些C數(shù)據(jù)結(jié)構(gòu)的內(nèi)存通過RAII機(jī)制自動(dòng)管理起來,使用方式如下:

extern "C">

在Libevent中無論是定時(shí)器到期、收到信號(hào)、還是文件可讀寫等都是事件,統(tǒng)一使用event類型來表示,Envoy中則將event作為ImplBase的成員,然后讓所有的事件類型的對(duì)象都繼承ImplBase,從而實(shí)現(xiàn)了事件的抽象。

class ImplBase {
protected:
  ~ImplBase();

  event raw_event_;
};

SignalEvent

SignalEvent的實(shí)現(xiàn)很簡(jiǎn)單,通過evsignal_assign來初始化事件,然后通過evsignal_add添加事件使事件成為未決狀態(tài)(關(guān)于Libevent事件狀態(tài)見附錄)。

class SignalEventImpl : public SignalEvent, ImplBase {
public:
  // signal_num: 要設(shè)置的信號(hào)值
  // cb: 信號(hào)事件的處理函數(shù)
  SignalEventImpl(DispatcherImpl& dispatcher, int signal_num, SignalCb cb);
private:
  SignalCb cb_;
};

SignalEventImpl::SignalEventImpl(DispatcherImpl& dispatcher, 
                                 int signal_num, SignalCb cb) : cb_(cb) {
  evsignal_assign(
      &raw_event_, &dispatcher.base(), signal_num,
      [](evutil_socket_t, short, void* arg) -> void { 
          static_cast<SignalEventImpl*>(arg)->cb_(); },
      this);
  evsignal_add(&raw_event_, nullptr);
}

Timer

Timer事件暴露了兩個(gè)接口一個(gè)用于關(guān)閉Timer,另外一個(gè)則用于啟動(dòng)Timer,需要傳遞一個(gè)時(shí)間來設(shè)置Timer的到期時(shí)間間隔。

class Timer {
public:
  virtual ~Timer() {}
  virtual void disableTimer() PURE;
  virtual void enableTimer(const std::chrono::milliseconds& d) PURE;
};

創(chuàng)建Timer的時(shí)候會(huì)通過evtimer_assgin對(duì)event進(jìn)行初始化,這個(gè)時(shí)候事件還處于未決狀態(tài)而不會(huì)觸發(fā),需要通過event_add添加到Dispatcher中才能被觸發(fā)。

class TimerImpl : public Timer, ImplBase {
public:
  TimerImpl(Libevent::BasePtr& libevent, TimerCb cb);

  // Timer
  void disableTimer() override;
  void enableTimer(const std::chrono::milliseconds& d) override;

private:
  TimerCb cb_;
};

TimerImpl::TimerImpl(DispatcherImpl& dispatcher, TimerCb cb) : cb_(cb) {
  ASSERT(cb_);
  evtimer_assign(
      &raw_event_, &dispatcher.base(),
      [](evutil_socket_t, short, void* arg) -> void { 
          static_cast<TimerImpl*>(arg)->cb_(); }, 
      this);
}

disableTimer被調(diào)用時(shí)其內(nèi)部會(huì)調(diào)用event_del來刪除事件,使事件成為非未決狀態(tài),enableTimer被調(diào)用時(shí)則間接調(diào)用event_add使事件成為未決狀態(tài),這樣一旦超時(shí)時(shí)間到了就會(huì)觸發(fā)超時(shí)事件。

void TimerImpl::disableTimer() { event_del(&raw_event_); }
void TimerImpl::enableTimer(const std::chrono::milliseconds& d) {
  if (d.count() == 0) {
    event_active(&raw_event_, EV_TIMEOUT, 0);
  } else {
    std::chrono::microseconds us = 
          std::chrono::duration_cast<std::chrono::microseconds>(d);
    timeval tv;
    tv.tv_sec = us.count() / 1000000;
    tv.tv_usec = us.count() % 1000000;
    event_add(&raw_event_, &tv);
  }
}

上面的代碼在計(jì)算timer時(shí)間timeval的時(shí)候?qū)崿F(xiàn)的并不優(yōu)雅,應(yīng)該避免使用像1000000這樣的不具備可讀性的數(shù)字常量,社區(qū)中有人建議可以改成如下的形式。

auto secs = std::chrono::duration_cast<std::chrono::seconds>(d);
auto usecs = 
  std::chrono::duration_cast<std::chrono::microseconds>(d - secs);
tv.tv_secs = secs.count();
tv.tv_usecs = usecs.count();

FileEvent

socket套接字相關(guān)的事件被封裝為FileEvent,其上暴露了二個(gè)接口:activate用于主動(dòng)觸發(fā)事件,典型的使用場(chǎng)景比如: 喚醒EventLoop、Write Buffer有數(shù)據(jù),可以主動(dòng)觸發(fā)下可寫事件(Envoy中的典型使用場(chǎng)景)等;setEnabled用于設(shè)置事件類型,將事件添加到EventLoop中使其成為未決狀態(tài)。

void FileEventImpl::activate(uint32_t events) {
  int libevent_events = 0;
  if (events & FileReadyType::Read) {
    libevent_events |= EV_READ;
  }
  if (events & FileReadyType::Write) {
    libevent_events |= EV_WRITE;
  }
  if (events & FileReadyType::Closed) {
    libevent_events |= EV_CLOSED;
  }
  ASSERT(libevent_events);
  event_active(&raw_event_, libevent_events, 0);
}

void FileEventImpl::setEnabled(uint32_t events) {
  event_del(&raw_event_);
  assignEvents(events);
  event_add(&raw_event_, nullptr);
}

任務(wù)隊(duì)列

Dispatcher的內(nèi)部有一個(gè)任務(wù)隊(duì)列,也會(huì)創(chuàng)建一個(gè)線程專們處理任務(wù)隊(duì)列中的任務(wù)。通過Dispatcherpost方法可以將任務(wù)投遞到任務(wù)隊(duì)列中,交給Dispatcher內(nèi)的線程去處理。

void DispatcherImpl::post(std::function<void()> callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(callback);
  }
  if (do_post) {
    post_timer_->enableTimer(std::chrono::milliseconds(0));
  }
}

post方法將傳遞進(jìn)來的callback所代表的任務(wù),添加到post_callbacks_所代表的類型為vector<callback>的成員表變量中。如果post_callbacks_為空的話,說明背后的處理線程是處于非活動(dòng)狀態(tài),這時(shí)通過post_timer_設(shè)置一個(gè)超時(shí)時(shí)間時(shí)間為0的方式來喚醒它。post_timer_在構(gòu)造的時(shí)候就已經(jīng)設(shè)置好對(duì)應(yīng)的callbackrunPostCallbacks,對(duì)應(yīng)代碼如下:

DispatcherImpl::DispatcherImpl(TimeSystem& time_system,
                               Buffer::WatermarkFactoryPtr&& factory)
    : ......
      post_timer_(createTimer([this]() -> void { runPostCallbacks(); })),
      current_to_delete_(&to_delete_1_) {
  RELEASE_ASSERT(Libevent::Global::initialized(), "");
}

runPostCallbacks是一個(gè)while循環(huán),每次都從post_callbacks_中取出一個(gè)callback所代表的任務(wù)去運(yùn)行,直到post_callbacks_為空。每次運(yùn)行runPostCallbacks都會(huì)確保所有的任務(wù)都執(zhí)行完。顯然,在runPostCallbacks被線程執(zhí)行的期間如果post進(jìn)來了新的任務(wù),那么新任務(wù)直接追加到post_callbacks_尾部即可,而無需做喚醒線程這一動(dòng)作。

void DispatcherImpl::runPostCallbacks() {
  while (true) {
    std::function<void()> callback;
    {
      Thread::LockGuard lock(post_lock_);
      if (post_callbacks_.empty()) {
        return;
      }
      callback = post_callbacks_.front();
      post_callbacks_.pop_front();
    }
    callback();
  }
}

DeferredDeletable

最后講一下Dispatcher中比較難理解也很重要的DeferredDeletable,它是一個(gè)空接口,所有要進(jìn)行延遲析構(gòu)的對(duì)象都要繼承自這個(gè)空接口。在Envoy的代碼中像下面這樣繼承自DeferredDeletable的類隨處可見。

class DeferredDeletable {
public:
  virtual ~DeferredDeletable() {}
};

那何為延遲析構(gòu)呢?用在哪個(gè)場(chǎng)景呢?延遲析構(gòu)指的是將析構(gòu)的動(dòng)作交由Dispatcher來完成,所以DeferredDeletableDispatcher密切相關(guān)。Dispatcher對(duì)象有一個(gè)vector保存了所有要延遲析構(gòu)的對(duì)象。

class DispatcherImpl : public Dispatcher {
  ......
 private:
  ........
  std::vector<DeferredDeletablePtr> to_delete_1_;
  std::vector<DeferredDeletablePtr> to_delete_2_;
  std::vector<DeferredDeletablePtr>* current_to_delete_;
 }

to_delete_1_to_delete_2_就是用來存放所有的要延遲析構(gòu)的對(duì)象,這里使用兩個(gè)vector存放,為什么要這樣做呢?。current_to_delete_始終指向當(dāng)前正要析構(gòu)的對(duì)象列表,每次執(zhí)行完析構(gòu)后就交替指向另外一個(gè)對(duì)象列表,來回交替。

void DispatcherImpl::clearDeferredDeleteList() {
  ASSERT(isThreadSafe());
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
  size_t num_to_delete = to_delete->size();
  if (deferred_deleting_ || !num_to_delete) {
    return;
  }
  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
  if (current_to_delete_ == &to_delete_1_) {
    current_to_delete_ = &to_delete_2_;
  } else {
    current_to_delete_ = &to_delete_1_;
  }
  deferred_deleting_ = true;
  for (size_t i = 0; i < num_to_delete; i++) {
    (*to_delete)[i].reset();
  }

  to_delete->clear();
  deferred_deleting_ = false;
}

上面的代碼在執(zhí)行對(duì)象析構(gòu)的時(shí)候先使用to_delete來指向當(dāng)前正要析構(gòu)的對(duì)象列表,然后將current_to_delete_指向另外一個(gè)列表,這樣在添加延遲刪除的對(duì)象時(shí),就可以做到安全的把對(duì)象添加到列表中了。因?yàn)?code>deferredDelete和clearDeferredDeleteList都是在同一個(gè)線程中運(yùn)行,所以current_to_delete_是一個(gè)普通的指針,可以安全的更改指針指向另外一個(gè),而不用擔(dān)心有線程安全問題。

void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
  ASSERT(isThreadSafe());
  current_to_delete_->emplace_back(std::move(to_delete));
  ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
  if (1 == current_to_delete_->size()) {
    deferred_delete_timer_->enableTimer(std::chrono::milliseconds(0));
  }
}

當(dāng)有要進(jìn)行延遲析構(gòu)的對(duì)象時(shí),調(diào)用deferredDelete即可,這個(gè)函數(shù)內(nèi)部會(huì)通過current_to_delete_把對(duì)象放到要延遲析構(gòu)的列表中,最后判斷下當(dāng)前要延遲析構(gòu)的列表大小是否是1,如果是1表明這是第一次添加延遲析構(gòu)的對(duì)象,那么就需要通過deferred_delete_timer_把背后的線程喚醒執(zhí)行clearDeferredDeleteList函數(shù)。這樣做的原因是避免多次喚醒,因?yàn)橛幸环N情況是線程已經(jīng)喚醒了正在執(zhí)行clearDeferredDeleteList,在這個(gè)過程中又有其他的對(duì)象需要析構(gòu)而加入到vector中。

Envoy源碼是如何分析Dispatcher的

到此為止deferredDelete的實(shí)現(xiàn)原理就基本分析完了,可以看出它的實(shí)現(xiàn)和任務(wù)隊(duì)列的實(shí)現(xiàn)很類似,只不過一個(gè)是循環(huán)執(zhí)行callback所代表的任務(wù),另一個(gè)是對(duì)對(duì)象進(jìn)行析構(gòu)。最后我們來看一下deferredDelete的應(yīng)用場(chǎng)景,卻“為何要進(jìn)行延遲析構(gòu)?”在Envoy的源代碼中經(jīng)常會(huì)看到像下面這樣的代碼片段。

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, 
                               ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket,
                               bool connected) {
......
  }
  // 傳遞裸指針到回調(diào)中
  file_event_ = dispatcher_.createFileEvent(
      fd(), [this](uint32_t events) -> void { onFileEvent(events); }, 
      Event::FileTriggerType::Edge,
      Event::FileReadyType::Read | Event::FileReadyType::Write);
    ......
}

傳遞給Dispatchercallback都是通過裸指針的方式進(jìn)行回調(diào),如果進(jìn)行回調(diào)的時(shí)候?qū)ο笠呀?jīng)析構(gòu)了,就會(huì)出現(xiàn)野指針的問題,我相信C++水平還可以的同學(xué)都會(huì)看出這個(gè)問題,除非能在邏輯上保證Dispatcher的生命周期比所有對(duì)象都短,這樣就能保證在回調(diào)的時(shí)候?qū)ο罂隙ú粫?huì)析構(gòu),但是這不可能成立的,因?yàn)?code>Dispatcher是EventLoop的核心。

一個(gè)線程運(yùn)行一個(gè)EventLoop直到線程結(jié)束,Dispatcher對(duì)象才會(huì)析構(gòu),這意味著Dispatcher對(duì)象的生命周期是最長(zhǎng)的。所以從邏輯上沒辦法保證進(jìn)行回調(diào)的時(shí)候?qū)ο鬀]有析構(gòu)??赡苡腥藭?huì)有疑問,對(duì)象在析構(gòu)的時(shí)候把注冊(cè)的事件取消不就可以避免野指針的問題嗎? 那如果事件已經(jīng)觸發(fā)了,callback正在等待運(yùn)行呢? 又或者callback運(yùn)行了一半呢?前者libevent是可以保證的,在調(diào)用event_del的時(shí)候可以把處于等待運(yùn)行的事件取消掉,但是后者就無能為力了,這個(gè)時(shí)候如果對(duì)象析構(gòu)了,那行為就是未定義了。沿著這個(gè)思路想一想,是不是只要保證對(duì)象析構(gòu)的時(shí)候沒有callback正在運(yùn)行就可以解決問題了呢?是的,只要保證所有在執(zhí)行中的callback執(zhí)行完了,再做對(duì)象析構(gòu)就可以了??梢岳?code>Dispatcher是順序執(zhí)行所有callback的特點(diǎn),向Dispatcher中插入一個(gè)任務(wù)就是用來對(duì)象析構(gòu)的,那么當(dāng)這個(gè)任務(wù)執(zhí)行的時(shí)候是可以保證沒有其他任何callback在運(yùn)行。通過這個(gè)方法就完美解決了這里遇到的野指針問題了。

或許有人又會(huì)想,這里是不是可以用shared_ptr和shared_from_this來解這個(gè)呢? 是的,這是解決多線程環(huán)境下對(duì)象析構(gòu)的秘密武器,通過延長(zhǎng)對(duì)象的生命周期,把對(duì)象的生命周期延長(zhǎng)到和callback一樣,等callback執(zhí)行完再進(jìn)行析構(gòu),同樣可以達(dá)到效果,但是這帶來了兩個(gè)問題,第一就是對(duì)象生命周期被無限拉長(zhǎng),雖然延遲析構(gòu)也拉長(zhǎng)了生命周期,但是時(shí)間是可預(yù)期的,一旦EventLoop執(zhí)行了clearDeferredDeleteList任務(wù)就會(huì)立刻被回收,而通過shared_ptr的方式其生命周期取決于callback何時(shí)運(yùn)行,而callback何時(shí)運(yùn)行這個(gè)是沒辦法保證的,比如一個(gè)等待socket的可讀事件進(jìn)行回調(diào),如果對(duì)端一直不發(fā)送數(shù)據(jù),那么callback就一直不會(huì)被運(yùn)行,對(duì)象就一直無法被析構(gòu),長(zhǎng)時(shí)間累積會(huì)導(dǎo)致內(nèi)存使用率上漲。第二就是在使用方式上侵入性較強(qiáng),需要強(qiáng)制使用shared_ptr的方式創(chuàng)建對(duì)象。

Dispatcher總的來說其實(shí)現(xiàn)還是比較簡(jiǎn)單明了的,比較容易驗(yàn)證其正確性,同樣功能也相對(duì)較弱,和chromium的MessageLoop、boost的asio都是相似的用途,但是功能上差得比較多。好在這是專門給Envoy設(shè)計(jì)的,而且Envoy的場(chǎng)景也比較單一,不必做成那么通用的。另外一個(gè)我覺得比較奇怪的是,為什么在DeferredDeletable的實(shí)現(xiàn)中要用to_delete_1_to_delete_2_兩個(gè)隊(duì)列交替來存放,其實(shí)按照我的理解一個(gè)隊(duì)列即可,因?yàn)?code>clearDeferredDeleteList和deferredDelete是保證在同一個(gè)線程中執(zhí)行的,就和Dispatcher的任務(wù)隊(duì)列一樣,用一個(gè)隊(duì)列保存所有要執(zhí)行的任務(wù),循環(huán)的執(zhí)行即可。但是Envoy中沒有這樣做,我理解這樣設(shè)計(jì)的原因可能是因?yàn)橄啾扔谌蝿?wù)隊(duì)列來說延遲析構(gòu)的重要性更低一些,大量對(duì)象的析構(gòu)如果保存在一個(gè)隊(duì)列中循環(huán)的進(jìn)行析構(gòu)勢(shì)必會(huì)影響其他關(guān)鍵任務(wù)的執(zhí)行,所以這里拆分成兩個(gè)隊(duì)列,多個(gè)任務(wù)交替的執(zhí)行,就好比把一個(gè)大任務(wù)拆分成了好幾個(gè)小任務(wù)順序來執(zhí)行。

看完上述內(nèi)容,你們掌握Envoy源碼是如何分析Dispatcher的的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI