您好,登錄后才能下訂單哦!
小編這次要給大家分享的是C++如何實現(xiàn)線程安全的頻率限制器,文章內容豐富,感興趣的小伙伴可以來了解一下,希望大家閱讀完這篇文章之后能夠有所收獲。
很早以前,在學習使用 Python 的deque
容器時,我寫了一篇文章python3 deque 雙向隊列創(chuàng)建與使用方法分析。最近需要壓測線上服務的性能,又不愿意總是在 QA 那邊排隊,于是需要自己寫一個壓測用的客戶端。其中一個核心需求就是要實現(xiàn) QPS 限制。
于是,終究逃不開要在 C++ 中實現(xiàn)一個線程安全的頻率限制器。
設計思路
所謂頻率限制,就是要在一個時間段(inteval)中,限制操作的次數(shù)(limit)。這又可以引出兩種強弱不同的表述:
不難發(fā)現(xiàn),強表述通過「滑動窗口」的方式,不僅限制了頻率,還要求了操作在時間上的均勻性。前作的頻率限制器,實際上對應了這里的強表述。但實際工程中,我們通常只需要實現(xiàn)弱表述的頻率限制器就足夠使用了。
對于弱表述,實現(xiàn)起來主要思路如下:
當操作計數(shù)(count)小于限制(limit)時直接放行;
單線程實現(xiàn)
在不考慮線程安全時,不難給出這樣的實現(xiàn)。
struct ms_clock { using rep = std::chrono::milliseconds::rep; using period = std::chrono::milliseconds::period; using duration = std::chrono::duration<rep, period>; using time_point = std::chrono::time_point<ms_clock, duration>; static time_point now() noexcept { return time_point(std::chrono::duration_cast<duration>( std::chrono::steady_clock::now().time_since_epoch())); } }; } // namespace __details class RateLimiter { public: using clock = __details::ms_clock; // 1. private: const uint64_t limit_; const clock::duration interval_; const clock::rep interval_count_; mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a. mutable clock::rep timestamp_{0}; // 2.b. public: constexpr RateLimiter(uint64_t limit, clock::duration interval) : limit_(limit), interval_(interval), interval_count_(interval_.count()) {} RateLimiter(const RateLimiter&) = delete; // 3.a. RateLimiter& operator=(const RateLimiter&) = delete; // 3.b. RateLimiter(RateLimiter&&) = delete; // 3.c. RateLimiter& operator=(RateLimiter&&) = delete; // 3.d. bool operator()() const; double qps() const { return 1000.0 * this->limit_ / this->interval_count_; } }; bool RateLimiter::operator()() const { auto orig_count = this->count_++; if (orig_count < this->limit_) { // 4. return true; } else { auto ts = this->timestamp_; auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { // 5. return false; } this->timestamp_ = now; this->count_ = 1; return true; } }
這里,
(1) 表明頻率限制器使用單位為毫秒的時鐘。在實際使用時,也可以按需改成微妙甚至納秒。
(2) 使用mutable
修飾內部狀態(tài)count_
和timestamp_
。這是因為兩個limit_
和interval_
相同的頻率限制器,在邏輯上是等價的,但他們的內部狀態(tài)卻不一定相同。因此,為了讓const
成員函數(shù)能夠修改內部狀態(tài)(而不改變邏輯等價),我們要給內部狀態(tài)數(shù)據(jù)成員加上mutable
修飾。
(2.a) 處將count_
設置為類型可表示的最大值是為了讓
(4) 的判斷失敗,而對timestamp_
進行初始化。
(2.b) 處將timestamp_
設置為0
則是基于同樣的原因,讓 (5) 的判斷失敗。
(2.a) 和 (2.b) 處通過巧妙的初值設計,將初始化狀態(tài)與后續(xù)正常工作狀態(tài)的邏輯統(tǒng)一了起來,而無須丑陋的判斷。
(3) 禁止了對象的拷貝和移動。這是因為一個頻率限制器應綁定一組操作,而不應由兩組或更多組操作共享(對于拷貝的情形),或是中途失效(對于移動的情形)。
如此一來,函數(shù)調用運算符就忠實地實現(xiàn)了前述邏輯。
多線程改造
第一步改造
當有多線程同時調用RateLimiter::operator()
時,顯而易見,在count_
和timestamp_
上會產生競爭。我們有兩種辦法解決這個問題:要不然加鎖,要不然把count_
和timestamp_
設為原子變量然后用原子操作解決問題。于是,對函數(shù)調用運算符,我們有如下第一步的改造。
class RateLimiter { // 其余保持不變 private: mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a. mutable std::atomic<clock::rep> timestamp_{0}; // 1.b. // 其余保持不變 }; bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); // 2. if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); // 3. auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } this->timestamp_.store(now); // 4. this->count_.store(1UL); // 5. return true; } }
這里,
count_
和timestamp_
聲明為原子的,從而方便后續(xù)進行原子操作。這樣看起來很完美,但其實是有 bug 的。我們重點關注 (4) 這里。(4) 的本意是更新timestamp_
,以備下一次orig_count >= this->limit_
時進行判斷。準確設置這一timestamp
是頻率限制器正確工作的基石。但是,如果有兩個(或更多)線程,同時走到了 (4)會發(fā)生什么?
timestamp_
的值究竟是什么,我們完全不可預期。count_
置為1
。但是你想,頻率限制器先后放行了兩次操作,但為什么count_
是1
呢?這是不是就「偷跑」了一次操作?為此,我們要保證只有一個線程會真正設置timestamp_
,而拒絕其他同樣走到 (4) 位置的線程的操作,以避免其重復設置timestamp_
以及錯誤地將count_
再次置為1
。
第二步改進
于是有以下改進。
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); // 3. if (orig_count < this->limit_) { // 4. return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { // 5. return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1. return false; } this->count_.store(1UL); // 2. return true; } }
這里,(1) 是一個 CAS 原子操作。它會原子地比較timestamp_
和ts
的值(Compare):若他們相等,則將now
的值寫入timestamp_
(Swap),并返回true
;若他們不相等,則將timestamp_
的值寫入ts
,并返回false
。如果沒有其他線程搶先修改timestamp_
的值,那么 CAS 操作應該成功并返回true
,繼續(xù)執(zhí)行后面的代碼;否則,說明其他線程已經搶先修改了timestamp_
,當前線程的操作被記入上一個周期而被頻率限制器拒絕。
現(xiàn)在要考慮 (2)。如果執(zhí)行完 (1) 之后立即立刻馬上沒有任何延遲地執(zhí)行 (2),那么當然一切大吉。但如果這時候當前線程被切出去,會發(fā)生什么?我們要分情況討論。
如果ts == 0
,也就是「當前線程」是頻率限制器第一次修改timestamp_
。于是,當前線程可能會在 (3) 處將count_
(溢出地)自增為0
,從而可能有其他線程通過 (4)。此時,當前線程在當前分片有可能應當被拒絕操作。為此,我們需要在 (1) 和 (2) 之間做額外的判斷。
if (ts == 0) { auto orig_count = this->count.fetch_add(1UL); return (orig_count < this->limit_); }
如果ts != 0
,也就是「當前線程」并非頻率限制器第一次修改timestamp_
。那么其他線程在 (4) 處必然判斷失敗,但在 (5) 處的判斷可能成功,從而可能繼續(xù)成功執(zhí)行 (1),從而接連兩次執(zhí)行 (2)。但這不影響正確性。因為通過 (5) 表明相對當前線程填入的timestamp_
,已經由過了一個時間段(interval),而在這個時間段里,只有當前線程的一次操作會被接受。
第三次改進
由此,我們得到:
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { return false; } if (ts == 0) { auto orig_count = this->count.fetch_add(1UL); return (orig_count < this->limit_); } this->count_.store(1UL); return true; } }
至此,我們的代碼在邏輯上已經成立了,接下來要做一些性能方面的優(yōu)化。
原子操作默認采用std::memory_order_seq_cst
的內存模型。這是 C++ 中最嚴格的內存模型,它有兩個保證:
為了實現(xiàn)順序一致性(sequential consistency),編譯器會使用很多對抗編譯器優(yōu)化和 CPU 亂序執(zhí)行(Out-of-Order Execution)的手段,因而性能較差。對于此處的count_
,我們無需順序一致性模型,只需要獲取釋放模型(Aquire-Release)模型就足夠了。
第四次改進
于是有第四次改進:
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel); if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { return false; } if (ts == 0) { auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel); return (orig_count < this->limit_); } this->count_.store(1UL, std::memory_order_release); return true; } }
至此,我們就完整實現(xiàn)了一個頻率限制器,可以愉快地開始拉 QPS 壓測了!
看完這篇關于C++如何實現(xiàn)線程安全的頻率限制器的文章,如果覺得文章內容寫得不錯的話,可以把它分享出去給更多人看到。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。