溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Guava中RateLimiter的實現(xiàn)原理是什么

發(fā)布時間:2021-06-24 17:19:10 來源:億速云 閱讀:181 作者:Leah 欄目:大數(shù)據(jù)

這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)Guava中RateLimiter的實現(xiàn)原理是什么,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1、RateLimiter基本屬性

Guava中RateLimiter的實現(xiàn)原理是什么

// RateLimiter屬性
/**
  * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
  * object to facilitate testing.
  * 用來計時, RateLimiter將實例化的時間設(shè)置為0值, 后續(xù)都是相對時間
  */
private final SleepingStopwatch stopwatch;

// Can't be initialized in the constructor because mocks don't call the constructor.
// 鎖, RateLimiter依賴于synchronized來控制并發(fā), 限流器里面屬性都沒有加volatile修飾
private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter屬性
/**
  * The currently stored permits.
  * 當前還有多少permits沒有被使用, 被存下來的permits數(shù)量
  */
double storedPermits;

/**
  * The maximum number of stored permits.
  * 最大允許緩存的permits數(shù)量, 也就是storedPermits能達到的最大值
  */
double maxPermits;

/**
  * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
  * per second has a stable interval of 200ms.
  * 每隔多少時間產(chǎn)生一個permit
  */
double stableIntervalMicros;

/**
  * 下一次可以獲取permits的時間, 這個時間也是相對于RateLimiter的構(gòu)造時間的
  * 
  * nextFreeTicketMicros是一個很關(guān)鍵的屬性.我們每次獲取permits的時候,先拿storedPermits的值,
  * 如果夠,storedPermits減去相應(yīng)的值就可以了,如果不夠,那么還需要將nextFreeTicketMicros往前推,
  * 表示我預(yù)占了接下來多少時間的量了.那么下一個請求來的時候,如果還沒到nextFreeTicketMicros這個時間點,
  * 需要sleep到這個點再返回,當然也要將這個值再往前推
  */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

2、SmoothBursty實現(xiàn)

2.1、構(gòu)造方法

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    // SmoothBursty默認緩存最多1s的permits, 不可修改, 也就是說最多會緩存1 * permitsPerSecond這么多個permits到池中
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
    super(stopwatch);
    this.maxBurstSeconds = maxBurstSeconds;
}

public final void setRate(double permitsPerSecond) {
    // 加鎖
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 在關(guān)鍵節(jié)點, 會先更新下storedPermits到正確的值
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    // 每隔多少時間產(chǎn)生一個permit
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    // 如果nextFreeTicketMicros已經(jīng)過掉了, 想象一下很長時間沒有再次調(diào)用limiter.acquire()的場景
    // 需要將nextFreeTicketMicros設(shè)置為當前時間, 重新計算下storedPermits
    if (nowMicros > nextFreeTicketMicros) {
      // 新生成的permits, 構(gòu)造函數(shù)中進來時生成的newPermits為無限大
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      // 構(gòu)造函數(shù)進來時maxPermits為0, 所以這里的storedPermits也是0
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
}

@Override
double coolDownIntervalMicros() {
    // 構(gòu)造函數(shù)進來時, 此值為0.0
    return stableIntervalMicros;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = this.maxPermits;
    // maxPermits為1秒產(chǎn)生的permits
    maxPermits = maxBurstSeconds * permitsPerSecond;
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
      // if we don't special-case this, we would get storedPermits == NaN, below
      storedPermits = maxPermits;
    } else {
      // 等比例縮放storedPermits
      storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state
                                             : storedPermits * maxPermits / oldMaxPermits;
      }
}

2.2、acquire

// 常用api
@CanIgnoreReturnValue
public double acquire() {
    return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
    // 預(yù)約, 如果當前不能直接獲取到permits, 需要等待, 返回值表示需要sleep多久
    long microsToWait = reserve(permits);
    // sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    // 返回sleep的時長
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    // 返回當前線程為了拿這些許可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否則需要睡到nextFreeTicketMicros
    return max(momentAvailable - nowMicros, 0);
}

/**
   * 我們可以看到,獲取permits的時候,其實是獲取了兩部分,一部分來自于存量storedPermits,存量不夠的話,
   * 另一部分來自于預(yù)占未來的freshPermits.這里提一個關(guān)鍵點吧,我們看到,返回值是nextFreeTicketMicros
   * 的舊值,因為只要到這個時間點,就說明當次acquire可以成功返回了,而不管storedPermits夠不夠.
   * 如果storedPermits不夠,會將nextFreeTicketMicros往前推一定的時間,預(yù)占了一定的量.
   */
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 這里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要)
    resync(nowMicros);
    // 剛剛已經(jīng)更新過了
    long returnValue = nextFreeTicketMicros;
    // storedPermits可以使用多少個
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // storePermits不夠的部分
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 為了這不夠的部分, 需要等待多久.
    // SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取
    // SmoothWarmingUp的實現(xiàn)中,由于需要預(yù)熱,所以從storedPermits中取permits需要花費一定的時間
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    // 將nextFreeTicketMicros往前推
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // 減去被拿走的部分
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
}

@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    // SmoothBursty中對于已經(jīng)存儲下來的storedPermits可以直接獲取到, 不需要等待
    return 0L;
}

2.3、tryAcquire

public boolean tryAcquire(Duration timeout) {
    return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    synchronized (mutex()) {
        long nowMicros = stopwatch.readMicros();
	// 判斷一下超時時間內(nèi)能不能獲得到鎖, 不能獲得直接返回false
        if (!canAcquire(nowMicros, timeoutMicros)) {
            return false;
        } else {
	    // 可以獲得的話再算下為了獲得這個許可需要等待多長時間
	    // 這邊上面已經(jīng)分析過了
            microsToWait = reserveAndGetWaitLength(permits, nowMicros);
        }
    }
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
    // 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros
    // 意思就是看下超時時間內(nèi)有沒有達到可以獲取令牌的時間
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
}

2.4、setRate

public final void setRate(double permitsPerSecond) {
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

3、SmoothWarmingUp實現(xiàn)

3.1、構(gòu)造方法

Guava中RateLimiter的實現(xiàn)原理是什么

// SmoothWamingUp實現(xiàn)
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
    return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
}

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
    return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}

@VisibleForTesting
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit,
                          double coldFactor, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
    super(stopwatch);
    // 為了達到從0到maxPermits花費warmupPeriodMicros的時間
    this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
    this.coldFactor = coldFactor;
}

3.2、setRate

public final void setRate(double permitsPerSecond) {
    synchronized (mutex()) {
       doSetRate(permitsPerSecond, stopwatch.readMicros());
    }
}

@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
    // 實現(xiàn)和SmoothBursty一樣
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
}

/**
  * 含義是storedPermits中每個permit的增長速度
  * 為了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間
  */
@Override
double coolDownIntervalMicros() {
    return warmupPeriodMicros / maxPermits;
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
    double oldMaxPermits = maxPermits;
    // coldFactor是固定的3
    // 當達到maxPermits時, 此時系統(tǒng)處于最冷卻的時候, 獲取一個permit需要coldIntervalMicros
    // 而如果storedPermits < thresholPermits的時候, 只需要stableIntervalMicros
    double coldIntervalMicros = stableIntervalMicros * coldFactor;

    // https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html
    // https://blog.csdn.net/forezp/article/details/100060686
    // 梯形的面積等于2倍的長方形的面積
    thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;

    maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);

    // 計算斜線的斜率
    slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    if (oldMaxPermits == Double.POSITIVE_INFINITY) {
      // if we don't special-case this, we would get storedPermits == NaN, below
      storedPermits = 0.0;
    } else {
      // 等比例縮放
      storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold
                                             : storedPermits * maxPermits / oldMaxPermits;
    }
}

Guava中RateLimiter的實現(xiàn)原理是什么

// storedPermits是上面兩個圖中最右側(cè)那條綠色線, 表示RateLimiter已經(jīng)存儲了多少許可證, 那么獲取storedPermits許可證
// 時應(yīng)當也是從最右側(cè)開始拿, 從右往左減許可證. 因此就會出現(xiàn)3種情況
// 1) storedPermits已經(jīng)大于thresholdPermits, 而且所需的許可證permitsToTake右側(cè)部分已經(jīng)足夠提供, 對應(yīng)上圖2
// 2) storedPermits已經(jīng)大于thresholdPermits, 而且所需的許可證右側(cè)不夠, 還需要從左側(cè)拿, 對應(yīng)上圖1
// 3) storedPermits小于thresholdPermits, 此時獲取每個許可證所需的時間是固定的, 對應(yīng)下面if邏輯返回false的情況
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    // 計算梯形部分的面積
    if (availablePermitsAboveThreshold > 0.0) {
      double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
      double length = permitsToTime(availablePermitsAboveThreshold)
                      + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
      micros = (long) (permitsAboveThresholdToTake * length / 2.0);
      permitsToTake -= permitsAboveThresholdToTake;
    }
    // 計算長方形部分的面積
    micros += (long) (stableIntervalMicros * permitsToTake);
    return micros;
}

上述就是小編為大家分享的Guava中RateLimiter的實現(xiàn)原理是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI