您好,登錄后才能下訂單哦!
這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)Guava中RateLimiter的實現(xiàn)原理是什么,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
// RateLimiter屬性 /** * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate * object to facilitate testing. * 用來計時, RateLimiter將實例化的時間設(shè)置為0值, 后續(xù)都是相對時間 */ private final SleepingStopwatch stopwatch; // Can't be initialized in the constructor because mocks don't call the constructor. // 鎖, RateLimiter依賴于synchronized來控制并發(fā), 限流器里面屬性都沒有加volatile修飾 private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter屬性 /** * The currently stored permits. * 當前還有多少permits沒有被使用, 被存下來的permits數(shù)量 */ double storedPermits; /** * The maximum number of stored permits. * 最大允許緩存的permits數(shù)量, 也就是storedPermits能達到的最大值 */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 每隔多少時間產(chǎn)生一個permit */ double stableIntervalMicros; /** * 下一次可以獲取permits的時間, 這個時間也是相對于RateLimiter的構(gòu)造時間的 * * nextFreeTicketMicros是一個很關(guān)鍵的屬性.我們每次獲取permits的時候,先拿storedPermits的值, * 如果夠,storedPermits減去相應(yīng)的值就可以了,如果不夠,那么還需要將nextFreeTicketMicros往前推, * 表示我預(yù)占了接下來多少時間的量了.那么下一個請求來的時候,如果還沒到nextFreeTicketMicros這個時間點, * 需要sleep到這個點再返回,當然也要將這個值再往前推 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { // SmoothBursty默認緩存最多1s的permits, 不可修改, 也就是說最多會緩存1 * permitsPerSecond這么多個permits到池中 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } public final void setRate(double permitsPerSecond) { // 加鎖 synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { // 在關(guān)鍵節(jié)點, 會先更新下storedPermits到正確的值 resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; // 每隔多少時間產(chǎn)生一個permit this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now // 如果nextFreeTicketMicros已經(jīng)過掉了, 想象一下很長時間沒有再次調(diào)用limiter.acquire()的場景 // 需要將nextFreeTicketMicros設(shè)置為當前時間, 重新計算下storedPermits if (nowMicros > nextFreeTicketMicros) { // 新生成的permits, 構(gòu)造函數(shù)中進來時生成的newPermits為無限大 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 構(gòu)造函數(shù)進來時maxPermits為0, 所以這里的storedPermits也是0 storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } @Override double coolDownIntervalMicros() { // 構(gòu)造函數(shù)進來時, 此值為0.0 return stableIntervalMicros; }
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; // maxPermits為1秒產(chǎn)生的permits maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { // 等比例縮放storedPermits storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
// 常用api @CanIgnoreReturnValue public double acquire() { return acquire(1); } @CanIgnoreReturnValue public double acquire(int permits) { // 預(yù)約, 如果當前不能直接獲取到permits, 需要等待, 返回值表示需要sleep多久 long microsToWait = reserve(permits); // sleep stopwatch.sleepMicrosUninterruptibly(microsToWait); // 返回sleep的時長 return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // 返回當前線程為了拿這些許可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否則需要睡到nextFreeTicketMicros return max(momentAvailable - nowMicros, 0); } /** * 我們可以看到,獲取permits的時候,其實是獲取了兩部分,一部分來自于存量storedPermits,存量不夠的話, * 另一部分來自于預(yù)占未來的freshPermits.這里提一個關(guān)鍵點吧,我們看到,返回值是nextFreeTicketMicros * 的舊值,因為只要到這個時間點,就說明當次acquire可以成功返回了,而不管storedPermits夠不夠. * 如果storedPermits不夠,會將nextFreeTicketMicros往前推一定的時間,預(yù)占了一定的量. */ @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 這里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要) resync(nowMicros); // 剛剛已經(jīng)更新過了 long returnValue = nextFreeTicketMicros; // storedPermits可以使用多少個 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // storePermits不夠的部分 double freshPermits = requiredPermits - storedPermitsToSpend; // 為了這不夠的部分, 需要等待多久. // SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取 // SmoothWarmingUp的實現(xiàn)中,由于需要預(yù)熱,所以從storedPermits中取permits需要花費一定的時間 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 將nextFreeTicketMicros往前推 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 減去被拿走的部分 this.storedPermits -= storedPermitsToSpend; return returnValue; } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { // SmoothBursty中對于已經(jīng)存儲下來的storedPermits可以直接獲取到, 不需要等待 return 0L; }
public boolean tryAcquire(Duration timeout) { return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 判斷一下超時時間內(nèi)能不能獲得到鎖, 不能獲得直接返回false if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { // 可以獲得的話再算下為了獲得這個許可需要等待多長時間 // 這邊上面已經(jīng)分析過了 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { // 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros // 意思就是看下超時時間內(nèi)有沒有達到可以獲取令牌的時間 return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; } @Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; }
public final void setRate(double permitsPerSecond) { synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
// SmoothWamingUp實現(xiàn) public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) { return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS); } public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); // 為了達到從0到maxPermits花費warmupPeriodMicros的時間 this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; }
public final void setRate(double permitsPerSecond) { synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { // 實現(xiàn)和SmoothBursty一樣 resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } /** * 含義是storedPermits中每個permit的增長速度 * 為了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間 */ @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // coldFactor是固定的3 // 當達到maxPermits時, 此時系統(tǒng)處于最冷卻的時候, 獲取一個permit需要coldIntervalMicros // 而如果storedPermits < thresholPermits的時候, 只需要stableIntervalMicros double coldIntervalMicros = stableIntervalMicros * coldFactor; // https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html // https://blog.csdn.net/forezp/article/details/100060686 // 梯形的面積等于2倍的長方形的面積 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 計算斜線的斜率 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { // 等比例縮放 storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold : storedPermits * maxPermits / oldMaxPermits; } }
// storedPermits是上面兩個圖中最右側(cè)那條綠色線, 表示RateLimiter已經(jīng)存儲了多少許可證, 那么獲取storedPermits許可證 // 時應(yīng)當也是從最右側(cè)開始拿, 從右往左減許可證. 因此就會出現(xiàn)3種情況 // 1) storedPermits已經(jīng)大于thresholdPermits, 而且所需的許可證permitsToTake右側(cè)部分已經(jīng)足夠提供, 對應(yīng)上圖2 // 2) storedPermits已經(jīng)大于thresholdPermits, 而且所需的許可證右側(cè)不夠, 還需要從左側(cè)拿, 對應(yīng)上圖1 // 3) storedPermits小于thresholdPermits, 此時獲取每個許可證所需的時間是固定的, 對應(yīng)下面if邏輯返回false的情況 @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // 計算梯形部分的面積 if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // 計算長方形部分的面積 micros += (long) (stableIntervalMicros * permitsToTake); return micros; }
上述就是小編為大家分享的Guava中RateLimiter的實現(xiàn)原理是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。