溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java中常用的限流框架有哪些

發(fā)布時間:2021-09-09 11:21:09 來源:億速云 閱讀:260 作者:小新 欄目:編程語言

這篇文章主要為大家展示了“java中常用的限流框架有哪些”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“java中常用的限流框架有哪些”這篇文章吧。

作為應(yīng)對高并發(fā)的手段之一,限流并不是一個新鮮的話題了。從Guava的Ratelimiter到Hystrix,以及Sentinel都可作為限流的工具。

自適應(yīng)限流

一般的限流常常需要指定一個固定值(qps)作為限流開關(guān)的閾值,這個值一是靠經(jīng)驗判斷,二是靠通過大量的測試數(shù)據(jù)得出。但這個閾值,在流量激增、系統(tǒng)自動伸縮或者某某commit了一段有毒代碼后就有可能變得不那么合適了。并且一般業(yè)務(wù)方也不太能夠正確評估自己的容量,去設(shè)置一個合適的限流閾值。

而此時自適應(yīng)限流就是解決這樣的問題的,限流閾值不需要手動指定,也不需要去預(yù)估系統(tǒng)的容量,并且閾值能夠隨著系統(tǒng)相關(guān)指標(biāo)變化而變化。

自適應(yīng)限流算法借鑒了TCP擁塞算法,根據(jù)各種指標(biāo)預(yù)估限流的閾值,并且不斷調(diào)整。大致獲得的效果如下:

java中常用的限流框架有哪些

從圖上可以看到,首先以一個降低的初始并發(fā)值發(fā)送請求,同時通過增大限流窗口來探測系統(tǒng)更高的并發(fā)性。而一旦延遲增加到一定程度了,又會退回到較小的限流窗口。循環(huán)往復(fù)持續(xù)探測并發(fā)極限,從而產(chǎn)生類似鋸齒狀的時間關(guān)系函數(shù)。

TCP Vegas

vegas是一種主動調(diào)整cwnd的擁塞控制算法,主要是設(shè)置兩個閾值alpha 和 beta,然后通過計算目標(biāo)速率和實際速率的差diff,再比較差diff與alpha和beta的關(guān)系,對cwnd進(jìn)行調(diào)節(jié)。偽代碼如下:

diff = cwnd*(1-baseRTT/RTT)  if (diff < alpha)  set: cwndcwnd = cwnd + 1   else if (diff >= beta)  set: cwndcwnd = cwnd - 1  else  set: cwndcwnd = cwnd

其中baseRTT指的是測量的最小往返時間,RTT指的是當(dāng)前測量的往返時間,cwnd指的是當(dāng)前的TCP窗口大小。通常在tcp中alpha會被設(shè)置成2-3,beta會被設(shè)置成4-6。這樣子,cwnd就保持在了一個平衡的狀態(tài)。

netflix-concuurency-limits

concuurency-limits是netflix推出的自適應(yīng)限流組件,借鑒了TCP相關(guān)擁塞控制算法,主要是根據(jù)請求延時,及其直接影響到的排隊長度來進(jìn)行限流窗口的動態(tài)調(diào)整。

alpha , beta & threshold

vegas算法實現(xiàn)在了VegasLimit類中。先看一下初始化相關(guān)代碼:

private int initialLimit = 20;          private int maxConcurrency = 1000;          private MetricRegistry registry = EmptyMetricRegistry.INSTANCE;          private double smoothing = 1.0;                 private Function<Integer, Integer> alphaFunc = (limit) -> 3 * LOG10.apply(limit.intValue());          private Function<Integer, Integer> betaFunc = (limit) -> 6 * LOG10.apply(limit.intValue());          private Function<Integer, Integer> thresholdFunc = (limit) -> LOG10.apply(limit.intValue());          private Function<Double, Double> increaseFunc = (limit) -> limit + LOG10.apply(limit.intValue());          private Function<Double, Double> decreaseFunc = (limit) -> limit - LOG10.apply(limit.intValue());

這里首先定義了一個初始化值initialLimit為20,以及極大值maxConcurrency1000。其次是三個閾值函數(shù)alphaFunc,betaFunc以及thresholdFunc。最后是兩個增減函數(shù)increaseFunc和decreaseFunc。

函數(shù)都是基于當(dāng)前的并發(fā)值limit做運算的。

alphaFunc可類比vegas算法中的alpha,此處的實現(xiàn)是3*log limit。limit值從初始20增加到極大1000時候,相應(yīng)的alpha從3.9增加到了9。

betaFunc則可類比為vegas算法中的beta,此處的實現(xiàn)是6*log limit。limit值從初始20增加到極大1000時候,相應(yīng)的alpha從7.8增加到了18。

thresholdFunc算是新增的一個函數(shù),表示一個較為初始的閾值,小于這個值的時候limit會采取激進(jìn)一些的增量算法。這里的實現(xiàn)是1倍的log limit。mit值從初始20增加到極大1000時候,相應(yīng)的alpha從1.3增加到了3。

這三個函數(shù)值可以認(rèn)為確定了動態(tài)調(diào)整函數(shù)的四個區(qū)間范圍。當(dāng)變量queueSize = limit &times; (1 &minus; RTTnoLoad/RTTactual)落到這四個區(qū)間的時候應(yīng)用不同的調(diào)整函數(shù)。

變量queueSize

其中變量為queueSize,計算方法即為limit &times; (1 &minus; RTTnoLoad/RTTactual),為什么這么計算其實稍加領(lǐng)悟一下即可。

java中常用的限流框架有哪些

我們把系統(tǒng)處理請求的過程想象為一個水管,到來的請求是往這個水管灌水,當(dāng)系統(tǒng)處理順暢的時候,請求不需要排隊,直接從水管中穿過,這個請求的RT是最短的,即RTTnoLoad;

反之,當(dāng)請求堆積的時候,那么處理請求的時間則會變?yōu)椋号抨爼r間+最短處理時間,即RTTactual = inQueueTime + RTTnoLoad。而顯然排隊的隊列長度為

總排隊時間/每個請求的處理時間及queueSize = (limit * inQueueTime) / (inQueueTime + RTTnoLoad) = limit &times; (1 &minus; RTTnoLoad/RTTactual)。

再舉個栗子,因為假設(shè)當(dāng)前延時即為最佳延時,那么自然是不用排隊的,即queueSize=0。而假設(shè)當(dāng)前延時為最佳延時的一倍的時候,可以認(rèn)為處理能力折半,100個流量進(jìn)來會有一半即50個請求在排隊,及queueSize= 100 * (1 &minus; 1/2)=50。

動態(tài)調(diào)整函數(shù)

調(diào)整函數(shù)中最重要的即增函數(shù)與減函數(shù)。從初始化的代碼中得知,增函數(shù)increaseFunc實現(xiàn)為limit+log limit,減函數(shù)decreaseFunc實現(xiàn)為limit-log limit,相對來說增減都是比較保守的。

看一下應(yīng)用動態(tài)調(diào)整函數(shù)的相關(guān)代碼:

private int updateEstimatedLimit(long rtt, int inflight, boolean didDrop) {          final int queueSize = (int) Math.ceil(estimatedLimit * (1 - (double)rtt_noload / rtt));          double newLimit;          // Treat any drop (i.e timeout) as needing to reduce the limit          // 發(fā)現(xiàn)錯誤直接應(yīng)用減函數(shù)decreaseFunc          if (didDrop) {              newLimit = decreaseFunc.apply(estimatedLimit);          // Prevent upward drift if not close to the limit          } else if (inflight * 2 < estimatedLimit) {              return (int)estimatedLimit;          } else {              int alpha = alphaFunc.apply((int)estimatedLimit);              int beta = betaFunc.apply((int)estimatedLimit);              int threshold = this.thresholdFunc.apply((int)estimatedLimit);              // Aggressive increase when no queuing              if (queueSize <= threshold) {                  newLimit = estimatedLimit + beta;              // Increase the limit if queue is still manageable              } else if (queueSize < alpha) {                  newLimit = increaseFunc.apply(estimatedLimit);              // Detecting latency so decrease              } else if (queueSize > beta) {                  newLimit = decreaseFunc.apply(estimatedLimit);              // We're within he sweet spot so nothing to do              } else {                  return (int)estimatedLimit;              }          }          newLimit = Math.max(1, Math.min(maxLimit, newLimit));          newLimit = (1 - smoothing) * estimatedLimit + smoothing * newLimit;          if ((int)newLimit != (int)estimatedLimit && LOG.isDebugEnabled()) {              LOG.debug("New limit={} minRtt={} ms winRtt={} ms queueSize={}",                      (int)newLimit,                      TimeUnit.NANOSECONDS.toMicros(rtt_noload) / 1000.0,                      TimeUnit.NANOSECONDS.toMicros(rtt) / 1000.0,                      queueSize);          }          estimatedLimit = newLimit;          return (int)estimatedLimit;      }

動態(tài)調(diào)整函數(shù)規(guī)則如下:

當(dāng)變量queueSize < threshold時,選取較激進(jìn)的增量函數(shù),newLimit = limit+beta

當(dāng)變量queueSize < alpha時,需要增大限流窗口,選擇增函數(shù)increaseFunc,即newLimit = limit + log limit

當(dāng)變量queueSize處于alpha,beta之間時候,limit不變

當(dāng)變量queueSize大于beta時候,需要收攏限流窗口,選擇減函數(shù)decreaseFunc,即newLimit = limit - log limit

平滑遞減 smoothingDecrease

注意到可以設(shè)置變量smoothing,這里初始值為1,表示平滑遞減不起作用。如果有需要的話可以按需設(shè)置,比如設(shè)置smoothing為0.5時候,那么效果就是采用減函數(shù)decreaseFunc時候效果減半,實現(xiàn)方式為newLimitAfterSmoothing = 0.5 newLimit + 0.5 limit。

以上是“java中常用的限流框架有哪些”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI