溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么深入研究阿里sentinel源碼

發(fā)布時間:2021-10-11 10:18:41 來源:億速云 閱讀:93 作者:柒染 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)怎么深入研究阿里sentinel源碼,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1. 阿里sentinel源碼研究深入

1.1. 前言

  • 根據(jù)我目前的實踐,限流和降級規(guī)則似乎不能一同起效,還不知道原因,下面繼續(xù)探索

1.2. 源碼

1.2.1. 流控降級監(jiān)控等的構(gòu)建

  • 首先客戶端而言,我關(guān)注的是我寫的代碼SphU.entry,這明顯是很關(guān)鍵的方法,下圖的內(nèi)容就是這里構(gòu)建的 -Sentinel工作主流程就包含在上面一個方法里,通過鏈?zhǔn)秸{(diào)用的方式,經(jīng)過了建立樹狀結(jié)構(gòu),保存統(tǒng)計簇點,異常日志記錄,實時數(shù)據(jù)統(tǒng)計,負(fù)載保護,權(quán)限認(rèn)證,流量控制,熔斷降級等Slot

怎么深入研究阿里sentinel源碼

  • 進入鏈?zhǔn)椒椒ǖ娜肟跒镃tSph類,try方法大括號內(nèi)

Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }

1.2.2. 修改控制臺規(guī)則是如何通知客戶端的?

  • 看sentinel-transport-simple-http包中的HttpEventTask類,它開啟了一個線程,轉(zhuǎn)么用來做為socket連接,控制臺通過socket請求通知客戶端,從而更新客戶端規(guī)則,更改規(guī)則核心代碼如下

// Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }

通過命令模式,commandName為setRules時,更新規(guī)則

1.2.3. 既然它建立連接用的socket,為什么不用netty呢?

  • 帶著這個疑問,我本想在issues里找下,突然發(fā)現(xiàn)它的源碼中有個sentinel-transport-netty-http這個包和sentinel-transport-simple-http處于同級,官方的例子用的simple-http,但明顯它也準(zhǔn)備了netty-http,于是我替換成了netty-http,運行后效果和原先一樣,至于效率上有沒有提升,我就不清楚了^_^

1.2.4. 流量規(guī)則如何檢查?

  • 該規(guī)則檢查類為FlowRuleChecker,在core核心包中,核心檢查方法如下

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

1.2.5. 熔斷降級如何判斷?

  • 判斷類為DegradeRuleManager,在core核心包,核心內(nèi)容如下,再深入就是它判斷的算法了,感興趣的自己去看如下的passCheck

    public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
        throws BlockException {

        Set<DegradeRule> rules = degradeRules.get(resource.getName());
        if (rules == null) {
            return;
        }

        for (DegradeRule rule : rules) {
            if (!rule.passCheck(context, node, count)) {
                throw new DegradeException(rule.getLimitApp(), rule);
            }
        }
    }

1.2.6. 默認(rèn)的鏈條構(gòu)建在哪?

  • 核心類為DefaultSlotChainBuilder,構(gòu)建了如下的slot

public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}

1.2.7. 既然已經(jīng)知道了它是如何構(gòu)建鏈?zhǔn)降奶幚砉?jié)點的,我們是否何可自己重新構(gòu)建?

  • 發(fā)現(xiàn)類SlotChainProvider中的構(gòu)建方法如下

private static void resolveSlotChainBuilder() {
        List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
        boolean hasOther = false;
        for (SlotChainBuilder builder : LOADER) {
            if (builder.getClass() != DefaultSlotChainBuilder.class) {
                hasOther = true;
                list.add(builder);
            }
        }
        if (hasOther) {
            builder = list.get(0);
        } else {
            // No custom builder, using default.
            builder = new DefaultSlotChainBuilder();
        }

        RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
            + builder.getClass().getCanonicalName());
    }
  • 也就是說,我們?nèi)绻?code>LOADER中加入了其他的非默認(rèn)實現(xiàn)就可以替代原來的DefaultSlotChainBuilder,那LOADER怎么來的?看代碼,如下的全局變量,也就是需要自定義實現(xiàn)SlotChainBuilder接口的實現(xiàn)類

private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);

1.2.8. 如何實現(xiàn)SlotChainBuilder接口呢?

  • 這里要注意的是它使用了ServiceLoader,也就是SPI,全稱Service Provider Interface,加載它需要特定的配合,比如我自定義實現(xiàn)一個Slot

/**
 * @author laoliangliang
 * @date 2019/7/25 14:13
 */
public class MySlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        //自定義的
        chain.addLast(new CarerSlot());

        return chain;
    }
}
/**
 * @author laoliangliang
 * @date 2019/7/25 14:15
 */
@Slf4j
public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        log.info(JSON.toJSONString(resourceWrapper));
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}
  • 這里我自定義了CarerSlot,那是否能被加載到呢?事實上還不夠,需要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder建這樣一個文件,內(nèi)容如下

怎么深入研究阿里sentinel源碼

  • 好了,這樣配置過后,它就能讀到我們自定義的實現(xiàn)類代替它原先的類了

1.2.9. 該命令模式最初的初始化階段在哪?

  • 用過sentinel的都會感受到,只有當(dāng)有第一個sentinel監(jiān)控的請求過來時,sentinel客戶端才會正式初始化,這樣看來,這個初始化步驟應(yīng)該在哪呢?

  • 我通過不斷反向跟蹤上述的命令模式最初的初始化,找到了最初初始化的地方如下

public class Env {

    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        InitExecutor.doInit();
    }

}
  • 有沒有覺得很熟悉?doInit就是很多初始化的起點,當(dāng)Env被調(diào)用時會運行static代碼塊,那么只有可能是sph被調(diào)用時

  • 只要你debug過我上述第一條SphU.entry的源碼,就會發(fā)現(xiàn),如下,該方法一進入不就是先獲取Env的sph,再調(diào)用的entry嗎,所以初始化的地方也就找到了,第一次調(diào)用SphU.entry的地方,或者你不用這個,使用的注解,里面同樣有這個方法

    public static Entry entry(String name) throws BlockException {
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
    }

1.2.10. 注解是如何實現(xiàn)熔斷降級的?

  • 這個其實是比較容易理解的,既然通過SphU.entry包裹可以實現(xiàn)熔斷降級,通過注解的形式包裹代碼方法應(yīng)該是比較容易的,那么在哪里實現(xiàn)和配置的呢

  • 看過我前一篇文章的應(yīng)該看到了,有存在如下配置

    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        pushlish();
        return new SentinelResourceAspect();
    }
  • 很明顯的注解切面,通過spring注解的形式注入,我覺得這還是比較優(yōu)雅的注入方式了,點進入就可以看到如下

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
    }

@SentinelResource注解進行了處理

1.2.11. 什么是直接失敗?

  • 這個很好理解,qps超過設(shè)置的值,直接失敗

怎么深入研究阿里sentinel源碼

1.2.12. 什么是排隊等待?

  • 這個似乎看字面意思很好理解,但是一旦你點了這個選項,下面還有個參數(shù)的 怎么深入研究阿里sentinel源碼

  • 所以這個排隊等待是有超時時間的,達到峰值后勻速通過,采用的漏桶算法,流控圖

怎么深入研究阿里sentinel源碼

1.2.13. 什么是慢啟動模式?

  • 以下是核心算法,Warm Up模式不看算法細(xì)節(jié),看它的中文說明應(yīng)該就能理解是怎么回事了吧;所謂慢啟動模式,要求系統(tǒng)的QPS請求增速不能超過一定的速率,否則會被壓制超過部分請求失敗,應(yīng)該是為了避免一啟動就有大流量的請求進入導(dǎo)致系統(tǒng)一下子就宕機卡主或直接進入了熔斷

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long passQps = (long) node.passQps();

        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        // 開始計算它的斜率
        // 如果進入了警戒線,開始調(diào)整他的qps
        long restToken = storedTokens.get();
        if (restToken >= warningToken) {
            long aboveToken = restToken - warningToken;
            // 消耗的速度要比warning快,但是要比慢
            // current interval = restToken*slope+1/count
            double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            if (passQps + acquireCount <= warningQps) {
                return true;
            }
        } else {
            if (passQps + acquireCount <= count) {
                return true;
            }
        }

        return false;
    }
  • 配置如下時,測試流控 怎么深入研究阿里sentinel源碼

  • 流控圖

怎么深入研究阿里sentinel源碼

1.2.14. 模式總結(jié)

  • 你會發(fā)現(xiàn)直接失敗和排隊等待的區(qū)別在流控圖上并不明顯,那差別在哪呢?我重慶給個請求參數(shù),5秒內(nèi)模擬100個人輪流請求10次 怎么深入研究阿里sentinel源碼

  • sentinel控制臺設(shè)置 怎么深入研究阿里sentinel源碼

  • 流控圖

怎么深入研究阿里sentinel源碼

怎么深入研究阿里sentinel源碼

  • 總結(jié):我設(shè)置了超時時間是5秒,而100個線程10次輪詢也就是1000個請求,可以看出,它并不是一定要在5秒內(nèi)解決這些請求,有了延時后,代表只要響應(yīng)時間在5秒以內(nèi),不管多少請求都不會拒絕;

  • 幾個模式有利有弊,默認(rèn)的快速失敗使我們可以最大程度的控制系統(tǒng)的QPS,避免造成系統(tǒng)壓力過大,但同時可能造成用于的體驗效果變差

  • 慢啟動上面說過了

  • 排隊等待在設(shè)置合理的超時時間后可以最大程度的避免求情的失敗,但同時可能造成線程壓力過大

  • 綜上,在我看來排隊等待模式是比較適合線上運行的,只是需要設(shè)置合理的超時時間,大公司機器不愁那就設(shè)小點,業(yè)界一般標(biāo)準(zhǔn)是200ms用戶無感知,中小型可以設(shè)500ms甚至更大,看機器情況動態(tài)調(diào)整了

1.2.15. 提醒

  • 像我是用apollo來持久化規(guī)則的,你也可以用nacos,redis,zookeeper等,當(dāng)控制臺未啟動時,你啟動客戶端規(guī)則也會生效,只是沒了控制臺實時監(jiān)控數(shù)據(jù)。

上述就是小編為大家分享的怎么深入研究阿里sentinel源碼了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI