您好,登錄后才能下訂單哦!
這篇文章主要介紹“Sentinel如何攔截異常流量”,在日常操作中,相信很多人在Sentinel如何攔截異常流量問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Sentinel如何攔截異常流量”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
各位在家里用電的過程中,一定也經(jīng)歷過「跳閘」。這個「閘」就是在電量超過負荷的時候用來保護我們用電安全的,也被稱為「斷路器」,還有個響亮的英文名 -- CircuitBreaker。
和用電安全一樣,對于「限流」、「降級」、「熔斷」...,你我應(yīng)該也都耳熟能詳。我們開發(fā)的各類軟件、系統(tǒng)、互聯(lián)網(wǎng)應(yīng)用等為了不被異常流量壓垮,也需要一個斷路器。
在 Spring 應(yīng)用中,使用斷路器很方便,我們可以使用 Spring Cloud CircuitBreaker。
Spring Cloud Circuit Breaker 是啥?如果你熟悉 Spring 是什么人的話,你能猜個八九不離十。和Spring Data JPA 這些類似,Spring 他又搞了個抽象的,標(biāo)準(zhǔn)的API 出來。這次他抽象的是關(guān)于降級熔斷的「斷路器」。有了這一層,具體實現(xiàn)是誰可以方便的更換,我們使用的代碼里改動基本為0。
我們先來從官方Demo有個初步印象:
@RestControllerpublic class DemoController { private CircuitBreakerFactory circuitBreakerFactory; private HttpBinService httpBin; public DemoController(CircuitBreakerFactory circuitBreakerFactory, HttpBinService httpBinService) { this.circuitBreakerFactory = circuitBreakerFactory; this.httpBin = httpBinService; } @GetMapping("/delay/{seconds}") public Map delay(@PathVariable int seconds) { return circuitBreakerFactory.create("delay").run(httpBin.delaySuppplier(seconds), t -> { Map<String, String> fallback = new HashMap<>(); fallback.put("hello", "world"); return fallback; }); }}
千言萬語,總結(jié)出來這樣一句circuitBreakerFactory.create("delay").run()
因為是抽象,對應(yīng)的實現(xiàn)就有好多種啦。
目前支持的實現(xiàn)有:
Hystrix
Resilience4j
Sentinel
Spring Retry
而抽象相當(dāng)于定了個標(biāo)準(zhǔn),像JDBC一樣,無論我們把數(shù)據(jù)庫換成了MySQL,Oracle 還是SQLite,接口等非特定類型的代碼都不需要改變。斷路器也一樣。
這里的斷路器工廠,創(chuàng)建方法都是標(biāo)準(zhǔn)的。具體這里執(zhí)行業(yè)務(wù)邏輯的時候斷路器實現(xiàn)要怎樣進行攔截降級,就可以交給具體的實現(xiàn)來完成。
這次,我們以開源的 Sentinel 為例,來看看他們是怎樣攔住異常流量的。
首先,因為是Spring Cloud,所以還會基于 Spring Boot 的 Autoconfiguration。以下是配置類,我們看到生成了一個工廠。
public class SentinelCircuitBreakerAutoConfiguration { @Bean @ConditionalOnMissingBean(CircuitBreakerFactory.class) public CircuitBreakerFactory sentinelCircuitBreakerFactory() { return new SentinelCircuitBreakerFactory(); } }
在我們實際代碼執(zhí)行邏輯的時候,create
出來的是什么呢?
是個斷路器 CircuitBreaker
,用來執(zhí)行代碼。
public interface CircuitBreaker {
default <T> T run(Supplier<T> toRun) {
return run(toRun, throwable -> {
throw new NoFallbackAvailableException("No fallback available.", throwable);
});
};
<T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
}
包含兩個執(zhí)行的方法,需要在的時候可以指定fallback
邏輯。具體到 Sentinel 是這樣的:
public CircuitBreaker create(String id) { SentinelConfigBuilder.SentinelCircuitBreakerConfiguration conf = getConfigurations() .computeIfAbsent(id, defaultConfiguration); return new SentinelCircuitBreaker(id, conf.getEntryType(), conf.getRules()); }
你會看到創(chuàng)建了一個SentinelCircuitBreaker
。我們的業(yè)務(wù)邏輯,就會在這個斷路器里執(zhí)行,run
方法就是各個具體實現(xiàn)的舞臺。
@Override public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) { Entry entry = null; try { entry = SphU.entry(resourceName, entryType); // If the SphU.entry() does not throw `BlockException`, it means that the // request can pass. return toRun.get(); } catch (BlockException ex) { // SphU.entry() may throw BlockException which indicates that // the request was rejected (flow control or circuit breaking triggered). // So it should not be counted as the business exception. return fallback.apply(ex); } catch (Exception ex) { // For other kinds of exceptions, we'll trace the exception count via // Tracer.trace(ex). Tracer.trace(ex); return fallback.apply(ex); } finally { // Guarantee the invocation has been completed. if (entry != null) { entry.exit(); } } }
OK,到此為止, Spring Cloud CircuitBreaker 已經(jīng)展現(xiàn)完了。其它的細節(jié)都放到了具體實現(xiàn)的「盒子」里。下面我們把這個盒子打開。
Sentinel 是個熔斷降級框架,官方這樣自我介紹:
面向分布式服務(wù)架構(gòu)的高可用流量控制組件,主要以流量為切入點,從流量控制、熔斷降級、系統(tǒng)自適應(yīng)保護等多個維度來幫助用戶保障微服務(wù)的穩(wěn)定性。
官網(wǎng)的這張代碼截圖簡潔的說明了他是怎樣工作的
擋在業(yè)務(wù)代碼的前面,有事兒先沖它來,能通過之后才走業(yè)務(wù)邏輯,和各類闖關(guān)還真類似。
在上面 CircuitBreaker 的 run 方法里,咱們一定都注意到了這句
entry = SphU.entry(resourceName, entryType);
這就是一切攔截的秘密。
無論我們是通過前面的CircuitBreaker的方式,還是 @SentinelResource 這種注解形式,還是通過 Interceptor 的方式,沒什么本質(zhì)區(qū)別。只是觸發(fā)點不一樣。最后都是通過SphU來搞定。
既然是攔截,那一定要攔下來做這樣或那樣的檢查。
實際檢查的時候,entry 里核心代碼有這些:
Entry entryWithPriority(ResourceWrapper resourceWrapper, ...) throws BlockException { ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper,...); } catch (BlockException e1) { e.exit(count, args); throw e1; } return e; }
注意這里的ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
會在請求過來處理的時候,如果未初始化處理鏈,則進行初始化,將各種first,next設(shè)置好,后面的請求都會按這個來處理。所有需要攔截的Slot,都會加到這個 chain 里面,再逐個執(zhí)行 chain 里的 slot。和Servlet Filter 類似。
chain里都加了些啥呢?
public class HotParamSlotChainBuilder implements SlotChainBuilder { public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new ParamFlowSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; }
初始的時候,first 指向一個匿名內(nèi)部類,這些加進來的slot,會在每次addLast的時候,做為鏈的next,
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
而每個 slot,有自己的特定用處,處理完自己的邏輯之后,會通過 fireEntry 來觸發(fā)下一個 slot的執(zhí)行。
給你一張長長的線程調(diào)用棧就會過分的明顯了:
java.lang.Thread.State: RUNNABLE at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.checkFlow(FlowSlot.java:168) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:161) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:139) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:39) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:36) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:30) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:39) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:57) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:50) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:35) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:29) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:101) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:47) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot.entry(NodeSelectorSlot.java:171) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain$1.entry(DefaultProcessorSlotChain.java:31) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain.entry(DefaultProcessorSlotChain.java:75) at com.alibaba.csp.sentinel.CtSph.entryWithPriority(CtSph.java:148) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:347) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:340) at com.alibaba.csp.sentinel.SphU.entry(SphU.java:285)
降級有三種類型
每種類型,都會根據(jù)對應(yīng)的配置項數(shù)據(jù)比對,不符合就中斷,中斷之后也不能一直斷著,啥時候再恢復(fù)呢?就根據(jù)配置的時間窗口,會啟動一個恢復(fù)線程,到時間就會調(diào)度,把中斷標(biāo)識恢復(fù)。
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { if (cut.get()) { return false; } ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); if (clusterNode == null) { return true; } if (grade == RuleConstant.DEGRADE_GRADE_RT) { double rt = clusterNode.avgRt(); if (rt < this.count) { passCount.set(0); return true; } // Sentinel will degrade the service only if count exceeds. if (passCount.incrementAndGet() < rtSlowRequestAmount) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { double exception = clusterNode.exceptionQps(); double success = clusterNode.successQps(); double total = clusterNode.totalQps(); // If total amount is less than minRequestAmount, the request will pass. if (total < minRequestAmount) { return true; } // In the same aligned statistic time window, // "success" (aka. completed count) = exception count + non-exception count (realSuccess) double realSuccess = success - exception; if (realSuccess <= 0 && exception < minRequestAmount) { return true; } if (exception / success < count) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { double exception = clusterNode.totalException(); if (exception < count) { return true; } } if (cut.compareAndSet(false, true)) { ResetTask resetTask = new ResetTask(this); pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS); } return false; }
恢復(fù)做了兩件事:一、把passCount設(shè)置成0,二、中斷標(biāo)識還原
上面介紹了對請求的攔截處理,這其中最核心的,也就是我們最主要配置的,一個是「流控」,一個是「降級」。這兩個對應(yīng)的Slot,會在處理請求的時候,根據(jù)配置好的 「規(guī)則」rule 來判斷。比如我們上面看到的時間窗口、熔斷時間等,以及流控的線程數(shù),QPS數(shù)這些。
這些規(guī)則默認的配置在內(nèi)存里,也可以通過不同的數(shù)據(jù)源加載進來。同時啟用了Sentinel 控制臺的話,在控制臺 也可以配置規(guī)則。這些規(guī)則,會通過 HTTP 發(fā)送給對應(yīng)使用了 sentinel 的應(yīng)用實例節(jié)點。
收到更新內(nèi)容后,實例里的「規(guī)則管理器」會重新加載一次 rule,下次請求處理就直接生效了。
到此,關(guān)于“Sentinel如何攔截異常流量”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。