您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)如何解析Hystrix核心原理和斷路器源碼,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
構(gòu)造一個(gè)HystrixCommand或HystrixObservableCommand對(duì)象
執(zhí)行命令。
檢查是否已命中緩存,如果命中直接返回。
檢查斷路器開關(guān)是否打開,如果打開,直接熔斷,走fallback
邏輯。
檢查線程池/隊(duì)列/信號(hào)量是否已滿,如果已滿,直接拒絕請(qǐng)求,走fallback
邏輯。
上面條件都不滿足,調(diào)用HystrixObservableCommand.construct()
方法HystrixCommand.run()
方法,執(zhí)行業(yè)務(wù)邏輯。
判斷運(yùn)行業(yè)務(wù)邏輯方法是否出現(xiàn)異?;蛘叱瑫r(shí),如果出現(xiàn),直接降級(jí),走fallback
邏輯。
上報(bào)統(tǒng)計(jì)數(shù)據(jù),用戶計(jì)算斷路器狀態(tài)。
返回結(jié)果
從流程圖可以發(fā)現(xiàn),只有出現(xiàn)
5
和7
兩種情況時(shí),才會(huì)上報(bào)錯(cuò)誤統(tǒng)計(jì)數(shù)據(jù)。
斷路器的開關(guān)控制邏輯如下:
在一個(gè)統(tǒng)計(jì)時(shí)間窗口內(nèi)(HystrixCommandProperties.metricsRollingStatisticalWindowInMilliseconds()
),處理的請(qǐng)求數(shù)量達(dá)到設(shè)定的最小閾值(HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()
),并且錯(cuò)誤百分比超過設(shè)定的最大閾值(HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()
),這時(shí)斷路器開關(guān)就會(huì)打開,斷路器狀態(tài)從轉(zhuǎn)換CLOSED
切換為OPEN
。
當(dāng)斷路器為打開狀態(tài)時(shí),它他會(huì)直接熔斷所有請(qǐng)求(快速失?。?,走fallback
邏輯。
經(jīng)過一個(gè)睡眠窗口時(shí)間后(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()
),Hystrix會(huì)放行一個(gè)請(qǐng)到后續(xù)服務(wù),并將斷路器開關(guān)切換為半開狀態(tài)(HALF-OPEN
)。如果該請(qǐng)求失敗,則斷路器會(huì)將熔斷開關(guān)切換為打開狀態(tài)(OPEN
),繼續(xù)熔斷所有請(qǐng)求,直到下一個(gè)睡眠時(shí)間窗口的到來;如果該請(qǐng)求成功,則斷路器會(huì)切換到關(guān)閉狀態(tài)(CLOSED
),這時(shí)將允許所有請(qǐng)求通過,直到出現(xiàn)1
步驟的情況,斷路器開關(guān)會(huì)切換為打開狀態(tài)(OPEN
)。
Hystrix斷路器的實(shí)現(xiàn)類是HystrixCircuitBreaker,源碼如下:
/** * Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold. * 斷路器,在HystrixCommand執(zhí)行時(shí)會(huì)調(diào)用斷路器邏輯,如果故障超過定義的閾值,斷路器熔斷開關(guān)將會(huì)打開,這時(shí)將阻止任務(wù)執(zhí)行。 * <p> * The default (and only) implementation will then allow a single retry after a defined sleepWindow until the execution * succeeds at which point it will again close the circuit and allow executions again. * <p> * 默認(rèn)(且唯一)實(shí)現(xiàn)將允許在定義的sleepWindow之后進(jìn)行單次重試,直到執(zhí)行成功,此時(shí)它將再次關(guān)閉電路并允許再次執(zhí)行。 */ public interface HystrixCircuitBreaker { /** * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does * not modify any internal state, and takes into account the half-open logic which allows some requests through * after the circuit has been opened * <p> * 每個(gè)HystrixCommand請(qǐng)求都會(huì)詢問是否允許繼續(xù)(當(dāng)斷路器開關(guān)為OPEN和HALF_OPEN都時(shí)返回false,當(dāng)斷路器開關(guān)是CLOSE時(shí)或者到了下一個(gè)睡眠窗口時(shí)返回true)。 * 它是冪等的,不會(huì)修改任何內(nèi)部狀態(tài),并考慮到半開邏輯,當(dāng)一個(gè)睡眠窗口到來時(shí)他會(huì)放行一些請(qǐng)求到后續(xù)邏輯 * * @return boolean whether a request should be permitted (是否應(yīng)允許請(qǐng)求) */ boolean allowRequest(); /** * Whether the circuit is currently open (tripped). * 判斷熔斷開關(guān)是否打開(如果是OPEN或HALF_OPEN時(shí)都返回true,如果為CLOSE時(shí)返回false,無副作用,是冪等方式)。 * * @return boolean state of circuit breaker(返回?cái)嗦菲鞯臓顟B(tài)) */ boolean isOpen(); /** * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. * <p> * 斷路器在處于半開狀態(tài)時(shí),作為反饋機(jī)制的一部分,從HystrixCommand成功執(zhí)行時(shí)調(diào)用。 */ void markSuccess(); /** * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state. * 斷路器當(dāng)處于半開狀態(tài)時(shí),作為反饋機(jī)制的一部分,從HystrixCommand執(zhí)行不成功的調(diào)用。 */ void markNonSuccess(); /** * Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal * state. * <p> * 在命令執(zhí)行開始時(shí)調(diào)用以嘗試執(zhí)行,主要所用時(shí)判斷該請(qǐng)求是否可以執(zhí)行。這是非冪等的 - 它可能會(huì)修改內(nèi)部狀態(tài)。 */ boolean attemptExecution(); }
斷路器的默認(rèn)實(shí)現(xiàn)就是它的一個(gè)內(nèi)部類:
/** * @ExcludeFromJavadoc * @ThreadSafe */ class Factory { // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly) // key是HystrixCommandKey.name()(我們不能直接使用HystrixCommandKey,因?yàn)槲覀儫o法保證它正確實(shí)現(xiàn)hashcode / equals) private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>(); /** * 根據(jù)HystrixCommandKey獲取HystrixCircuitBreaker * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey}. * <p> * This is thread-safe and ensures only 1 {@link HystrixCircuitBreaker} per {@link HystrixCommandKey}. * * @param key {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker} * @param group Pass-thru to {@link HystrixCircuitBreaker} * @param properties Pass-thru to {@link HystrixCircuitBreaker} * @param metrics Pass-thru to {@link HystrixCircuitBreaker} * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey} */ public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { // this should find it for all but the first time // 根據(jù)HystrixCommandKey獲取斷路器 HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name()); if (previouslyCached != null) { return previouslyCached; } // if we get here this is the first time so we need to initialize // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety // If 2 threads hit here only one will get added and the other will get a non-null response instead. // 第一次沒有獲取到斷路器,那么我們需要取初始化它 // 這里直接利用ConcurrentHashMap的putIfAbsent方法,它是原子操作,加入有兩個(gè)線程執(zhí)行到這里,將會(huì)只有一個(gè)線程將值放到容器中 // 讓我們省掉了加鎖的步驟 HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics)); if (cbForCommand == null) { // this means the putIfAbsent step just created a new one so let's retrieve and return it return circuitBreakersByCommand.get(key.name()); } else { // this means a race occurred and while attempting to 'put' another one got there before // and we instead retrieved it and will now return it return cbForCommand; } } /** * 根據(jù)HystrixCommandKey獲取HystrixCircuitBreaker,如果沒有返回NULL * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists. * * @param key {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker} * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey} */ public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) { return circuitBreakersByCommand.get(key.name()); } /** * Clears all circuit breakers. If new requests come in instances will be recreated. * 清除所有斷路器。如果有新的請(qǐng)求將會(huì)重新創(chuàng)建斷路器放到容器。 */ /* package */ static void reset() { circuitBreakersByCommand.clear(); } } /** * 默認(rèn)的斷路器實(shí)現(xiàn) * The default production implementation of {@link HystrixCircuitBreaker}. * * @ExcludeFromJavadoc * @ThreadSafe */ /* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; enum Status { // 斷路器狀態(tài),關(guān)閉,打開,半開 CLOSED, OPEN, HALF_OPEN; } // 賦值操作不是線程安全的。若想不用鎖來實(shí)現(xiàn),可以用AtomicReference<V>這個(gè)類,實(shí)現(xiàn)對(duì)象引用的原子更新。 // AtomicReference 原子引用,保證Status原子性修改 private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED); // 記錄斷路器打開的時(shí)間點(diǎn)(時(shí)間戳),如果這個(gè)時(shí)間大于0表示斷路器處于打開狀態(tài)或半開狀態(tài) private final AtomicLong circuitOpened = new AtomicLong(-1); private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur // 在定時(shí)器上,當(dāng)命令執(zhí)行發(fā)生時(shí),這將在OPEN / CLOSED之間設(shè)置電路 Subscription s = subscribeToStream(); activeSubscription.set(s); } private Subscription subscribeToStream() { /* * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream * 此流將重新計(jì)算運(yùn)行狀況流中每個(gè)onNext上的OPEN / CLOSED狀態(tài) */ return metrics.getHealthCountsStream() .observe() .subscribe(new Subscriber<HealthCounts>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(HealthCounts hc) { // check if we are past the statisticalWindowVolumeThreshold // 檢查一個(gè)時(shí)間窗口內(nèi)的最小請(qǐng)求數(shù) if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { // we are not past the minimum volume threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // IF IT WAS HALF-OPEN, WE NEED TO WAIT FOR A SUCCESSFUL COMMAND EXECUTION // if it was open, we need to wait for sleep window to elapse // 我們沒有超過統(tǒng)計(jì)窗口的最小音量閾值,所以我們不會(huì)去改變斷路器狀態(tài),如果是closed狀態(tài),他將保持這個(gè)狀態(tài) // 如果是半開狀態(tài),那么她需要等到一個(gè)成功的 Command執(zhí)行 // 如果是打開狀態(tài),那么它需要等到這個(gè)時(shí)間窗口過去 } else { // 檢查錯(cuò)誤比例閥值 if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { //we are not past the minimum error threshold for the stat window, // so no change to circuit status. // if it was CLOSED, it stays CLOSED // if it was half-open, we need to wait for a successful command execution // if it was open, we need to wait for sleep window to elapse } else { // our failure rate is too high, we need to set the state to OPEN // 我們的失敗率太高,我們需要將狀態(tài)設(shè)置為OPEN if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } } }); } @Override public void markSuccess() { // 斷路器是處理半開并且HystrixCommand執(zhí)行成功,將狀態(tài)設(shè)置成關(guān)閉 if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) { //This thread wins the race to close the circuit - it resets the stream to start it over from 0 //該線程贏得了關(guān)閉電路的競(jìng)爭(zhēng) - 它重置流以從0開始 metrics.resetStream(); Subscription previousSubscription = activeSubscription.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } Subscription newSubscription = subscribeToStream(); activeSubscription.set(newSubscription); circuitOpened.set(-1L); } } @Override public void markNonSuccess() { // 斷路器是處理半開并且HystrixCommand執(zhí)行成功,將狀態(tài)設(shè)置成打開 if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) { //This thread wins the race to re-open the circuit - it resets the start time for the sleep window // 該線程贏得了重新打開電路的競(jìng)爭(zhēng) - 它重置了睡眠窗口的開始時(shí)間 circuitOpened.set(System.currentTimeMillis()); } } @Override public boolean isOpen() { // 獲取配置判斷斷路器是否強(qiáng)制打開 if (properties.circuitBreakerForceOpen().get()) { return true; } // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉 if (properties.circuitBreakerForceClosed().get()) { return false; } return circuitOpened.get() >= 0; } @Override public boolean allowRequest() { // 獲取配置判斷斷路器是否強(qiáng)制打開 if (properties.circuitBreakerForceOpen().get()) { return false; } // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉 if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { // 如果是半開狀態(tài)則返回不允許Command執(zhí)行 if (status.get().equals(Status.HALF_OPEN)) { return false; } else { // 檢查睡眠窗口是否過了 return isAfterSleepWindow(); } } } private boolean isAfterSleepWindow() { final long circuitOpenTime = circuitOpened.get(); final long currentTime = System.currentTimeMillis(); // 獲取配置的一個(gè)睡眠的時(shí)間窗口 final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get(); return currentTime > circuitOpenTime + sleepWindowTime; } @Override public boolean attemptExecution() { // 獲取配置判斷斷路器是否強(qiáng)制打開 if (properties.circuitBreakerForceOpen().get()) { return false; } // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉 if (properties.circuitBreakerForceClosed().get()) { return true; } if (circuitOpened.get() == -1) { return true; } else { if (isAfterSleepWindow()) { //only the first request after sleep window should execute //if the executing command succeeds, the status will transition to CLOSED //if the executing command fails, the status will transition to OPEN //if the executing command gets unsubscribed, the status will transition to OPEN // 只有一個(gè)睡眠窗口后的第一個(gè)請(qǐng)求會(huì)被執(zhí)行 // 如果執(zhí)行命令成功,狀態(tài)將轉(zhuǎn)換為CLOSED // 如果執(zhí)行命令失敗,狀態(tài)將轉(zhuǎn)換為OPEN // 如果執(zhí)行命令取消訂閱,狀態(tài)將過渡到OPEN if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { return true; } else { return false; } } else { return false; } } } }
isOpen()
:判斷熔斷開關(guān)是否打開(該方法是否冪等和Hystrix版本相關(guān))。
allowRequest()
:每個(gè)HystrixCommand請(qǐng)求都會(huì)詢問是否允許繼續(xù)執(zhí)行(當(dāng)斷路器開關(guān)為OPEN
和HALF_OPEN
都時(shí)返回false,當(dāng)斷路器開關(guān)是CLOSE
或到了下一個(gè)睡眠窗口時(shí)返回true),它是冪等的,不會(huì)修改任何內(nèi)部狀態(tài),并考慮到半開邏輯,當(dāng)一個(gè)睡眠窗口到來時(shí)他會(huì)放行一些請(qǐng)求到后續(xù)邏輯。
attemptExecution()
:在命令執(zhí)行開始時(shí)調(diào)用以嘗試執(zhí)行,主要所用時(shí)判斷該請(qǐng)求是否可以執(zhí)行。這是非冪等的,它可能會(huì)修改內(nèi)部狀態(tài)。
這里需要注意的是
allowRequest()
方法時(shí)冪等的,可以重復(fù)調(diào)用;attemptExecution()
方法是有副作用的,不可以重復(fù)調(diào)用;isOpen()
是否冪等和Hystrix版本有關(guān)。
關(guān)于如何解析Hystrix核心原理和斷路器源碼就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。