溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何解析Hystrix核心原理和斷路器源碼

發(fā)布時(shí)間:2021-10-21 13:56:21 來源:億速云 閱讀:151 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)如何解析Hystrix核心原理和斷路器源碼,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

Hystrix運(yùn)行原理

如何解析Hystrix核心原理和斷路器源碼

  1. 構(gòu)造一個(gè)HystrixCommand或HystrixObservableCommand對(duì)象

  2. 執(zhí)行命令。

  3. 檢查是否已命中緩存,如果命中直接返回。

  4. 檢查斷路器開關(guān)是否打開,如果打開,直接熔斷,走fallback邏輯。

  5. 檢查線程池/隊(duì)列/信號(hào)量是否已滿,如果已滿,直接拒絕請(qǐng)求,走fallback邏輯。

  6. 上面條件都不滿足,調(diào)用HystrixObservableCommand.construct()方法HystrixCommand.run()方法,執(zhí)行業(yè)務(wù)邏輯。

  7. 判斷運(yùn)行業(yè)務(wù)邏輯方法是否出現(xiàn)異?;蛘叱瑫r(shí),如果出現(xiàn),直接降級(jí),走fallback邏輯。

  8. 上報(bào)統(tǒng)計(jì)數(shù)據(jù),用戶計(jì)算斷路器狀態(tài)。

  9. 返回結(jié)果

從流程圖可以發(fā)現(xiàn),只有出現(xiàn)57兩種情況時(shí),才會(huì)上報(bào)錯(cuò)誤統(tǒng)計(jì)數(shù)據(jù)。

斷路器運(yùn)行原理

如何解析Hystrix核心原理和斷路器源碼

斷路器的開關(guān)控制邏輯如下:

  1. 在一個(gè)統(tǒng)計(jì)時(shí)間窗口內(nèi)(HystrixCommandProperties.metricsRollingStatisticalWindowInMilliseconds(),處理的請(qǐng)求數(shù)量達(dá)到設(shè)定的最小閾值(HystrixCommandProperties.circuitBreakerRequestVolumeThreshold()),并且錯(cuò)誤百分比超過設(shè)定的最大閾值(HystrixCommandProperties.circuitBreakerErrorThresholdPercentage()),這時(shí)斷路器開關(guān)就會(huì)打開,斷路器狀態(tài)從轉(zhuǎn)換CLOSED切換為OPEN。

  2. 當(dāng)斷路器為打開狀態(tài)時(shí),它他會(huì)直接熔斷所有請(qǐng)求(快速失?。?,走fallback邏輯。

  3. 經(jīng)過一個(gè)睡眠窗口時(shí)間后(HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()),Hystrix會(huì)放行一個(gè)請(qǐng)到后續(xù)服務(wù),并將斷路器開關(guān)切換為半開狀態(tài)(HALF-OPEN)。如果該請(qǐng)求失敗,則斷路器會(huì)將熔斷開關(guān)切換為打開狀態(tài)(OPEN),繼續(xù)熔斷所有請(qǐng)求,直到下一個(gè)睡眠時(shí)間窗口的到來;如果該請(qǐng)求成功,則斷路器會(huì)切換到關(guān)閉狀態(tài)(CLOSED),這時(shí)將允許所有請(qǐng)求通過,直到出現(xiàn)1步驟的情況,斷路器開關(guān)會(huì)切換為打開狀態(tài)(OPEN)。

斷路器源碼

Hystrix斷路器的實(shí)現(xiàn)類是HystrixCircuitBreaker,源碼如下:

/**
 * Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.
 * 斷路器,在HystrixCommand執(zhí)行時(shí)會(huì)調(diào)用斷路器邏輯,如果故障超過定義的閾值,斷路器熔斷開關(guān)將會(huì)打開,這時(shí)將阻止任務(wù)執(zhí)行。
 * <p>
 * The default (and only) implementation  will then allow a single retry after a defined sleepWindow until the execution
 * succeeds at which point it will again close the circuit and allow executions again.
 * <p>
 * 默認(rèn)(且唯一)實(shí)現(xiàn)將允許在定義的sleepWindow之后進(jìn)行單次重試,直到執(zhí)行成功,此時(shí)它將再次關(guān)閉電路并允許再次執(zhí)行。
 */
public interface HystrixCircuitBreaker {

    /**
     * Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.  It is idempotent and does
     * not modify any internal state, and takes into account the half-open logic which allows some requests through
     * after the circuit has been opened
     * <p>
     * 每個(gè)HystrixCommand請(qǐng)求都會(huì)詢問是否允許繼續(xù)(當(dāng)斷路器開關(guān)為OPEN和HALF_OPEN都時(shí)返回false,當(dāng)斷路器開關(guān)是CLOSE時(shí)或者到了下一個(gè)睡眠窗口時(shí)返回true)。
     * 它是冪等的,不會(huì)修改任何內(nèi)部狀態(tài),并考慮到半開邏輯,當(dāng)一個(gè)睡眠窗口到來時(shí)他會(huì)放行一些請(qǐng)求到后續(xù)邏輯
     *
     * @return boolean whether a request should be permitted (是否應(yīng)允許請(qǐng)求)
     */
    boolean allowRequest();

    /**
     * Whether the circuit is currently open (tripped).
     * 判斷熔斷開關(guān)是否打開(如果是OPEN或HALF_OPEN時(shí)都返回true,如果為CLOSE時(shí)返回false,無副作用,是冪等方式)。
     *
     * @return boolean state of circuit breaker(返回?cái)嗦菲鞯臓顟B(tài))
     */
    boolean isOpen();

    /**
     * Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     * <p>
     * 斷路器在處于半開狀態(tài)時(shí),作為反饋機(jī)制的一部分,從HystrixCommand成功執(zhí)行時(shí)調(diào)用。
     */
    void markSuccess();

    /**
     * Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
     * 斷路器當(dāng)處于半開狀態(tài)時(shí),作為反饋機(jī)制的一部分,從HystrixCommand執(zhí)行不成功的調(diào)用。
     */
    void markNonSuccess();

    /**
     * Invoked at start of command execution to attempt an execution.  This is non-idempotent - it may modify internal
     * state.
     * <p>
     * 在命令執(zhí)行開始時(shí)調(diào)用以嘗試執(zhí)行,主要所用時(shí)判斷該請(qǐng)求是否可以執(zhí)行。這是非冪等的 - 它可能會(huì)修改內(nèi)部狀態(tài)。
     */
    boolean attemptExecution();
}

斷路器的默認(rèn)實(shí)現(xiàn)就是它的一個(gè)內(nèi)部類:

/**
 * @ExcludeFromJavadoc
 * @ThreadSafe
 */
class Factory {
    // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
    // key是HystrixCommandKey.name()(我們不能直接使用HystrixCommandKey,因?yàn)槲覀儫o法保證它正確實(shí)現(xiàn)hashcode / equals)
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    /**
     * 根據(jù)HystrixCommandKey獲取HystrixCircuitBreaker
     * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey}.
     * <p>
     * This is thread-safe and ensures only 1 {@link HystrixCircuitBreaker} per {@link HystrixCommandKey}.
     *
     * @param key        {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
     * @param group      Pass-thru to {@link HystrixCircuitBreaker}
     * @param properties Pass-thru to {@link HystrixCircuitBreaker}
     * @param metrics    Pass-thru to {@link HystrixCircuitBreaker}
     * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
     */
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // this should find it for all but the first time
        // 根據(jù)HystrixCommandKey獲取斷路器
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }

        // if we get here this is the first time so we need to initialize

        // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
        // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
        // If 2 threads hit here only one will get added and the other will get a non-null response instead.
        // 第一次沒有獲取到斷路器,那么我們需要取初始化它
        // 這里直接利用ConcurrentHashMap的putIfAbsent方法,它是原子操作,加入有兩個(gè)線程執(zhí)行到這里,將會(huì)只有一個(gè)線程將值放到容器中
        // 讓我們省掉了加鎖的步驟
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            // this means the putIfAbsent step just created a new one so let's retrieve and return it
            return circuitBreakersByCommand.get(key.name());
        } else {
            // this means a race occurred and while attempting to 'put' another one got there before
            // and we instead retrieved it and will now return it
            return cbForCommand;
        }
    }

    /**
     * 根據(jù)HystrixCommandKey獲取HystrixCircuitBreaker,如果沒有返回NULL
     * Get the {@link HystrixCircuitBreaker} instance for a given {@link HystrixCommandKey} or null if none exists.
     *
     * @param key {@link HystrixCommandKey} of {@link HystrixCommand} instance requesting the {@link HystrixCircuitBreaker}
     * @return {@link HystrixCircuitBreaker} for {@link HystrixCommandKey}
     */
    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
        return circuitBreakersByCommand.get(key.name());
    }

    /**
     * Clears all circuit breakers. If new requests come in instances will be recreated.
     * 清除所有斷路器。如果有新的請(qǐng)求將會(huì)重新創(chuàng)建斷路器放到容器。
     */
    /* package */
    static void reset() {
        circuitBreakersByCommand.clear();
    }
}


/**
 * 默認(rèn)的斷路器實(shí)現(xiàn)
 * The default production implementation of {@link HystrixCircuitBreaker}.
 *
 * @ExcludeFromJavadoc
 * @ThreadSafe
 */
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        // 斷路器狀態(tài),關(guān)閉,打開,半開
        CLOSED, OPEN, HALF_OPEN;
    }

    // 賦值操作不是線程安全的。若想不用鎖來實(shí)現(xiàn),可以用AtomicReference<V>這個(gè)類,實(shí)現(xiàn)對(duì)象引用的原子更新。
    // AtomicReference 原子引用,保證Status原子性修改
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    // 記錄斷路器打開的時(shí)間點(diǎn)(時(shí)間戳),如果這個(gè)時(shí)間大于0表示斷路器處于打開狀態(tài)或半開狀態(tài)
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        // 在定時(shí)器上,當(dāng)命令執(zhí)行發(fā)生時(shí),這將在OPEN / CLOSED之間設(shè)置電路
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }

    private Subscription subscribeToStream() {
        /*
         * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
         * 此流將重新計(jì)算運(yùn)行狀況流中每個(gè)onNext上的OPEN / CLOSED狀態(tài)
         */
        return metrics.getHealthCountsStream()
                .observe()
                .subscribe(new Subscriber<HealthCounts>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(HealthCounts hc) {
                        // check if we are past the statisticalWindowVolumeThreshold
                        // 檢查一個(gè)時(shí)間窗口內(nèi)的最小請(qǐng)求數(shù)
                        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                            // we are not past the minimum volume threshold for the stat window,
                            // so no change to circuit status.
                            // if it was CLOSED, it stays CLOSED
                            // IF IT WAS HALF-OPEN, WE NEED TO WAIT FOR A SUCCESSFUL COMMAND EXECUTION
                            // if it was open, we need to wait for sleep window to elapse
                            // 我們沒有超過統(tǒng)計(jì)窗口的最小音量閾值,所以我們不會(huì)去改變斷路器狀態(tài),如果是closed狀態(tài),他將保持這個(gè)狀態(tài)
                            // 如果是半開狀態(tài),那么她需要等到一個(gè)成功的 Command執(zhí)行
                            // 如果是打開狀態(tài),那么它需要等到這個(gè)時(shí)間窗口過去
                        } else {
                            // 檢查錯(cuò)誤比例閥值
                            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                //we are not past the minimum error threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                // our failure rate is too high, we need to set the state to OPEN
                                // 我們的失敗率太高,我們需要將狀態(tài)設(shè)置為OPEN
                                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                    circuitOpened.set(System.currentTimeMillis());
                                }
                            }
                        }
                    }
                });
    }

    @Override
    public void markSuccess() {
        // 斷路器是處理半開并且HystrixCommand執(zhí)行成功,將狀態(tài)設(shè)置成關(guān)閉
        if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
            //This thread wins the race to close the circuit - it resets the stream to start it over from 0
            //該線程贏得了關(guān)閉電路的競(jìng)爭(zhēng) - 它重置流以從0開始
            metrics.resetStream();
            Subscription previousSubscription = activeSubscription.get();
            if (previousSubscription != null) {
                previousSubscription.unsubscribe();
            }
            Subscription newSubscription = subscribeToStream();
            activeSubscription.set(newSubscription);
            circuitOpened.set(-1L);
        }
    }

    @Override
    public void markNonSuccess() {
        // 斷路器是處理半開并且HystrixCommand執(zhí)行成功,將狀態(tài)設(shè)置成打開
        if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
            //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
            // 該線程贏得了重新打開電路的競(jìng)爭(zhēng) - 它重置了睡眠窗口的開始時(shí)間
            circuitOpened.set(System.currentTimeMillis());
        }
    }

    @Override
    public boolean isOpen() {
        // 獲取配置判斷斷路器是否強(qiáng)制打開
        if (properties.circuitBreakerForceOpen().get()) {
            return true;
        }
        // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉
        if (properties.circuitBreakerForceClosed().get()) {
            return false;
        }
        return circuitOpened.get() >= 0;
    }

    @Override
    public boolean allowRequest() {
        // 獲取配置判斷斷路器是否強(qiáng)制打開
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            // 如果是半開狀態(tài)則返回不允許Command執(zhí)行
            if (status.get().equals(Status.HALF_OPEN)) {
                return false;
            } else {
                // 檢查睡眠窗口是否過了
                return isAfterSleepWindow();
            }
        }
    }

    private boolean isAfterSleepWindow() {
        final long circuitOpenTime = circuitOpened.get();
        final long currentTime = System.currentTimeMillis();
        // 獲取配置的一個(gè)睡眠的時(shí)間窗口
        final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
        return currentTime > circuitOpenTime + sleepWindowTime;
    }

    @Override
    public boolean attemptExecution() {
        // 獲取配置判斷斷路器是否強(qiáng)制打開
        if (properties.circuitBreakerForceOpen().get()) {
            return false;
        }
        // 獲取配置判斷斷路器是否強(qiáng)制關(guān)閉
        if (properties.circuitBreakerForceClosed().get()) {
            return true;
        }
        if (circuitOpened.get() == -1) {
            return true;
        } else {
            if (isAfterSleepWindow()) {
                //only the first request after sleep window should execute
                //if the executing command succeeds, the status will transition to CLOSED
                //if the executing command fails, the status will transition to OPEN
                //if the executing command gets unsubscribed, the status will transition to OPEN
                // 只有一個(gè)睡眠窗口后的第一個(gè)請(qǐng)求會(huì)被執(zhí)行
                // 如果執(zhí)行命令成功,狀態(tài)將轉(zhuǎn)換為CLOSED
                // 如果執(zhí)行命令失敗,狀態(tài)將轉(zhuǎn)換為OPEN
                // 如果執(zhí)行命令取消訂閱,狀態(tài)將過渡到OPEN
                if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                    return true;
                } else {
                    return false;
                }
            } else {
                return false;
            }
        }
    }
}
  • isOpen():判斷熔斷開關(guān)是否打開(該方法是否冪等和Hystrix版本相關(guān))。

  • allowRequest():每個(gè)HystrixCommand請(qǐng)求都會(huì)詢問是否允許繼續(xù)執(zhí)行(當(dāng)斷路器開關(guān)為OPENHALF_OPEN都時(shí)返回false,當(dāng)斷路器開關(guān)是CLOSE或到了下一個(gè)睡眠窗口時(shí)返回true),它是冪等的,不會(huì)修改任何內(nèi)部狀態(tài),并考慮到半開邏輯,當(dāng)一個(gè)睡眠窗口到來時(shí)他會(huì)放行一些請(qǐng)求到后續(xù)邏輯。

  • attemptExecution():在命令執(zhí)行開始時(shí)調(diào)用以嘗試執(zhí)行,主要所用時(shí)判斷該請(qǐng)求是否可以執(zhí)行。這是非冪等的,它可能會(huì)修改內(nèi)部狀態(tài)。

這里需要注意的是allowRequest()方法時(shí)冪等的,可以重復(fù)調(diào)用;attemptExecution()方法是有副作用的,不可以重復(fù)調(diào)用;isOpen()是否冪等和Hystrix版本有關(guān)。

關(guān)于如何解析Hystrix核心原理和斷路器源碼就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI