您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關(guān)RocketMQ設(shè)計之故障規(guī)避機制的示例分析的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
NameServer
為了簡化和客戶端通信,發(fā)現(xiàn)Broker故障時并不會立即通知客戶端。故障規(guī)避機制就是用來解決當Broker出現(xiàn)故障,Producer
不能及時感知而導致消息發(fā)送失敗的問題。默認不開啟,如果開啟,消息發(fā)送失敗的時候會將失敗的Broker暫時排除在隊列選擇列表外
MQFaultStrategy類的:
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //是否開啟故障延遲機制 if (this.sendLatencyFaultEnable) { try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); //判斷Queue是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } //默認輪詢 return tpInfo.selectOneMessageQueue(lastBrokerName); } public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; } }
在選擇查找路由時,選擇消息隊列的關(guān)鍵步驟:
先按輪詢算法選擇一個消息隊列
從故障列表判斷該消息隊列是否可用
LatencyFaultToleranceImpl中判斷是否可用:
@Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
判斷是否在故障列表中,不在故障列表中代表可用。
在故障列表中判斷當前時間是否大于等于故障規(guī)避的開始時間startTimestamp
在消息發(fā)送結(jié)束后和發(fā)送出現(xiàn)異常時調(diào)用updateFaultItem()
方法來更新故障列表,computeNotAvailableDuration()
根據(jù)響應(yīng)時間來計算故障周期時長,響應(yīng)時間越長故障周期越長。網(wǎng)絡(luò)異常、Broker異常、客戶端異常都是固定響應(yīng)時長30s,它們故障周期時長為10分鐘。消息發(fā)送成功或線程中斷異常響應(yīng)時間在100毫秒以內(nèi),故障周期時長為0。
LatencyFaultToleranceImpl類的updateFaultItem方法:
@Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //加入故障列表 old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
FaultItem
存儲Broker名稱、響應(yīng)時長、故障規(guī)避開始時間,最重要的是故障規(guī)避開始時間,用來判斷Queue是否可用
感謝各位的閱讀!關(guān)于“RocketMQ設(shè)計之故障規(guī)避機制的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。