您好,登錄后才能下訂單哦!
來源:Fate/stay night [Heaven's Feel] lost butterfly
在前一篇文章《聊聊 TCP 長連接和心跳那些事》中,我們已經(jīng)聊過了 TCP 中的 KeepAlive,以及在應(yīng)用層設(shè)計心跳的意義,但卻對長連接心跳的設(shè)計方案沒有做詳細(xì)地介紹。事實上,設(shè)計一個好的心跳機(jī)制并不是一件容易的事,就我所熟知的幾個 RPC 框架,它們的心跳機(jī)制可以說大相徑庭,這篇文章我將探討一下如何設(shè)計一個優(yōu)雅的心跳機(jī)制,主要從 Dubbo 的現(xiàn)有方案以及一個改進(jìn)方案來做分析。
因為后續(xù)我們將從源碼層面來進(jìn)行介紹,所以一些服務(wù)治理框架的細(xì)節(jié)還需要提前交代一下,方便大家理解。
高性能的 RPC 框架幾乎都會選擇使用 Netty 來作為通信層的組件,非阻塞式通信的高效不需要我做過多的介紹。但也由于非阻塞的特性,導(dǎo)致其發(fā)送數(shù)據(jù)和接收數(shù)據(jù)是一個異步的過程,所以當(dāng)存在服務(wù)端異常、網(wǎng)絡(luò)問題時,客戶端接是接收不到響應(yīng)的,那我們?nèi)绾闻袛嘁淮?RPC 調(diào)用是失敗的呢?
誤區(qū)一:Dubbo 調(diào)用不是默認(rèn)同步的嗎?
Dubbo 在通信層是異步的,呈現(xiàn)給使用者同步的錯覺是因為內(nèi)部做了阻塞等待,實現(xiàn)了異步轉(zhuǎn)同步。
誤區(qū)二: Channel.writeAndFlush
會返回一個 channelFuture
,我只需要判斷 channelFuture.isSuccess
就可以判斷請求是否成功了。
注意,writeAndFlush 成功并不代表對端接受到了請求,返回值為 true 只能保證寫入網(wǎng)絡(luò)緩沖區(qū)成功,并不代表發(fā)送成功。
避開上述兩個誤區(qū),我們再來回到本小節(jié)的標(biāo)題:客戶端如何得知請求失?。?strong>正確的邏輯應(yīng)當(dāng)是以客戶端接收到失敗響應(yīng)為判斷依據(jù)。等等,前面不還在說在失敗的場景中,服務(wù)端是不會返回響應(yīng)的嗎?沒錯,既然服務(wù)端不會返回,那就只能客戶端自己造了。
一個常見的設(shè)計是:客戶端發(fā)起一個 RPC 請求,會設(shè)置一個超時時間 client_timeout
,發(fā)起調(diào)用的同時,客戶端會開啟一個延遲 client_timeout
的定時器
接收到正常響應(yīng)時,移除該定時器。
定時器倒計時完畢,還沒有被移除,則認(rèn)為請求超時,構(gòu)造一個失敗的響應(yīng)傳遞給客戶端。
Dubbo 中的超時判定邏輯:
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); // timeout check timeoutCheck(future); return future; } private static void timeoutCheck(DefaultFuture future) { TimeoutCheckTask task = new TimeoutCheckTask(future); TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); } private static class TimeoutCheckTask implements TimerTask { private DefaultFuture future; TimeoutCheckTask(DefaultFuture future) { this.future = future; } @Override public void run(Timeout timeout) { if (future == null || future.isDone()) { return; } // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); } }
主要邏輯涉及的類: DubboInvoker
, HeaderExchangeChannel
, DefaultFuture
,通過上述代碼,我們可以得知一個細(xì)節(jié),無論是何種調(diào)用,都會經(jīng)過這個定時器的檢測,超時即調(diào)用失敗,一次 RPC 調(diào)用的失敗,必須以客戶端收到失敗響應(yīng)為準(zhǔn)。
網(wǎng)絡(luò)通信永遠(yuǎn)要考慮到最壞的情況,一次心跳失敗,不能認(rèn)定為連接不通,多次心跳失敗,才能采取相應(yīng)的措施。
忙檢測的對立面是空閑檢測,我們做心跳的初衷,是為了保證連接的可用性,以保證及時采取斷連,重連等措施。如果一條通道上有頻繁的 RPC 調(diào)用正在進(jìn)行,我們不應(yīng)該為通道增加負(fù)擔(dān)去發(fā)送心跳包。心跳扮演的角色應(yīng)當(dāng)是晴天收傘,雨天送傘。
本文的源碼對應(yīng) Dubbo 2.7.x 版本,在 apache 孵化的該版本中,心跳機(jī)制得到了增強(qiáng)。
介紹完了一些基礎(chǔ)的概念,我們便來看看 Dubbo 是如何設(shè)計應(yīng)用層心跳的。Dubbo 的心跳是雙向心跳,客戶端會給服務(wù)端發(fā)送心跳,反之,服務(wù)端也會向客戶端發(fā)送心跳。
public class HeaderExchangeClient implements ExchangeClient { private int heartbeat; private int heartbeatTimeout; private HashedWheelTimer heartbeatTimer; public HeaderExchangeClient(Client client, boolean needHeartbeat) { this.client = client; this.channel = new HeaderExchangeChannel(client); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (needHeartbeat) { <1> long tickDuration = calculateLeastDuration(heartbeat); heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL); <2> startHeartbeatTimer(); } } }
不僅 HeaderExchangeClient
客戶端開起了定時器, HeaderExchangeServer
服務(wù)端同樣開起了定時器,由于服務(wù)端的邏輯和客戶端幾乎一致,所以后續(xù)我并不會重復(fù)粘貼服務(wù)端的代碼。
Dubbo 在早期版本版本中使用的是 shedule 方案,在 2.7.x 中替換成了 HashWheelTimer。
private void startHeartbeatTimer() {
long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat); <1>
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); <2>
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}
Dubbo 在 startHeartbeatTimer
方法中主要開啟了兩個定時器: HeartbeatTimerTask
, ReconnectTimerTask
至于方法中的其他代碼,其實也是本文的重要分析內(nèi)容,先容我賣個關(guān)子,后面再來看追溯。
詳細(xì)解析下心跳檢測定時任務(wù)的邏輯 HeartbeatTimerTask#doTask
:
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); } } }
前面已經(jīng)介紹過,Dubbo 采取的是設(shè)計是雙向心跳,即服務(wù)端會向客戶端發(fā)送心跳,客戶端也會向服務(wù)端發(fā)送心跳,接收的一方更新 lastRead 字段,發(fā)送的一方更新 lastWrite 字段,超過心跳間隙的時間,便發(fā)送心跳請求給對端。這里的 lastRead/lastWrite 同樣會被同一個通道上的普通調(diào)用更新,通過更新這兩個字段,實現(xiàn)了只在連接空閑時才會真正發(fā)送空閑報文的機(jī)制,符合我們一開始科普的做法。
注意:不僅僅心跳請求會更新 lastRead 和 lastWrite,普通請求也會。這對應(yīng)了我們預(yù)備知識中的空閑檢測機(jī)制。
繼續(xù)研究下重連和斷連定時器都實現(xiàn)了什么 ReconnectTimerTask#doTask
。
protected void doTask(Channel channel) { Long lastRead = lastRead(channel); Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) { ((Client) channel).reconnect(); } else { channel.close(); } } }
第二個定時器則負(fù)責(zé)根據(jù)客戶端、服務(wù)端類型來對連接做不同的處理,當(dāng)超過設(shè)置的心跳總時間之后,客戶端選擇的是重新連接,服務(wù)端則是選擇直接斷開連接。這樣的考慮是合理的,客戶端調(diào)用是強(qiáng)依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接。
細(xì)心的朋友會發(fā)現(xiàn),這個類被命名為 ReconnectTimerTask 是不太準(zhǔn)確的,因為它處理的是重連和斷連兩個邏輯。
在 Dubbo 的 issue 中曾經(jīng)有人反饋過定時不精確的問題,我們來看看是怎么一回事。
Dubbo 中默認(rèn)的心跳周期是 60s,設(shè)想如下的時序:
第 0 秒,心跳檢測發(fā)現(xiàn)連接活躍
第 1 秒,連接實際斷開
第 60 秒,心跳檢測發(fā)現(xiàn)連接不活躍
由于時間窗口的問題,死鏈不能夠被及時檢測出來,最壞情況為一個心跳周期。
為了解決上述問題,我們再倒回去看一下上面的 startHeartbeatTimer()
方法
long heartbeatTick = calculateLeastDuration(heartbeat); long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
其中 calculateLeastDuration
根據(jù)心跳時間和超時時間分別計算出了一個 tick 時間,實際上就是將兩個變量除以了 3,使得他們的值縮小,并傳入了 HashWeelTimer
的第二個參數(shù)之中
heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS); heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
tick 的含義便是定時任務(wù)執(zhí)行的頻率。這樣,通過減少檢測間隔時間,增大了及時發(fā)現(xiàn)死鏈的概率,原先的最壞情況是 60s,如今變成了 20s。這個頻率依舊可以加快,但需要考慮資源消耗的問題。
定時不準(zhǔn)確的問題出現(xiàn)在 Dubbo 的兩個定時任務(wù)之中,所以都做了 tick 操作。事實上,所有的定時檢測的邏輯都存在類似的問題。
Dubbo 對于建立的每一個連接,同時在客戶端和服務(wù)端開啟了 2 個定時器,一個用于定時發(fā)送心跳,一個用于定時重連、斷連,執(zhí)行的頻率均為各自檢測周期的 1/3。定時發(fā)送心跳的任務(wù)負(fù)責(zé)在連接空閑時,向?qū)Χ税l(fā)送心跳包。定時重連、斷連的任務(wù)負(fù)責(zé)檢測 lastRead 是否在超時周期內(nèi)仍未被更新,如果判定為超時,客戶端處理的邏輯是重連,服務(wù)端則采取斷連的措施。
先不急著判斷這個方案好不好,再來看看改進(jìn)方案是怎么設(shè)計的。
實際上我們可以更優(yōu)雅地實現(xiàn)心跳機(jī)制,本小節(jié)開始,我將介紹一個新的心跳機(jī)制。
Netty 對空閑連接的檢測提供了天然的支持,使用 IdleStateHandler
可以很方便的實現(xiàn)空閑檢測邏輯。
public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {}
readerIdleTime:讀超時時間
writerIdleTime:寫超時時間
allIdleTime:所有類型的超時時間
IdleStateHandler
這個類會根據(jù)設(shè)置的超時參數(shù),循環(huán)檢測 channelRead 和 write 方法多久沒有被調(diào)用。當(dāng)在 pipeline 中加入 IdleSateHandler
之后,可以在此 pipeline 的任意 Handler 的 userEventTriggered
方法之中檢測 IdleStateEvent
事件,
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { //do something } ctx.fireUserEventTriggered(evt); }
為什么需要介紹 IdleStateHandler
呢?其實提到它的空閑檢測 + 定時的時候,大家應(yīng)該能夠想到了,這不天然是給心跳機(jī)制服務(wù)的嗎?很多服務(wù)治理框架都選擇了借助 IdleStateHandler
來實現(xiàn)心跳。
IdleStateHandler 內(nèi)部使用了 eventLoop.schedule(task) 的方式來實現(xiàn)定時任務(wù),使用 eventLoop 線程的好處是還同時保證了線程安全,這里是一個小細(xì)節(jié)。
首先是將 IdleStateHandler
加入 pipeline 中。
客戶端:
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("clientIdleHandler", new IdleStateHandler(60, 0, 0)); } });
服務(wù)端:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("serverIdleHandler",new IdleStateHandler(0, 0, 200)); } }
客戶端配置了 read 超時為 60s,服務(wù)端配置了 write/read 超時為 200s,先在此埋下兩個伏筆:
為什么客戶端和服務(wù)端配置的超時時間不一致?
為什么客戶端檢測的是讀超時,而服務(wù)端檢測的是讀寫超時?
對于空閑超時的處理邏輯,客戶端和服務(wù)端是不同的。首先來看客戶端
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // send heartbeat sendHeartBeat(); } else { super.userEventTriggered(ctx, evt); } }
檢測到空閑超時之后,采取的行為是向服務(wù)端發(fā)送心跳包,具體是如何發(fā)送,以及處理響應(yīng)的呢?偽代碼如下
public void sendHeartBeat() { Invocation invocation = new Invocation(); invocation.setInvocationType(InvocationType.HEART_BEAT); channel.writeAndFlush(invocation).addListener(new CallbackFuture() { @Override public void callback(Future future) { RPCResult result = future.get(); //超時 或者 寫失敗 if (result.isError()) { channel.addFailedHeartBeatTimes(); if (channel.getFailedHeartBeatTimes() >= channel.getMaxHeartBeatFailedTimes()) { channel.reconnect(); } } else { channel.clearHeartBeatFailedTimes(); } } }); }
行為并不復(fù)雜,構(gòu)造一個心跳包發(fā)送到服務(wù)端,接受響應(yīng)結(jié)果
響應(yīng)成功,清空請求失敗標(biāo)記
響應(yīng)失敗,心跳失敗標(biāo)記+1,如果超過配置的失敗次數(shù),則重新連接
不僅僅是心跳,普通請求返回成功響應(yīng)時也會清空標(biāo)記
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { channel.close(); } else { super.userEventTriggered(ctx, evt); } }
服務(wù)端處理空閑連接的方式非常簡單粗暴,直接關(guān)閉連接。
為什么客戶端和服務(wù)端配置的超時時間不一致?
因為客戶端有重試邏輯,不斷發(fā)送心跳失敗 n 次之后,才認(rèn)為是連接斷開;而服務(wù)端是直接斷開,留給服務(wù)端時間得長一點。60 * 3 < 200 還說明了一個問題,雙方都擁有斷開連接的能力,但連接的創(chuàng)建是由客戶端主動發(fā)起的,那么客戶端也更有權(quán)利去主動斷開連接。
為什么客戶端檢測的是讀超時,而服務(wù)端檢測的是讀寫超時?
這其實是一個心跳的共識了,仔細(xì)思考一下,定時邏輯是由客戶端發(fā)起的,所以整個鏈路中不通的情況只有可能是:服務(wù)端接收,服務(wù)端發(fā)送,客戶端接收。也就是說,只有客戶端的 pong,服務(wù)端的 ping,pong 的檢測是有意義的。
主動追求別人的是你,主動說分手的也是你。
利用 IdleStateHandler
實現(xiàn)心跳機(jī)制可以說是十分優(yōu)雅的,借助 Netty 提供的空閑檢測機(jī)制,利用客戶端維護(hù)單向心跳,在收到 3 次心跳失敗響應(yīng)之后,客戶端斷開連接,交由異步線程重連,本質(zhì)還是表現(xiàn)為客戶端重連。服務(wù)端在連接空閑較長時間后,主動斷開連接,以避免無謂的資源浪費。
私下請教過美團(tuán)點評的長連接負(fù)責(zé)人:俞超(閃電俠),美點使用的心跳方案和 Dubbo 改進(jìn)方案幾乎一致,可以該方案是標(biāo)準(zhǔn)實現(xiàn)了。
鑒于 Dubbo 存在一些其他通信層的實現(xiàn),所以可以保留現(xiàn)有的定時發(fā)送心跳的邏輯。
建議改動點一:
雙向心跳的設(shè)計是不必要的,兼容現(xiàn)有的邏輯,可以讓客戶端在連接空閑時發(fā)送單向心跳,服務(wù)端定時檢測連接可用性。定時時間盡量保證:客戶端超時時間 * 3 ≈ 服務(wù)端超時時間
建議改動點二:
去除處理重連和斷連的定時任務(wù),Dubbo 可以判斷心跳請求是否響應(yīng)失敗,可以借鑒改進(jìn)方案的設(shè)計,在連接級別維護(hù)一個心跳失敗次數(shù)的標(biāo)記,任意響應(yīng)成功,清除標(biāo)記;連續(xù)心跳失敗 n 次,客戶端發(fā)起重連。這樣可以減少一個不必要的定時器,任何輪詢的方式,都是不優(yōu)雅的。
最后再聊聊可擴(kuò)展性這個話題。其實我是建議把定時器交給更加底層的 Netty 去做,也就是完全使用 IdleStateHandler
,其他通信層組件各自實現(xiàn)自己的空閑檢測邏輯,但是 Dubbo 中 mina,grizzy 的兼容問題囿住了我的拳腳,但試問一下,如今的 2019 年,又有多少人在使用 mina 和 grizzy?因為一些不太可能用的特性,而限制了主流用法的優(yōu)化,這肯定不是什么好事。抽象,功能,可擴(kuò)展性并不是越多越好,開源產(chǎn)品的人力資源是有限的,框架使用者的理解能力也是有限的,能解決大多數(shù)人問題的設(shè)計,才是好的設(shè)計。哎,mina、grizzy,學(xué)不動了。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。