溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

spring cloud中Hystrix指標收集原理是什么

發(fā)布時間:2021-10-20 16:02:15 來源:億速云 閱讀:148 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)spring cloud中Hystrix指標收集原理是什么,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

hystrix運行原理
 

上一篇介紹了hystrix熔斷降級的基本實現(xiàn)原理,著重點是從hystrix自身的能力方面著手,結(jié)合代碼,做了整體介紹,那么觸發(fā)熔斷的指標是如何計算的,可能前面會籠統(tǒng)的提到metrics,至于它的metrics實現(xiàn)原理是怎么樣的,在本章做重點介紹
官方圖示:
spring cloud中Hystrix指標收集原理是什么

  1. 對于使用者先構(gòu)造一個HystrixCommand對象或者HystrixObservalbeCommand

  2. 選擇queue或者execute,調(diào)用者決定是使用異步還是同步方式

  3. 根據(jù)commandKey看緩存中是否存在Observalbe,開啟緩存是為了提升性能,直接返回輸出

  4. 沒有緩存,那就開始走熔斷器的邏輯,先判斷熔斷器是不是開啟狀態(tài)

  5. 熔斷器開啟,觸發(fā)快速失敗,觸發(fā)降級,去執(zhí)行用戶提供的fallback()邏輯

  6. 判斷是不是并發(fā)超限,超限,觸發(fā)降級,則發(fā)出執(zhí)行拒絕的異常,去執(zhí)行用戶提供的fallback邏輯

  7. 執(zhí)行用戶實現(xiàn)的具體業(yè)務(wù)邏輯,是否出現(xiàn)執(zhí)行異?;蛘叱瑫r,異?;虺瑫r,則觸發(fā)降級去執(zhí)行用戶提供的fallback邏輯

  8. 執(zhí)行結(jié)束

  9. 無論是正常結(jié)束還是執(zhí)行異常,都會觸發(fā)metrics的收集,收集的結(jié)果經(jīng)過計算后,提供給熔斷器,做開啟和關(guān)閉的決策

指標收集的實現(xiàn)

這部分我們需要從以下幾個方面做分析:指標上報、指標計算、指標使用,這期間會涉及多線程的并發(fā)寫入、消息的順序到達、滑動窗口的實現(xiàn)等等

  1. 指標上報
    每一個請求線程,都會創(chuàng)建一個ExecutionResult實例,這個實例會關(guān)聯(lián)一些基礎(chǔ)事件比如開始時間、執(zhí)行延遲、事件統(tǒng)計等基礎(chǔ)信息,也就是在整個hystrix的生命周期里面,會通過指標上報的方式做數(shù)據(jù)的收集,下面看下數(shù)據(jù)上報的幾個事件:
    1.1、executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());//判斷斷路器未開啟,并發(fā)未超限,記錄執(zhí)行的開始時間
    1.2、executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);//執(zhí)行成功會增加success的事件和耗時
    1.3、HystrixEventType.SHORT_CIRCUITED//斷路器打開,會收集快速熔斷的事件和耗時
    1.4、HystrixEventType.SEMAPHORE_REJECTED//信號量方式并發(fā)數(shù)超限,會記錄該事件和耗時
    1.5、HystrixEventType.THREAD_POOL_REJECTED//線程池不可用(并發(fā)超限),會記錄該事件和耗時
    1.6、HystrixEventType.TIMEOUT//執(zhí)行超時,會收集該事件和耗時
    1.7、HystrixEventType.BAD_REQUEST//參數(shù)或狀態(tài)異常,會收集該事件和耗時
    以上整體的事件分為兩大類,成功和失敗,根據(jù)用戶邏輯代碼的執(zhí)行結(jié)果,如果是有異常,收集異常事件和耗時,執(zhí)行circuitBreaker.markNonSuccess(),否則執(zhí)行circuitBreaker.markNonSuccess()
    另外觸發(fā)熔斷器開啟和關(guān)閉,有且只有兩個途徑,如下圖:
    spring cloud中Hystrix指標收集原理是什么

  2. 指標計算
    spring cloud中Hystrix指標收集原理是什么
    這里簡單對各個步驟中涉及到多線程并發(fā)的情況以及滑動窗口的計算做一個簡單介紹:
    2.1:并發(fā)(threadLocal&SerializedSubject)
    同一個接口收到多個請求時候,也就是這些請求命中的都是同一個commandKey時(統(tǒng)計指標是按照KEY為維度),每個請求都是一個獨立的線程,每個線程內(nèi)會產(chǎn)生多個各種各樣的事件,首先同一個線程內(nèi)的event拼接封裝成HystrixCommandCompletion,上報的是一個HystrixCommandCompletion,流計算操作的也是一個個的HystrixCommandCompletion,不存在計算時候把各線程的事件混雜在一起的可能,如何保證的在下面會講到
    2.1.1:上報者是通過threadLocal線程隔離
    首先hystrix啟動后會創(chuàng)建一個threadLocal,當一個客戶端請求不管是正常結(jié)束還是異常結(jié)束,都要上報上報狀態(tài),也就是執(zhí)行handleCommandEnd,都會從threadLocal中返回一個當前線程的HystrixThreadEventStream,代碼如下:

        private void handleCommandEnd(boolean commandExecutionStarted) {
        	//省略部分代碼
            if (executionResultAtTimeOfCancellation == null) {
                //上報metrics
                metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
            } else {
                metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
            }
        }


    	void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
            //threadLocal中放置的是HystrixThreadEventStream,因為改寫了init方法,所以無需set,直接可以獲取
            HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
            if (executionStarted) {
                concurrentExecutionCount.decrementAndGet();
            }
       }
        //從threadLocal中獲取事件流
        public static HystrixThreadEventStream getInstance() {
            return threadLocalStreams.get();
        }   
        //threadLocal的定義,改寫了init方法,所以不用單獨調(diào)用set
         private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
            @Override
            protected HystrixThreadEventStream initialValue() {
                return new HystrixThreadEventStream(Thread.currentThread());
            }
        }

    2.1.2:限流隊列
    每個線程會有唯一的HystrixThreadEventStream,因為是從theadLocal獲取,每個HystrixThreadEventStream都會關(guān)聯(lián)一個由Subject實現(xiàn)的隊列,也就是每一個線程都有一個私有的隊列,這里說它提供限流是因為采用了‘背壓’的原理,所謂的‘背壓’是指按需提供,根據(jù)消費者的能力去往隊列生產(chǎn),代碼如下:

        public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        	//把executionResult封裝成HystrixCommandCompletion,HystrixCommandCompletion是流計算操作的基本單位
            HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
            //writeOnlyCommandCompletionSubject就是一個通過RXjava實現(xiàn)的限流隊列
            writeOnlyCommandCompletionSubject.onNext(event);
        }
    //省略代碼
            writeOnlyCommandCompletionSubject
                    .onBackpressureBuffer()//開啟'背壓功能'
                    .doOnNext(writeCommandCompletionsToShardedStreams)//核心是這個action的call方法
                    .unsafeSubscribe(Subscribers.empty());


    2.2:數(shù)據(jù)流串行化
    每個放入隊列的HystrixCommandCompletion,都會執(zhí)利doOnNext的Action,通過他的call方法去調(diào)用HystrixCommandCompletionStream的write方法,相同的commandKey具有同一個HystrixCommandCompletionStream實例,具體是通過currentHashMap做的實例隔離,HystrixCommandCompletionStream內(nèi)部是通過一個SerializedSubject實現(xiàn)多個HystrixCommandCompletion并行寫入的串行化,具體代碼邏輯如下:

        //限流隊列收到數(shù)據(jù)后會執(zhí)行call方法,是通過觀察者注冊了doOnnext事件
        private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
            @Override
            public void call(HystrixCommandCompletion commandCompletion) {
                //同一個commandkey對應(yīng)同一個串行隊列的實例,因為同一個commandKey必須要收集該key下所有線程的metrix事件做統(tǒng)計,才能準確
                HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
                commandStream.write(commandCompletion);//寫入串行隊列,這里是核心
    
                if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                    HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                    threadPoolStream.write(commandCompletion);
                }
            }
        };
        //具體的write方法如下,需要重點關(guān)注writeOnlySubject的定義
        public void write(HystrixCommandCompletion event) {
            writeOnlySubject.onNext(event);
        }
        //下面是writeOnlySubject的定義,是通過SerializedSubject將并行的寫入變?yōu)榇谢?
        HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
            this.commandKey = commandKey;
    
            this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
            this.readOnlyStream = writeOnlySubject.share();
        }


    2.3:消費訂閱
    在hystrixCommand創(chuàng)建的時候,會對HystrixCommandCompletionStream進行訂閱,目前有:
    healthCountsStream
    rollingCommandEventCounterStream
    cumulativeCommandEventCounterStream
    rollingCommandLatencyDistributionStream
    rollingCommandUserLatencyDistributionStream
    rollingCommandMaxConcurrencyStream
    這幾個消費者通過滾動窗口的形式,對數(shù)據(jù)做統(tǒng)計和指標計算,下面選取具有代表意義的healthCountsStream做講解:

        public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
        	//統(tǒng)計計算指標的時間間隔-metricsHealthSnapshotIntervalInMilliseconds
            final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
            if (healthCountBucketSizeInMs == 0) {
                throw new RuntimeException("You have set the bucket size to 0ms.  Please set a positive number, so that the metric stream can be properly consumed");
            }
            //熔斷窗口滑動周期,默認10秒,保留10秒內(nèi)的統(tǒng)計數(shù)據(jù),指定窗口期內(nèi),有效進行指標計算的次數(shù)=metricsRollingStatisticalWindowInMilliseconds/metricsHealthSnapshotIntervalInMilliseconds
            final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
    
            return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
        }
        //繼承關(guān)系HealthCountStream-》BucketedRollingCounterStream-》BucketedCounterStream
        //把各事件聚合成桶...省略代碼,在BucketedCounterStream完成
        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
                @Override
                public Observable<Bucket> call() {
                    return inputEventStream
                            .observe()
                            .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                            .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                            .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
                }
        }
        //聚合成桶的邏輯代碼
        public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
            @Override
            public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
                ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
                for (HystrixEventType eventType: ALL_EVENT_TYPES) {
                    switch (eventType) {
                        case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here
                        default:
                            initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);//對各類型的event做,分類匯總
                            break;
                    }
                }
                return initialCountArray;
            }
        };
        //生成計算指標,在BucketedRollingCounterStream完成,省略部分代碼
        this.sourceStream = bucketedStream      //stream broken up into buckets
                    .window(numBuckets, 1)          //emit overlapping windows of buckets
                    .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                    .doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            isSourceCurrentlySubscribed.set(true);
                        }
        })
        //計算指標聚合實現(xiàn),reduceWindowToSummary
        private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
            @Override
            public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
                return healthCounts.plus(bucketEventCounts);//重點看該方法
            }
        };
        public HealthCounts plus(long[] eventTypeCounts) {
                long updatedTotalCount = totalCount;
                long updatedErrorCount = errorCount;
    
                long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
                long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
                long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
                long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
                long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
                //多個線程的事件,被匯總計算以后,所有的事件相加得到總和
                updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
                //失敗的事件總和,注意只有FAIL+timeoutCount+THREAD_POOL_REJECTED+SEMAPHORE_REJECTED
                updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
                return new HealthCounts(updatedTotalCount, updatedErrorCount);
        }


     

  3. 指標使用
    指標使用比較簡單,用于控制熔斷器的關(guān)閉與開啟,邏輯如下:

                            public void onNext(HealthCounts hc) {
                                if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
    
                                } else {
                                    if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    } else {
                                        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                            circuitOpened.set(System.currentTimeMillis());
                                        }
                                    }
                                }
                            }


上述就是小編為大家分享的spring cloud中Hystrix指標收集原理是什么了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI