您好,登錄后才能下訂單哦!
首先解釋一下mqtt協(xié)議的session的概念,因?yàn)橹挥杏辛藄ession才會存在消息質(zhì)量保證一說
如果清理會話(CleanSession)標(biāo)志被設(shè)置為0,服務(wù)端必須基于當(dāng)前會話(使用客戶端標(biāo)識符識別)的狀態(tài)恢復(fù)與客戶端的通信。如果沒有與這個客戶端標(biāo)識符關(guān)聯(lián)的會話,服務(wù)端必須創(chuàng)建一個新的會話。在連接斷開之后,當(dāng)連接斷開后,客戶端和服務(wù)端必須保存會話信息 [MQTT-3.1.2-4]。當(dāng)清理會話標(biāo)志為0的會話連接斷開之后,服務(wù)端必須將之后的QoS 1和QoS 2級別的消息保存為會話狀態(tài)的一部分,如果這些消息匹配斷開連接時客戶端的任何訂閱 [MQTT-3.1.2-5]。服務(wù)端也可以保存滿足相同條件的QoS 0級別的消息。
如果清理會話(CleanSession)標(biāo)志被設(shè)置為1,客戶端和服務(wù)端必須丟棄之前的任何會話并開始一個新的會話。會話僅持續(xù)和網(wǎng)絡(luò)連接同樣長的時間。與這個會話關(guān)聯(lián)的狀態(tài)數(shù)據(jù)不能被任何之后的會話重用 [MQTT-3.1.2-6]。
客戶端的會話狀態(tài)包括:
● 已經(jīng)發(fā)送給服務(wù)端,但是還沒有完成確認(rèn)的QoS 1和QoS 2級別的消息
● 已從服務(wù)端接收,但是還沒有完成確認(rèn)的QoS 2級別的消息。
服務(wù)端的會話狀態(tài)包括:
● 會話是否存在,即使會話狀態(tài)的其它部分都是空。
● 客戶端的訂閱信息。
● 已經(jīng)發(fā)送給客戶端,但是還沒有完成確認(rèn)的QoS 1和QoS 2級別的消息。
● 即將傳輸給客戶端的QoS 1和QoS 2級別的消息。
● 已從客戶端接收,但是還沒有完成確認(rèn)的QoS 2級別的消息。
● 可選,準(zhǔn)備發(fā)送給客戶端的QoS 0級別的消息。
保留消息不是服務(wù)端會話狀態(tài)的一部分,會話終止時不能刪除保留消息 [MQTT-3.1.2.7]。
有關(guān)狀態(tài)存儲的限制和細(xì)節(jié)見第 4.1節(jié)。
當(dāng)清理會話標(biāo)志被設(shè)置為1時,客戶端和服務(wù)端的狀態(tài)刪除不需要是原子操作。
非規(guī)范評注
為了確保在發(fā)生故障時狀態(tài)的一致性,客戶端應(yīng)該使用會話狀態(tài)標(biāo)志1重復(fù)請求連接,直到連接成功。
非規(guī)范評注
一般來說,客戶端連接時總是將清理會話標(biāo)志設(shè)置為0或1,并且不交替使用兩種值。這個選擇取決于具體的應(yīng)用。清理會話標(biāo)志設(shè)置為1的客戶端不會收到舊的應(yīng)用消息,而且在每次連接成功后都需要重新訂閱任何相關(guān)的主題。清理會話標(biāo)志設(shè)置為0的客戶端會收到所有在它連接斷開期間發(fā)布的QoS 1和QoS 2級別的消息。因此,要確保不丟失連接斷開期間的消息,需要使用QoS 1或 QoS 2級別,同時將清理會話標(biāo)志設(shè)置為0。
非規(guī)范評注
清理會話標(biāo)志0的客戶端連接時,它請求服務(wù)端在連接斷開后保留它的MQTT會話狀態(tài)。如果打算在之后的某個時間點(diǎn)重連到這個服務(wù)端,客戶端連接應(yīng)該只使用清理會話標(biāo)志0。當(dāng)客戶端決定之后不再使用這個會話時,應(yīng)該將清理會話標(biāo)志設(shè)置為1最后再連接一次,然后斷開連接。
一。mqtt協(xié)議支持三種消息等級,分別是:
1.0--AT_MOST_ONCE,至多一次
2.1--AT_LEAST_ONCE,至少一次
3.2--EXACTLY_ONCE,有且僅有一次
二。其實(shí)AMQP協(xié)議也支持這三種。我們來看一下mqtt對于這三種消息的實(shí)現(xiàn)方式,對于理解AMQP的消息等級也有幫助
AT_MOST_ONCE比較簡單,這里不說了,只說后兩種
先看消息等級為1,moquette是怎么實(shí)現(xiàn)的。
先上一張圖,便于理解,
moquette對于session的實(shí)現(xiàn)在這里:這里以session存在在內(nèi)存的實(shí)現(xiàn)來講解
io.moquette.persistence.MemorySessionStore
class Session {
final String clientID;
final ClientSession clientSession;
final Map<Topic, Subscription> subscriptions = new ConcurrentHashMap<>();
final AtomicReference<PersistentSession> persistentSession = new AtomicReference<>(null);
final BlockingQueue<StoredMessage> queue = new ArrayBlockingQueue<>(Constants.MAX_MESSAGE_QUEUE);
final Map<Integer, StoredMessage> secondPhaseStore = new ConcurrentHashMap<>();
final Map<Integer, StoredMessage> outboundFlightMessages =
Collections.synchronizedMap(new HashMap<Integer, StoredMessage>());
final Map<Integer, StoredMessage> inboundFlightMessages = new ConcurrentHashMap<>();
Session(String clientID, ClientSession clientSession) {
this.clientID = clientID;
this.clientSession = clientSession;
}
}
對于broker分發(fā)的qos1消息,在沒有收到ack消息之前是存儲在outboundFlightMessages里面的。
結(jié)合上面的流程圖,很好理解了
這里補(bǔ)充說明一點(diǎn),moquette的實(shí)現(xiàn)并沒有對以發(fā)送出去,而沒有收到ack消息做處理,按理說這是session的一部分,如果客戶端要求保持會話,客戶端斷線重連之后,是應(yīng)該進(jìn)行重發(fā)的,當(dāng)然大家也可以自己實(shí)現(xiàn)。
再看消息等級為2,看下面的圖
結(jié)合上面的圖就很好理解了,這里注意,moquette 其實(shí)在分發(fā)消息之前,是應(yīng)該先從nflightBound..remove(messageId_1)的,但是它沒有這么做,這其實(shí)是個bug,大家如果沒用到cleanSession和qos2消息還好,用到了就有可能存在內(nèi)存泄露
三。末尾說一下mqtt的重發(fā)機(jī)制,
mqtt- broker只會在下面一種情況下重發(fā)消息:
必須滿足兩個條件:
1.客戶端要求broker保持會話,就是說qos1和qos2消息是會話的一部分。(上一次連接的時候要求保存會話)
2.客戶端重連成功,就是說連接正常能夠通信,也就是說broker對于qos1和qos2消息只做支持,不做保證(tcp連接正常,四層傳輸正常)
因?yàn)樗鼪]法做保證,網(wǎng)絡(luò)不通,機(jī)器斷電,磁盤損害等。qos1和qos2只在連接正常的前提之下做出保證。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。