您好,登錄后才能下訂單哦!
這篇“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”文章吧。
在保證可用的情況下盡可能多增加進(jìn)入的人數(shù),其余的人在排隊(duì)等待,或者返回友好提示,保證里面的進(jìn)行系統(tǒng)的用戶可以正常使用,防止系統(tǒng)雪崩。
限流算法很多,常見(jiàn)的有三類,分別是 計(jì)數(shù)器算法 、漏桶算法、令牌桶算法 。
(1)計(jì)數(shù)器:
在一段時(shí)間間隔內(nèi),處理請(qǐng)求的最大數(shù)量固定,超過(guò)部分不做處理。
(2)漏桶:
漏桶大小固定,處理速度固定,但請(qǐng)求進(jìn)入速度不固定(在突發(fā)情況請(qǐng)求過(guò)多時(shí),會(huì)丟棄過(guò)多的請(qǐng)求)。
(3)令牌桶:
令牌桶的大小固定,令牌的產(chǎn)生速度固定,但是消耗令牌(即請(qǐng)求)速度不固定(可以應(yīng)對(duì)一些某些時(shí)間請(qǐng)求過(guò)多的情況);每個(gè)請(qǐng)求都會(huì)從令牌桶中取出令牌,如果沒(méi)有令牌則丟棄該次請(qǐng)求。
在一段時(shí)間間隔內(nèi),處理請(qǐng)求的最大數(shù)量固定,超過(guò)部分不做處理。
舉個(gè)例子,比如我們規(guī)定對(duì)于A接口,我們1分鐘的訪問(wèn)次數(shù)不能超過(guò)100次。
那么我們可以這么做:
在一開(kāi) 始的時(shí)候,我們可以設(shè)置一個(gè)計(jì)數(shù)器counter,每當(dāng)一個(gè)請(qǐng)求過(guò)來(lái)的時(shí)候,counter就加1,如果counter的值大于100并且該請(qǐng)求與第一個(gè)請(qǐng)求的間隔時(shí)間還在1分鐘之內(nèi),那么說(shuō)明請(qǐng)求數(shù)過(guò)多,拒絕訪問(wèn);
如果該請(qǐng)求與第一個(gè)請(qǐng)求的間隔時(shí)間大于1分鐘,且counter的值還在限流范圍內(nèi),那么就重置 counter,就是這么簡(jiǎn)單粗暴。
代碼實(shí)現(xiàn):
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; //計(jì)數(shù)器 限流 public class CounterLimiter { //起始時(shí)間 private static long startTime = System.currentTimeMillis(); //時(shí)間間隔1000ms private static long interval = 1000; //每個(gè)時(shí)間間隔內(nèi),限制數(shù)量 private static long limit = 3; //累加器 private static AtomicLong accumulator = new AtomicLong(); /** * true 代表放行,請(qǐng)求可已通過(guò) * false 代表限制,不讓請(qǐng)求通過(guò) */ public static boolean tryAcquire() { long nowTime = System.currentTimeMillis(); //判斷是否在上一個(gè)時(shí)間間隔內(nèi) if (nowTime < startTime + interval) { //如果還在上個(gè)時(shí)間間隔內(nèi) long count = accumulator.incrementAndGet(); if (count <= limit) { return true; } else { return false; } } else { //如果不在上一個(gè)時(shí)間間隔內(nèi) synchronized (CounterLimiter.class) { //防止重復(fù)初始化 if (nowTime > startTime + interval) { startTime = nowTime; accumulator.set(0); } } //再次進(jìn)行判斷 long count = accumulator.incrementAndGet(); if (count <= limit) { return true; } else { return false; } } } // 測(cè)試 public static void main(String[] args) { //線程池,用于多線程模擬測(cè)試 ExecutorService pool = Executors.newFixedThreadPool(10); // 被限制的次數(shù) AtomicInteger limited = new AtomicInteger(0); // 線程數(shù) final int threads = 2; // 每條線程的執(zhí)行輪數(shù) final int turns = 20; // 同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { boolean flag = tryAcquire(); if (!flag) { // 被限制的次數(shù)累積 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有線程結(jié)束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //輸出統(tǒng)計(jì)結(jié)果 System.out.println("限制的次數(shù)為:" + limited.get() + ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get())); System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns)); System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s"); } }
計(jì)數(shù)器限流的不足:
這個(gè)算法雖然簡(jiǎn)單,但是存在臨界問(wèn)題,我們看下圖:
從上圖中我們可以看到,假設(shè)有一個(gè)惡意用戶,他在0:59時(shí),瞬間發(fā)送了100個(gè)請(qǐng)求,并且1:00又瞬間發(fā)送了100個(gè)請(qǐng)求,那么其實(shí)這個(gè)用戶在 1秒里面,瞬間發(fā)送了200個(gè)請(qǐng)求。
我們剛才規(guī)定的是1分鐘最多100個(gè)請(qǐng)求(規(guī)劃的吞吐量),也就是每秒鐘最多1.7個(gè)請(qǐng)求,用戶通過(guò)在時(shí)間窗口的重置節(jié)點(diǎn)處突發(fā)請(qǐng)求, 可以瞬間超過(guò)我們的速率限制。
用戶有可能通過(guò)算法的這個(gè)漏洞,瞬間壓垮我們的應(yīng)用。
漏桶算法限流的基本原理為:水(對(duì)應(yīng)請(qǐng)求)從進(jìn)水口進(jìn)入到漏桶里,漏桶以一定的速度出水(請(qǐng)求放行),當(dāng)水流入速度過(guò)大,桶內(nèi)的總水量大于桶容量會(huì)直接溢出,請(qǐng)求被拒絕。
大致的漏桶限流規(guī)則如下:
(1)進(jìn)水口(對(duì)應(yīng)客戶端請(qǐng)求)以任意速率流入進(jìn)入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不變的,如果處理速度太慢,桶內(nèi)水量會(huì)超出了桶的容量,則后面流入的水滴會(huì)溢出,表示請(qǐng)求拒絕。
?漏桶算法其實(shí)很簡(jiǎn)單,可以粗略的認(rèn)為就是注水漏水過(guò)程,往桶中以任意速率流入水,以一定速率流出水,當(dāng)水超過(guò)桶容量(capacity)則丟棄,因?yàn)橥叭萘渴遣蛔兊?,保證了整體的速率。
以一定速率流出水,
削峰: 有大量流量進(jìn)入時(shí),會(huì)發(fā)生溢出,從而限流保護(hù)服務(wù)可用
緩沖: 不至于直接請(qǐng)求到服務(wù)器, 緩沖壓力
代碼實(shí)現(xiàn):
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; //漏斗限流 public class LeakBucketLimiter { //桶的大小 private static long capacity = 10; //流出速率,每秒兩個(gè) private static long rate = 2; //開(kāi)始時(shí)間 private static long startTime = System.currentTimeMillis(); //桶中剩余的水 private static AtomicLong water = new AtomicLong(); /** * true 代表放行,請(qǐng)求可已通過(guò) * false 代表限制,不讓請(qǐng)求通過(guò) */ public synchronized static boolean tryAcquire() { //如果桶的余量問(wèn)0,直接放行 if (water.get() == 0) { startTime = System.currentTimeMillis(); water.set(1); return true; } //計(jì)算從當(dāng)前時(shí)間到開(kāi)始時(shí)間流出的水,和現(xiàn)在桶中剩余的水 //桶中剩余的水 water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate); //防止出現(xiàn)<0的情況 water.set(Math.max(0, water.get())); //設(shè)置新的開(kāi)始時(shí)間 startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000; //如果當(dāng)前水小于容量,表示可以放行 if (water.get() < capacity) { water.incrementAndGet(); return true; } else { return false; } } // 測(cè)試 public static void main(String[] args) { //線程池,用于多線程模擬測(cè)試 ExecutorService pool = Executors.newFixedThreadPool(10); // 被限制的次數(shù) AtomicInteger limited = new AtomicInteger(0); // 線程數(shù) final int threads = 2; // 每條線程的執(zhí)行輪數(shù) final int turns = 20; // 同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { boolean flag = tryAcquire(); if (!flag) { // 被限制的次數(shù)累積 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有線程結(jié)束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //輸出統(tǒng)計(jì)結(jié)果 System.out.println("限制的次數(shù)為:" + limited.get() + ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get())); System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns)); System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s"); } }
漏桶的不足:
漏桶的出水速度固定,也就是請(qǐng)求放行速度是固定的。
漏桶出口的速度固定,不能靈活的應(yīng)對(duì)后端能力提升。比如,通過(guò)動(dòng)態(tài)擴(kuò)容,后端流量從1000QPS提升到1WQPS,漏桶沒(méi)有辦法。
令牌桶算法中新請(qǐng)求到來(lái)時(shí)會(huì)從桶里拿走一個(gè)令牌,如果桶內(nèi)沒(méi)有令牌可拿,就拒絕服務(wù)。 當(dāng)然,令牌的數(shù)量也是有上限的。令牌的數(shù)量與時(shí)間和發(fā)放速率強(qiáng)相關(guān),時(shí)間流逝的時(shí)間越長(zhǎng),會(huì)不斷往桶里加入越多的令牌,如果令牌發(fā)放的速度比申請(qǐng)速度快,令牌桶會(huì)放滿令牌,直到令牌占滿整個(gè)令牌桶。
令牌桶限流大致的規(guī)則如下:
(1)進(jìn)水口按照某個(gè)速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中還有剩余令牌,一旦請(qǐng)求過(guò)來(lái)就能申請(qǐng)成功,然后放行。
(3)如果令牌的發(fā)放速度,慢于請(qǐng)求到來(lái)速度,桶內(nèi)就無(wú)牌可領(lǐng),請(qǐng)求就會(huì)被拒絕。
總之,令牌的發(fā)送速率可以設(shè)置,從而可以對(duì)突發(fā)的出口流量進(jìn)行有效的應(yīng)對(duì)。
代碼實(shí)現(xiàn):
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; //令牌桶 public class TokenBucketLimiter { //桶的容量 private static long capacity = 10; //放入令牌的速率,每秒2個(gè) private static long rate = 2; //上次放置令牌的時(shí)間 private static long lastTime = System.currentTimeMillis(); //桶中令牌的余量 private static AtomicLong tokenNum = new AtomicLong(); /** * true 代表放行,請(qǐng)求可已通過(guò) * false 代表限制,不讓請(qǐng)求通過(guò) */ public synchronized static boolean tryAcquire() { //更新桶中剩余令牌的數(shù)量 long now = System.currentTimeMillis(); tokenNum.addAndGet((now - lastTime) / 1000 * rate); tokenNum.set(Math.min(capacity, tokenNum.get())); //更新時(shí)間 lastTime += (now - lastTime) / 1000 * 1000; //桶中還有令牌就放行 if (tokenNum.get() > 0) { tokenNum.decrementAndGet(); return true; } else { return false; } } //測(cè)試 public static void main(String[] args) { //線程池,用于多線程模擬測(cè)試 ExecutorService pool = Executors.newFixedThreadPool(10); // 被限制的次數(shù) AtomicInteger limited = new AtomicInteger(0); // 線程數(shù) final int threads = 2; // 每條線程的執(zhí)行輪數(shù) final int turns = 20; // 同步器 CountDownLatch countDownLatch = new CountDownLatch(threads); long start = System.currentTimeMillis(); for (int i = 0; i < threads; i++) { pool.submit(() -> { try { for (int j = 0; j < turns; j++) { boolean flag = tryAcquire(); if (!flag) { // 被限制的次數(shù)累積 limited.getAndIncrement(); } Thread.sleep(200); } } catch (Exception e) { e.printStackTrace(); } //等待所有線程結(jié)束 countDownLatch.countDown(); }); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } float time = (System.currentTimeMillis() - start) / 1000F; //輸出統(tǒng)計(jì)結(jié)果 System.out.println("限制的次數(shù)為:" + limited.get() + ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get())); System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns)); System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s"); } }
令牌桶的好處:
令牌桶的好處之一就是可以方便地應(yīng)對(duì) 突發(fā)出口流量(后端能力的提升)。
比如,可以改變令牌的發(fā)放速度,算法能按照新的發(fā)送速率調(diào)大令牌的發(fā)放數(shù)量,使得出口突發(fā)流量能被處理。
以上就是關(guān)于“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。