溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot如何實(shí)現(xiàn)MQTT消息發(fā)送和接收

發(fā)布時(shí)間:2023-03-11 11:40:13 來(lái)源:億速云 閱讀:149 作者:iii 欄目:開(kāi)發(fā)技術(shù)

今天小編給大家分享一下SpringBoot如何實(shí)現(xiàn)MQTT消息發(fā)送和接收的相關(guān)知識(shí)點(diǎn),內(nèi)容詳細(xì),邏輯清晰,相信大部分人都還太了解這方面的知識(shí),所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來(lái)了解一下吧。

Spring integration交互邏輯

對(duì)于發(fā)布者:

1.消息通過(guò)消息網(wǎng)關(guān)發(fā)送出去,由 MessageChannel 的實(shí)例 DirectChannel 處理發(fā)送的細(xì)節(jié)。

2.DirectChannel 收到消息后,內(nèi)部通過(guò) MessageHandler 的實(shí)例 MqttPahoMessageHandler 發(fā)送到指定的 Topic。

對(duì)于訂閱者:

1.通過(guò)注入 MessageProducerSupport 的實(shí)例 MqttPahoMessageDrivenChannelAdapter,實(shí)現(xiàn)訂閱 Topic 和綁定消息消費(fèi)的 MessageChannel

2.同樣由 MessageChannel 的實(shí)例 DirectChannel 處理消費(fèi)細(xì)節(jié)。

Channel 消息后會(huì)發(fā)送給我們自定義的 MqttInboundMessageHandler 實(shí)例進(jìn)行消費(fèi)。

可以看到整個(gè)處理的流程和前面將的基本一致。Spring Integration 就是抽象出了這么一套消息通信的機(jī)制,具體的通信細(xì)節(jié)由它集成的中間件來(lái)決定。

1、maven依賴

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>
 
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>

2、yaml配置文件

#mqtt配置
mqtt:
  username: 123
  password: 123
  #MQTT-服務(wù)器連接地址,如果有多個(gè),用逗號(hào)隔開(kāi)
  url: tcp://127.0.0.1:1883
  #MQTT-連接服務(wù)器默認(rèn)客戶端ID
  client:
    id: ${random.value}
  default:
    #MQTT-默認(rèn)的消息推送主題,實(shí)際可在調(diào)用接口時(shí)指定
    topic: topic,mqtt/test/#
    #連接超時(shí)
  completionTimeout: 3000

3、mqtt生產(chǎn)者消費(fèi)者配置類(lèi)

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;
 
/**
 * mqtt 推送and接收 消息類(lèi)
 **/
@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
 
    private static final byte[] WILL_DATA;
 
    static {
        WILL_DATA = "offline".getBytes();
    }
 
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.url}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;   //連接超時(shí)
 
    /**
     * MQTT連接器選項(xiàng)
     **/
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會(huì)保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
        mqttConnectOptions.setCleanSession(true);
        // 設(shè)置超時(shí)時(shí)間 單位為秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // 設(shè)置會(huì)話心跳時(shí)間 單位為秒 服務(wù)器會(huì)每隔1.5*20秒的時(shí)間向客戶端發(fā)送心跳判斷客戶端是否在線,但這個(gè)方法并沒(méi)有重連的機(jī)制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 設(shè)置“遺囑”消息的話題,若客戶端與服務(wù)器之間的連接意外中斷,服務(wù)器將發(fā)布客戶端的“遺囑”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
 
    /**
     * MQTT工廠
     **/
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }
 
    /**
     * MQTT信息通道(生產(chǎn)者)
     **/
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息處理器(生產(chǎn)者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setAsyncEvents(true); // 消息發(fā)送和傳輸完成會(huì)有異步的通知回調(diào)
        //設(shè)置轉(zhuǎn)換器 發(fā)送bytes數(shù)據(jù)
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        return messageHandler;
    }
 
    /**
     * 配置client,監(jiān)聽(tīng)的topic
     * MQTT消息訂閱綁定(消費(fèi)者)
     **/
    @Bean
    public MessageProducer inbound() {
        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    /**
     * MQTT信息通道(消費(fèi)者)
     **/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    /**
     * MQTT消息處理器(消費(fèi)者)
     **/
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //處理接收消息
                mqttReceiveHandle.handle(message);
            }
        };
    }
}

4、消息處理類(lèi) 

/**
 * mqtt客戶端消息處理類(lèi)
 **/
@Slf4j
@Component
public class MqttReceiveHandle {
 
    public void handle(Message<?> message) {
        log.info("收到訂閱消息: {}", message);
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        log.info("消息主題:{}", topic);
        Object payLoad = message.getPayload();
        byte[] data = (byte[]) payLoad;
        Packet packet = Packet.parse(data);
        log.info("發(fā)送的Packet數(shù)據(jù){}", JSON.toJSONString(packet));
 
    }
}

5、mqtt發(fā)送接口 

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
 
/**
 * mqtt發(fā)送消息
 * (defaultRequestChannel = "mqttOutboundChannel" 對(duì)應(yīng)config配置)
 * **/
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    /**
     * 發(fā)送信息到MQTT服務(wù)器
     *
     * @param
     */
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
 
    /**
     * 發(fā)送信息到MQTT服務(wù)器
     *
     * @param topic 主題
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
 
    /**
     * 發(fā)送信息到MQTT服務(wù)器
     *
     * @param topic 主題
     * @param qos 對(duì)消息處理的幾種機(jī)制。
     * 0 表示的是訂閱者沒(méi)收到消息不會(huì)再次發(fā)送,消息會(huì)丟失。
     * 1 表示的是會(huì)嘗試重試,一直到接收到消息,但這種情況可能導(dǎo)致訂閱者收到多次重復(fù)消息。
     * 2 多了一次去重的動(dòng)作,確保訂閱者收到的消息有一次。
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
 
    /**
     * 發(fā)送信息到MQTT服務(wù)器
     *
     * @param topic 主題
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
 
    /**
     * 發(fā)送信息到MQTT服務(wù)器
     *
     * @param topic 主題
     * @param payload 消息主體
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}

6、mqtt事件監(jiān)聽(tīng)類(lèi) 

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttListener {
    /**
     * 連接失敗的事件通知
     * @param mqttConnectionFailedEvent
     */
    @EventListener(classes = MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
        log.info("連接失敗的事件通知");
    }
 
    /**
     * 已發(fā)送的事件通知
     * @param mqttMessageSentEvent
     */
    @EventListener(classes = MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
        log.info("已發(fā)送的事件通知");
    }
 
    /**
     * 已傳輸完成的事件通知
     * 1.QOS == 0,發(fā)送消息后會(huì)即可進(jìn)行此事件回調(diào),因?yàn)椴恍枰却貓?zhí)
     * 2.QOS == 1,發(fā)送消息后會(huì)等待ACK回執(zhí),ACK回執(zhí)后會(huì)進(jìn)行此事件通知
     * 3.QOS == 2,發(fā)送消息后會(huì)等待PubRECV回執(zhí),知道收到PubCOMP后會(huì)進(jìn)行此事件通知
     * @param mqttMessageDeliveredEvent
     */
    @EventListener(classes = MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
        log.info("已傳輸完成的事件通知");
    }
 
    /**
     * 消息訂閱的事件通知
     * @param mqttSubscribedEvent
     */
    @EventListener(classes = MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
        log.info("消息訂閱的事件通知");
    }
}

 7、接口測(cè)試

@Resource
    private MqttGateway mqttGateway;
    /**
     * sendData 消息
     * topic 訂閱主題
     **/
    @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
    public String sendMqtt(String sendData, String topic) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttGateway.sendToMqtt(topic, sendData);
        //mqttGateway.sendToMqttObject(topic, sendData.getBytes());
        return "OK";
    }

以上就是“SpringBoot如何實(shí)現(xiàn)MQTT消息發(fā)送和接收”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會(huì)為大家更新不同的知識(shí),如果還想學(xué)習(xí)更多的知識(shí),請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI