溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

C++如何實現(xiàn)線程安全的頻率限制器

發(fā)布時間:2020-07-23 16:45:56 來源:億速云 閱讀:304 作者:小豬 欄目:編程語言

小編這次要給大家分享的是C++如何實現(xiàn)線程安全的頻率限制器,文章內容豐富,感興趣的小伙伴可以來了解一下,希望大家閱讀完這篇文章之后能夠有所收獲。

很早以前,在學習使用 Python 的deque容器時,我寫了一篇文章python3 deque 雙向隊列創(chuàng)建與使用方法分析。最近需要壓測線上服務的性能,又不愿意總是在 QA 那邊排隊,于是需要自己寫一個壓測用的客戶端。其中一個核心需求就是要實現(xiàn) QPS 限制。

于是,終究逃不開要在 C++ 中實現(xiàn)一個線程安全的頻率限制器。

設計思路

所謂頻率限制,就是要在一個時間段(inteval)中,限制操作的次數(shù)(limit)。這又可以引出兩種強弱不同的表述:

  • 強表述:在任意一個長度等于設定的時間段(interval)的滑動窗口中,頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
  • 弱表述:在一組長度等于設定的時間段(interval)且緊密相連的固定窗口中,每一個窗口里頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。

不難發(fā)現(xiàn),強表述通過「滑動窗口」的方式,不僅限制了頻率,還要求了操作在時間上的均勻性。前作的頻率限制器,實際上對應了這里的強表述。但實際工程中,我們通常只需要實現(xiàn)弱表述的頻率限制器就足夠使用了。

對于弱表述,實現(xiàn)起來主要思路如下:

當操作計數(shù)(count)小于限制(limit)時直接放行;

單線程實現(xiàn)

在不考慮線程安全時,不難給出這樣的實現(xiàn)。

struct ms_clock {
 using rep = std::chrono::milliseconds::rep;
 using period = std::chrono::milliseconds::period;
 using duration = std::chrono::duration<rep, period>;
 using time_point = std::chrono::time_point<ms_clock, duration>;

 static
 time_point now() noexcept {
 return time_point(std::chrono::duration_cast<duration>(
   std::chrono::steady_clock::now().time_since_epoch()));
 }
};
} // namespace __details

class RateLimiter {
 public:
 using clock = __details::ms_clock; // 1.

 private:
 const uint64_t limit_;
 const clock::duration interval_;
 const clock::rep interval_count_;

 mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a.
 mutable clock::rep timestamp_{0};     // 2.b.

 public:
 constexpr RateLimiter(uint64_t limit, clock::duration interval) :
 limit_(limit), interval_(interval), interval_count_(interval_.count()) {}

 RateLimiter(const RateLimiter&) = delete;  // 3.a.
 RateLimiter& operator=(const RateLimiter&) = delete; // 3.b.
 RateLimiter(RateLimiter&&) = delete;   // 3.c.
 RateLimiter& operator=(RateLimiter&&) = delete; // 3.d.

 bool operator()() const;
 double qps() const {
 return 1000.0 * this->limit_ / this->interval_count_;
 }
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_++;
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_;
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 this->timestamp_ = now;
 this->count_ = 1;
 return true;
 }
}

這里,

(1) 表明頻率限制器使用單位為毫秒的時鐘。在實際使用時,也可以按需改成微妙甚至納秒。

(2) 使用mutable修飾內部狀態(tài)count_timestamp_。這是因為兩個limit_interval_相同的頻率限制器,在邏輯上是等價的,但他們的內部狀態(tài)卻不一定相同。因此,為了讓const成員函數(shù)能夠修改內部狀態(tài)(而不改變邏輯等價),我們要給內部狀態(tài)數(shù)據(jù)成員加上mutable修飾。

(2.a) 處將count_設置為類型可表示的最大值是為了讓

(4) 的判斷失敗,而對timestamp_進行初始化。

(2.b) 處將timestamp_設置為0則是基于同樣的原因,讓 (5) 的判斷失敗。

(2.a) 和 (2.b) 處通過巧妙的初值設計,將初始化狀態(tài)與后續(xù)正常工作狀態(tài)的邏輯統(tǒng)一了起來,而無須丑陋的判斷。

(3) 禁止了對象的拷貝和移動。這是因為一個頻率限制器應綁定一組操作,而不應由兩組或更多組操作共享(對于拷貝的情形),或是中途失效(對于移動的情形)。

如此一來,函數(shù)調用運算符就忠實地實現(xiàn)了前述邏輯。

多線程改造

第一步改造

當有多線程同時調用RateLimiter::operator()時,顯而易見,在count_timestamp_上會產生競爭。我們有兩種辦法解決這個問題:要不然加鎖,要不然把count_timestamp_設為原子變量然后用原子操作解決問題。于是,對函數(shù)調用運算符,我們有如下第一步的改造。

class RateLimiter {
 // 其余保持不變
 private:
 mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a.
 mutable std::atomic<clock::rep> timestamp_{0};     // 1.b.
 // 其余保持不變
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 2.
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load(); // 3.
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 this->timestamp_.store(now); // 4.
 this->count_.store(1UL); // 5.
 return true;
 }
}

這里,

  • (1) 將count_timestamp_聲明為原子的,從而方便后續(xù)進行原子操作。
  • (2) -- (5) 則將原有操作分別改為對應的原子操作。

這樣看起來很完美,但其實是有 bug 的。我們重點關注 (4) 這里。(4) 的本意是更新timestamp_,以備下一次orig_count >= this->limit_時進行判斷。準確設置這一timestamp是頻率限制器正確工作的基石。但是,如果有兩個(或更多)線程,同時走到了 (4)會發(fā)生什么?

  • 因為原子操作的存在,兩個線程會先后執(zhí)行 (4)。于是timestamp_的值究竟是什么,我們完全不可預期。
  • 此外,兩個線程會先后執(zhí)行 (5),即原子地將count_置為1。但是你想,頻率限制器先后放行了兩次操作,但為什么count_1呢?這是不是就「偷跑」了一次操作?

為此,我們要保證只有一個線程會真正設置timestamp_,而拒絕其他同樣走到 (4) 位置的線程的操作,以避免其重復設置timestamp_以及錯誤地將count_再次置為1。

第二步改進

于是有以下改進。

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 3.
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1.
 return false;
 }
 this->count_.store(1UL); // 2.
 return true;
 }
}

這里,(1) 是一個 CAS 原子操作。它會原子地比較timestamp_ts的值(Compare):若他們相等,則將now的值寫入timestamp_(Swap),并返回true;若他們不相等,則將timestamp_的值寫入ts,并返回false。如果沒有其他線程搶先修改timestamp_的值,那么 CAS 操作應該成功并返回true,繼續(xù)執(zhí)行后面的代碼;否則,說明其他線程已經搶先修改了timestamp_,當前線程的操作被記入上一個周期而被頻率限制器拒絕。

現(xiàn)在要考慮 (2)。如果執(zhí)行完 (1) 之后立即立刻馬上沒有任何延遲地執(zhí)行 (2),那么當然一切大吉。但如果這時候當前線程被切出去,會發(fā)生什么?我們要分情況討論。

如果ts == 0,也就是「當前線程」是頻率限制器第一次修改timestamp_。于是,當前線程可能會在 (3) 處將count_(溢出地)自增為0,從而可能有其他線程通過 (4)。此時,當前線程在當前分片有可能應當被拒絕操作。為此,我們需要在 (1) 和 (2) 之間做額外的判斷。

if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
}

如果ts != 0,也就是「當前線程」并非頻率限制器第一次修改timestamp_。那么其他線程在 (4) 處必然判斷失敗,但在 (5) 處的判斷可能成功,從而可能繼續(xù)成功執(zhí)行 (1),從而接連兩次執(zhí)行 (2)。但這不影響正確性。因為通過 (5) 表明相對當前線程填入的timestamp_,已經由過了一個時間段(interval),而在這個時間段里,只有當前線程的一次操作會被接受。

第三次改進

由此,我們得到:

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL);
 return true;
 }
}

至此,我們的代碼在邏輯上已經成立了,接下來要做一些性能方面的優(yōu)化。

原子操作默認采用std::memory_order_seq_cst的內存模型。這是 C++ 中最嚴格的內存模型,它有兩個保證:

  • 程序指令和源碼順序一致;
  • 所有線程的所有操作都有一致的順序。

為了實現(xiàn)順序一致性(sequential consistency),編譯器會使用很多對抗編譯器優(yōu)化和 CPU 亂序執(zhí)行(Out-of-Order Execution)的手段,因而性能較差。對于此處的count_,我們無需順序一致性模型,只需要獲取釋放模型(Aquire-Release)模型就足夠了。

第四次改進

于是有第四次改進:

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL, std::memory_order_release);
 return true;
 }
}

至此,我們就完整實現(xiàn)了一個頻率限制器,可以愉快地開始拉 QPS 壓測了!

看完這篇關于C++如何實現(xiàn)線程安全的頻率限制器的文章,如果覺得文章內容寫得不錯的話,可以把它分享出去給更多人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI