溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

mqtt協(xié)議-broker之moqutte源碼研究二之Connect報文處理

發(fā)布時間:2020-06-29 10:43:46 來源:網(wǎng)絡(luò) 閱讀:7561 作者:xingyuntian 欄目:開發(fā)技術(shù)

先上一個圖,大概說明一下moquette 的類之間的關(guān)系
mqtt協(xié)議-broker之moqutte源碼研究二之Connect報文處理

一.ProtocolProcessor類
該類是moquette里面的最終要的類,負(fù)責(zé)所有報文的處理,持有所有各模塊功能的實現(xiàn)對象的引用, 下面詳細(xì)介紹

    protected ConnectionDescriptorStore connectionDescriptors;//所有的連接描述符文存儲,即clientId與通道之間的映射集合
protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;//所有當(dāng)前正在處理的
    訂閱關(guān)系的存儲,之所以有這個是過濾無效的訂閱請求
private SubscriptionsDirectory subscriptions;//訂閱目錄,本質(zhì)上是topic樹
private ISubscriptionsStore subscriptionStore;//所有的訂閱的集合
private boolean allowAnonymous;//是否允許匿名連接
private boolean allowZeroByteClientId;//是否允許clientId為空
private IAuthorizator m_authorizator; //對topic的讀寫權(quán)限認(rèn)證

private IMessagesStore m_messagesStore;//retainMessage的存儲

private ISessionsStore m_sessionsStore;//session 存儲

private IAuthenticator m_authenticator;//連接時候的鑒權(quán)認(rèn)證
private BrokerInterceptor m_interceptor;//各個層面的攔截器

private Qos0PublishHandler qos0PublishHandler;//qos0攔截器
private Qos1PublishHandler qos1PublishHandler;//qos1攔截器
private Qos2PublishHandler qos2PublishHandler;/qos2攔截器
private MessagesPublisher messagesPublisher;//分發(fā)消息,遺愿消息,以及集權(quán)間同步消息
private InternalRepublisher internalRepublisher;//保留消息,qos1,qos2消息重發(fā)器
    ConcurrentMap<String, WillMessage> m_willStore//遺愿消息存儲

    幾乎所有的功能的源頭都在這個類里面

二.對14種報文的處理,都在ProtocolProcessor類,后面會分篇挨個講解moquette對這14個報文的處理
具體哪14中文報文如下

名字                   值           報文流動方向                      描述

Reserved 0 禁止 保留
CONNECT 1 客戶端到服務(wù)端 客戶端請求連接服務(wù)端
CONNACK 2 服務(wù)端到客戶端 連接報文確認(rèn)
PUBLISH 3 兩個方向都允許 發(fā)布消息
PUBACK 4 兩個方向都允許 QoS 1消息發(fā)布收到確認(rèn)
PUBREC 5 兩個方向都允許 發(fā)布收到(保證交付第一步)
PUBREL 6 兩個方向都允許 發(fā)布釋放(保證交付第二步)
PUBCOMP 7 兩個方向都允許 QoS 2消息發(fā)布完成(保證交互第三步)
SUBSCRIBE 8 客戶端到服務(wù)端 客戶端訂閱請求
SUBACK 9 服務(wù)端到客戶端 訂閱請求報文確認(rèn)
UNSUBSCRIBE 10 客戶端到服務(wù)端 客戶端取消訂閱請求
UNSUBACK 11 服務(wù)端到客戶端 取消訂閱報文確認(rèn)
PINGREQ 12 客戶端到服務(wù)端 心跳請求
PINGRESP 13 服務(wù)端到客戶端 心跳響應(yīng)
DISCONNECT 14 客戶端到服務(wù)端 客戶端斷開連接
Reserved 15 禁止 保留

或者到這里看更詳細(xì)的mqtt中文翻譯
https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md
非常感謝作者的辛勞工作和無私分享

三.debug跟蹤moquette 對CONNECT報文的處理
大概分為以下幾步
1.驗證協(xié)議版本,如果不是mqtt-3.1或者mqtt-3.1.1則拒絕連接
2.驗證clientId是否為空,如果為空,但是配置的時候(在上篇介紹的moquette.cof里面配置)要求不允許唯恐,即上面的allowZeroByteClientId或者cleanSession為false即要求保存會話,則視為不合法,拒絕連接,否則由moquette生成clientId
3.驗證是否有登錄的權(quán)限
這里面貼上源碼講解一下
private boolean login(Channel channel, MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
failedCredentials(channel);
return false;
}
if (!m_authenticator.checkValid(clientId, msg.payload().userName(), pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}, password={}",
clientId, msg.payload().userName(), pwd);
failedCredentials(channel);
return false;
}
NettyUtils.userName(channel, msg.payload().userName());
} else if (!this.allowAnonymous) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
failedCredentials(channel);
return false;
}
return true;
}

3.1.如果CONNETCT報文里面的可變頭里面沒有用戶名,直接返回true
3.2.如果有用戶名,同時有密碼,從可變頭取出密碼,調(diào)用m_authenticator進(jìn)行驗證
3.3 如果有用戶名,沒有密碼,認(rèn)證失敗,拒絕連接
3.4 如果沒有用戶名,同時配置為不允許匿名,則認(rèn)證失敗

4.創(chuàng)建連接描述符,連接描述符包括clientId,channel,isCleanSession,ConnectState,同時判斷連接描述符集合里面是否包括該連接描述符,如果包含,代表該連接以及建立,斷開連接
5.根據(jù)CONNECT報文里面的Keep Alive time 來設(shè)置tcp參數(shù)
6.根據(jù)CONNECT報文遺愿消息標(biāo)志位,覺得是否存儲遺愿消息
7.返回CONNACK報文,這里面把返回CONNACK報文單獨講解一下

        private boolean sendAck(ConnectionDescriptor descriptor, MqttConnectMessage msg, final String clientId) {
    LOG.info("Sending connect ACK. CId={}", clientId);
    final boolean success = descriptor.assignState(DISCONNECTED, SENDACK);
    if (!success) {
        return false;
    }

    MqttConnAckMessage okResp;
    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!msg.variableHeader().isCleanSession() && isSessionAlreadyStored) {
        okResp = connAckWithSessionPresent(CONNECTION_ACCEPTED);
    } else {
        okResp = connAck(CONNECTION_ACCEPTED);
    }

    if (isSessionAlreadyStored) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession(msg.variableHeader().isCleanSession());
    }
    descriptor.writeAndFlush(okResp);
    LOG.info("The connect ACK has been sent. CId={}", clientId);
    return true;
}

        7.1 判斷當(dāng)前連接的狀態(tài),怎么判斷的呢?這里面用了AtomicReference<ConnectionState>通過調(diào)用原子引用類  compareAndSet(DISCONNECTED, SENDACK)來解決并發(fā)修改連接狀態(tài)的問題。
        7.2如果狀態(tài)是disConnect,將狀態(tài)修改為sendAck
        7.3 如果CONNETCT報文里面的CleanSession標(biāo)識設(shè)置為0同時broker已經(jīng)有了client的會話,將CONNACK報文里面的連接確認(rèn)標(biāo)志設(shè)為1,告訴客戶端,broker已經(jīng)有了響應(yīng)的會話信息。否則將連接確認(rèn)標(biāo)志設(shè)為0
        7.4 如果已經(jīng)存在相應(yīng)的client的會話,則根據(jù)新的連接,更新clientSession里面的是否清理session屬性

8.喚醒攔截器記錄連接事件
9.創(chuàng)建或者從新加載clientSession,這里面單獨講解一下

            private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, MqttConnectMessage msg,
        String clientId) {
    final boolean success = descriptor.assignState(SENDACK, SESSION_CREATED);
    if (!success) {
        return null;
    }

    ClientSession clientSession = m_sessionsStore.sessionForClient(clientId);
    boolean isSessionAlreadyStored = clientSession != null;
    if (!isSessionAlreadyStored) {
        clientSession = m_sessionsStore.createNewSession(clientId, msg.variableHeader().isCleanSession());
    }
    if (msg.variableHeader().isCleanSession()) {
        LOG.info("Cleaning session. CId={}", clientId);
        clientSession.cleanSession();
    }
    return clientSession;
}

     9.1 AtomicReference<ConnectionState>通過調(diào)用原子引用類  compareAndSet(SENDACK, SESSION_CREATED)將連接狀態(tài)從sendAck修改為session_create
     9.2 session存儲結(jié)合里面,是否已經(jīng)存在會話信息,如果不存在,創(chuàng)建一個新的clientsession
     9.3 如果存在,根據(jù)CONNETCT報文里面的cleansession自動決定是否清理調(diào)舊的會話信息。

10.如果CONNETCT報文要求不清理會話信息(cleansession標(biāo)志位為0),則重發(fā)QoS1 and QoS2 messages,同時將連接狀態(tài)從session_create修改成message_republish
11.將連接狀態(tài)從session_create修改成established

到此,broker和client直接的mqtt連接正式建立,后面client可以開始發(fā)送SUBSCRIBE或者PUBLISH報文了。
在這里再補充一點,對于broker來說,建立連接的過程中,連接狀態(tài)會從disConnect->sendAck->session_create->message_republish->established,之所以要設(shè)置這些狀態(tài),是因為,每一步后面的操作都要基于前面的狀態(tài)來決定是否需要真正執(zhí)行,這里面用到了原子引用類來保證,狀態(tài)的修改這個操作的原子行,確保了在并發(fā)的情況下,每一步操作都是條件滿足的。

下面一篇將會講解SUBSCRIBE報文的處理

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI