溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2022-04-08 10:07:00 來(lái)源:億速云 閱讀:198 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”文章吧。

為什么要限流

在保證可用的情況下盡可能多增加進(jìn)入的人數(shù),其余的人在排隊(duì)等待,或者返回友好提示,保證里面的進(jìn)行系統(tǒng)的用戶可以正常使用,防止系統(tǒng)雪崩。

限流算法

限流算法很多,常見(jiàn)的有三類,分別是 計(jì)數(shù)器算法 、漏桶算法、令牌桶算法 。

(1)計(jì)數(shù)器:

          在一段時(shí)間間隔內(nèi),處理請(qǐng)求的最大數(shù)量固定,超過(guò)部分不做處理。

(2)漏桶:

          漏桶大小固定,處理速度固定,但請(qǐng)求進(jìn)入速度不固定(在突發(fā)情況請(qǐng)求過(guò)多時(shí),會(huì)丟棄過(guò)多的請(qǐng)求)。

(3)令牌桶:

          令牌桶的大小固定,令牌的產(chǎn)生速度固定,但是消耗令牌(即請(qǐng)求)速度不固定(可以應(yīng)對(duì)一些某些時(shí)間請(qǐng)求過(guò)多的情況);每個(gè)請(qǐng)求都會(huì)從令牌桶中取出令牌,如果沒(méi)有令牌則丟棄該次請(qǐng)求。

計(jì)數(shù)器限流

在一段時(shí)間間隔內(nèi),處理請(qǐng)求的最大數(shù)量固定,超過(guò)部分不做處理。

舉個(gè)例子,比如我們規(guī)定對(duì)于A接口,我們1分鐘的訪問(wèn)次數(shù)不能超過(guò)100次。

那么我們可以這么做:

在一開(kāi) 始的時(shí)候,我們可以設(shè)置一個(gè)計(jì)數(shù)器counter,每當(dāng)一個(gè)請(qǐng)求過(guò)來(lái)的時(shí)候,counter就加1,如果counter的值大于100并且該請(qǐng)求與第一個(gè)請(qǐng)求的間隔時(shí)間還在1分鐘之內(nèi),那么說(shuō)明請(qǐng)求數(shù)過(guò)多,拒絕訪問(wèn);

如果該請(qǐng)求與第一個(gè)請(qǐng)求的間隔時(shí)間大于1分鐘,且counter的值還在限流范圍內(nèi),那么就重置 counter,就是這么簡(jiǎn)單粗暴。

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

代碼實(shí)現(xiàn): 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//計(jì)數(shù)器 限流
public class CounterLimiter {

    //起始時(shí)間
    private static long startTime = System.currentTimeMillis();

    //時(shí)間間隔1000ms
    private static long interval = 1000;

    //每個(gè)時(shí)間間隔內(nèi),限制數(shù)量
    private static long limit = 3;

    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    /**
     * true 代表放行,請(qǐng)求可已通過(guò)
     * false 代表限制,不讓請(qǐng)求通過(guò)
     */
    public static boolean tryAcquire() {
        long nowTime = System.currentTimeMillis();
        //判斷是否在上一個(gè)時(shí)間間隔內(nèi)
        if (nowTime < startTime + interval) {
            //如果還在上個(gè)時(shí)間間隔內(nèi)
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        } else {
            //如果不在上一個(gè)時(shí)間間隔內(nèi)
            synchronized (CounterLimiter.class) {
                //防止重復(fù)初始化
                if (nowTime > startTime + interval) {
                    startTime = nowTime;
                    accumulator.set(0);
                }
            }
            //再次進(jìn)行判斷
            long count = accumulator.incrementAndGet();
            if (count <= limit) {
                return true;
            } else {
                return false;
            }
        }
    }


    // 測(cè)試
    public static void main(String[] args) {

        //線程池,用于多線程模擬測(cè)試
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次數(shù)
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數(shù)
        final int threads = 2;
        // 每條線程的執(zhí)行輪數(shù)
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次數(shù)累積
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有線程結(jié)束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統(tǒng)計(jì)結(jié)果
        System.out.println("限制的次數(shù)為:" + limited.get() +
                ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get()));
        System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s");
    }

}

計(jì)數(shù)器限流的不足: 

這個(gè)算法雖然簡(jiǎn)單,但是存在臨界問(wèn)題,我們看下圖:

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

從上圖中我們可以看到,假設(shè)有一個(gè)惡意用戶,他在0:59時(shí),瞬間發(fā)送了100個(gè)請(qǐng)求,并且1:00又瞬間發(fā)送了100個(gè)請(qǐng)求,那么其實(shí)這個(gè)用戶在 1秒里面,瞬間發(fā)送了200個(gè)請(qǐng)求。

我們剛才規(guī)定的是1分鐘最多100個(gè)請(qǐng)求(規(guī)劃的吞吐量),也就是每秒鐘最多1.7個(gè)請(qǐng)求,用戶通過(guò)在時(shí)間窗口的重置節(jié)點(diǎn)處突發(fā)請(qǐng)求, 可以瞬間超過(guò)我們的速率限制。

用戶有可能通過(guò)算法的這個(gè)漏洞,瞬間壓垮我們的應(yīng)用。

漏桶限流

漏桶算法限流的基本原理為:水(對(duì)應(yīng)請(qǐng)求)從進(jìn)水口進(jìn)入到漏桶里,漏桶以一定的速度出水(請(qǐng)求放行),當(dāng)水流入速度過(guò)大,桶內(nèi)的總水量大于桶容量會(huì)直接溢出,請(qǐng)求被拒絕。

大致的漏桶限流規(guī)則如下:

(1)進(jìn)水口(對(duì)應(yīng)客戶端請(qǐng)求)以任意速率流入進(jìn)入漏桶。

(2)漏桶的容量是固定的,出水(放行)速率也是固定的。

(3)漏桶容量是不變的,如果處理速度太慢,桶內(nèi)水量會(huì)超出了桶的容量,則后面流入的水滴會(huì)溢出,表示請(qǐng)求拒絕。

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

?漏桶算法其實(shí)很簡(jiǎn)單,可以粗略的認(rèn)為就是注水漏水過(guò)程,往桶中以任意速率流入水,以一定速率流出水,當(dāng)水超過(guò)桶容量(capacity)則丟棄,因?yàn)橥叭萘渴遣蛔兊?,保證了整體的速率。

以一定速率流出水,

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

削峰: 有大量流量進(jìn)入時(shí),會(huì)發(fā)生溢出,從而限流保護(hù)服務(wù)可用

緩沖: 不至于直接請(qǐng)求到服務(wù)器, 緩沖壓力

代碼實(shí)現(xiàn): 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//漏斗限流
public class LeakBucketLimiter {

    //桶的大小
    private static long capacity = 10;
    //流出速率,每秒兩個(gè)
    private static long rate = 2;
    //開(kāi)始時(shí)間
    private static long startTime = System.currentTimeMillis();
    //桶中剩余的水
    private static AtomicLong water = new AtomicLong();

    /**
     * true 代表放行,請(qǐng)求可已通過(guò)
     * false 代表限制,不讓請(qǐng)求通過(guò)
     */
    public synchronized static boolean tryAcquire() {
        //如果桶的余量問(wèn)0,直接放行
        if (water.get() == 0) {
            startTime = System.currentTimeMillis();
            water.set(1);
            return true;
        }
        //計(jì)算從當(dāng)前時(shí)間到開(kāi)始時(shí)間流出的水,和現(xiàn)在桶中剩余的水
        //桶中剩余的水
        water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
        //防止出現(xiàn)<0的情況
        water.set(Math.max(0, water.get()));
        //設(shè)置新的開(kāi)始時(shí)間
        startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
        //如果當(dāng)前水小于容量,表示可以放行
        if (water.get() < capacity) {
            water.incrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    // 測(cè)試
    public static void main(String[] args) {

        //線程池,用于多線程模擬測(cè)試
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次數(shù)
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數(shù)
        final int threads = 2;
        // 每條線程的執(zhí)行輪數(shù)
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次數(shù)累積
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有線程結(jié)束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統(tǒng)計(jì)結(jié)果
        System.out.println("限制的次數(shù)為:" + limited.get() +
                ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get()));
        System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s");
    }

}

漏桶的不足: 

漏桶的出水速度固定,也就是請(qǐng)求放行速度是固定的。

漏桶出口的速度固定,不能靈活的應(yīng)對(duì)后端能力提升。比如,通過(guò)動(dòng)態(tài)擴(kuò)容,后端流量從1000QPS提升到1WQPS,漏桶沒(méi)有辦法。

令牌桶限流

令牌桶算法中新請(qǐng)求到來(lái)時(shí)會(huì)從桶里拿走一個(gè)令牌,如果桶內(nèi)沒(méi)有令牌可拿,就拒絕服務(wù)。 當(dāng)然,令牌的數(shù)量也是有上限的。令牌的數(shù)量與時(shí)間和發(fā)放速率強(qiáng)相關(guān),時(shí)間流逝的時(shí)間越長(zhǎng),會(huì)不斷往桶里加入越多的令牌,如果令牌發(fā)放的速度比申請(qǐng)速度快,令牌桶會(huì)放滿令牌,直到令牌占滿整個(gè)令牌桶。

令牌桶限流大致的規(guī)則如下:

(1)進(jìn)水口按照某個(gè)速度,向桶中放入令牌。

(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中還有剩余令牌,一旦請(qǐng)求過(guò)來(lái)就能申請(qǐng)成功,然后放行。

(3)如果令牌的發(fā)放速度,慢于請(qǐng)求到來(lái)速度,桶內(nèi)就無(wú)牌可領(lǐng),請(qǐng)求就會(huì)被拒絕。

總之,令牌的發(fā)送速率可以設(shè)置,從而可以對(duì)突發(fā)的出口流量進(jìn)行有效的應(yīng)對(duì)。

Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)

代碼實(shí)現(xiàn): 

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

//令牌桶
public class TokenBucketLimiter {
    //桶的容量
    private static long capacity = 10;
    //放入令牌的速率,每秒2個(gè)
    private static long rate = 2;
    //上次放置令牌的時(shí)間
    private static long lastTime = System.currentTimeMillis();
    //桶中令牌的余量
    private static AtomicLong tokenNum = new AtomicLong();

    /**
     * true 代表放行,請(qǐng)求可已通過(guò)
     * false 代表限制,不讓請(qǐng)求通過(guò)
     */
    public synchronized static boolean tryAcquire() {
        //更新桶中剩余令牌的數(shù)量
        long now = System.currentTimeMillis();
        tokenNum.addAndGet((now - lastTime) / 1000 * rate);
        tokenNum.set(Math.min(capacity, tokenNum.get()));
        //更新時(shí)間
        lastTime += (now - lastTime) / 1000 * 1000;
        //桶中還有令牌就放行
        if (tokenNum.get() > 0) {
            tokenNum.decrementAndGet();
            return true;
        } else {
            return false;
        }
    }


    //測(cè)試
    public static void main(String[] args) {

        //線程池,用于多線程模擬測(cè)試
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // 被限制的次數(shù)
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數(shù)
        final int threads = 2;
        // 每條線程的執(zhí)行輪數(shù)
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        boolean flag = tryAcquire();
                        if (!flag) {
                            // 被限制的次數(shù)累積
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有線程結(jié)束
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統(tǒng)計(jì)結(jié)果
        System.out.println("限制的次數(shù)為:" + limited.get() +
                ",通過(guò)的次數(shù)為:" + (threads * turns - limited.get()));
        System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        System.out.println("運(yùn)行的時(shí)長(zhǎng)為:" + time + "s");
    }

}

令牌桶的好處: 

令牌桶的好處之一就是可以方便地應(yīng)對(duì) 突發(fā)出口流量(后端能力的提升)。

比如,可以改變令牌的發(fā)放速度,算法能按照新的發(fā)送速率調(diào)大令牌的發(fā)放數(shù)量,使得出口突發(fā)流量能被處理。

以上就是關(guān)于“Java常見(jiàn)的限流算法怎么實(shí)現(xiàn)”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI