溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

MQTT大消息失敗原因排查的過程

發(fā)布時間:2021-10-19 10:55:13 來源:億速云 閱讀:243 作者:柒染 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)MQTT大消息失敗原因排查的過程,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

Background

小組內(nèi)使用 MQTT 協(xié)議搭建了一個聊天服務(wù)器,前天在測大消息(超過5000漢字)時,連接直接變得不可用,后續(xù)發(fā)送的消息全部都收不到回復(fù)。

服務(wù)器環(huán)境:
Netty :4.1.32.Final
使用的是 Netty 包中自帶的 MqttDecoder

客戶端: Android

排查過程

  1. 由于所有的消息都打印了日志,因此先搜了一下服務(wù)器日志,發(fā)現(xiàn)日志中并沒有發(fā)送的消息內(nèi)容。

  2. 難道是客戶端在超長消息時沒有發(fā)送?使用 tcpdump 抓了包,發(fā)現(xiàn)客戶端正常發(fā)送,并且所有的包服務(wù)端都已經(jīng) ack,但是后續(xù)服務(wù)端沒有發(fā)回響應(yīng),猜測是服務(wù)端在大消息的情況下處理失敗了。

    1. tcpdump 使用 -nn 打印出ip和端口,-X 打印網(wǎng)絡(luò)包的內(nèi)容,也可以使用-w 選項保存到文件里,然后使用 tcpdumpwireshark 來分析

  3. 于是查了一下 MQTT 支持的最大 payload,MQTT 官方文檔 中說明是 256M,這個大小肯定不會超過。

  4. 在服務(wù)端抓了下包,確認(rèn)消息已經(jīng)收到,但是無確認(rèn)消息返回

  5. 開啟線上debug,發(fā)現(xiàn)收到了一個 PUBLISH 類型的消息,但是消息的 class 不為 MqttPublishMessage, 且 payload 中無數(shù)據(jù),但在 Message 中有一個報錯消息 too large message: 56234 bytes

  6. Google 一下,有網(wǎng)友遇到了同樣的問題, 雖然這個問題里 MQTT 是 C 語言的。

  7. 查看 MqttDecoder, 發(fā)現(xiàn) decoder 有最長 payload 限制(以下為部分代碼),啟動代碼里調(diào)用的是默認(rèn)構(gòu)造函數(shù),因此默認(rèn)最長數(shù)據(jù)為 8092 字節(jié)。

public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
    private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
    public MqttDecoder() {
      this(DEFAULT_MAX_BYTES_IN_MESSAGE);
    }

    public MqttDecoder(int maxBytesInMessage) {
        super(DecoderState.READ_FIXED_HEADER);
        this.maxBytesInMessage = maxBytesInMessage;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        switch (state()) {
            case READ_FIXED_HEADER: try {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_VARIABLE_HEADER:  try {
                final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case READ_PAYLOAD: try {
                final Result<?> decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }

            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;

            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }

    private MqttMessage invalidMessage(Throwable cause) {
      checkpoint(DecoderState.BAD_MESSAGE);
      return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}
  1. 長消息的原因找到了,還剩一個問題,為什么后續(xù)的消息包括 ping 消息就再也發(fā)不出去了?經(jīng)過查看代碼,這與 MqttDecoder 的父類 ReplayingDecoder 有關(guān)系,查看源碼有詳盡的類說明, 在讀取可變長度頭部時,如果payload 超過了最大限制,那么直接拋出異常。摘出代碼如下:

case READ_VARIABLE_HEADER:  try {
    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

在異常處理中,調(diào)用了 invalidMessage 方法,這個方法將 狀態(tài)設(shè)為 DecoderState.BAD_MESSAGE, 在這個狀態(tài)下,所有的字節(jié)都直接被丟棄。

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

也就是說此后的消息都不會進入到業(yè)務(wù)處理邏輯,這條長連接廢掉了。

解決方案

  1. 客戶端對長消息做字?jǐn)?shù)限制和拆分,保證單條消息不超過最大限制

  2. 服務(wù)端增大最大載荷長度,MqttDecoder 提供了構(gòu)造函數(shù)(不建議使用,這樣會增大服務(wù)器處理時間和內(nèi)存負(fù)擔(dān))

關(guān)于MQTT大消息失敗原因排查的過程就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI