您好,登錄后才能下訂單哦!
這篇“springboot如何整合mqtt”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“springboot如何整合mqtt”文章吧。
springboot 整合 mqtt
搭建的時(shí)候如果你使用的是集群 記得開(kāi)放以下端口:
好了, 搭建成功下一步就是我們的java程序要與mqtt連接, 這里有兩種方式(其實(shí)不止兩種)進(jìn)行連接.
一是 直接使用 MQTT Java 客戶端庫(kù)
二是使用 spring integration mqtt
也是比較推薦的一種,也是我們主講這種.
第一步 添加 maven dependency
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.14</version> </dependency>
第二步 添加配置
1 先寫(xiě)好一些基本配置
mqtt: username: test # 賬號(hào) password: 123456 # 密碼 host-url: tcp://127.0.0.1:1883 # mqtt連接tcp地址 in-client-id: ${random.value} # 隨機(jī)值,使出入站 client ID 不同 out-client-id: ${random.value} client-id: ${random.int} # 客戶端Id,不能相同,采用隨機(jī)數(shù) ${random.value} default-topic: test/#,topic/+/+/up # 默認(rèn)主題 timeout: 60 # 超時(shí)時(shí)間 keepalive: 60 # 保持連接 clearSession: true # 清除會(huì)話(設(shè)置為false,斷開(kāi)連接,重連后使用原來(lái)的會(huì)話 保留訂閱的主題,能接收離線期間的消息)
2.然后寫(xiě)一個(gè)對(duì)應(yīng)的類MqttProperties
import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * MqttProperties * * @author hengzi * @date 2022/8/23 */ @Component public class MqttProperties { /** * 用戶名 */ @Value("${mqtt.username}") private String username; /** * 密碼 */ @Value("${mqtt.password}") private String password; /** * 連接地址 */ @Value("${mqtt.host-url}") private String hostUrl; /** * 進(jìn)-客戶Id */ @Value("${mqtt.in-client-id}") private String inClientId; /** * 出-客戶Id */ @Value("${mqtt.out-client-id}") private String outClientId; /** * 客戶Id */ @Value("${mqtt.client-id}") private String clientId; /** * 默認(rèn)連接話題 */ @Value("${mqtt.default-topic}") private String defaultTopic; /** * 超時(shí)時(shí)間 */ @Value("${mqtt.timeout}") private int timeout; /** * 保持連接數(shù) */ @Value("${mqtt.keepalive}") private int keepalive; /**是否清除session*/ @Value("${mqtt.clearSession}") private boolean clearSession; // ...getter and setter }
接下來(lái)就是配置一些亂七八糟的東西, 這里有很多概念性的東西 比如 管道channel
, 適配器 adapter
, 入站Inbound
, 出站Outbound
,等等等等, 看起來(lái)是非常頭痛的
好吧,那就一個(gè)一個(gè)來(lái),
首先連接mqtt需要一個(gè)客戶端, 那么我們就開(kāi)一個(gè)客戶端工廠, 這里可以產(chǎn)生很多很多的客戶端
@Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; }
然后再搞兩根管子(channel
),一個(gè)出站,一個(gè)入站
//出站消息管道, @Bean public MessageChannel mqttOutboundChannel(){ return new DirectChannel(); } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ return new DirectChannel(); }
為了使這些管子能流通 就需要一個(gè)適配器(adapter
)
// Mqtt 管道適配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); }
然后定義消息生產(chǎn)者
// 消息生產(chǎn)者 @Bean public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){ adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //入站投遞的通道 adapter.setOutputChannel(mqttInboundChannel()); adapter.setQos(1); return adapter; }
那我們收到消息去哪里處理呢,答案是這里:
@Bean //使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會(huì)被該方法接收并執(zhí)行 @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler handleMessage() { // 這個(gè) mqttMessageHandle 其實(shí)就是一個(gè) MessageHandler 的實(shí)現(xiàn)類(這個(gè)類我放下面) return mqttMessageHandle; // 你也可以這樣寫(xiě) // return new MessageHandler() { // @Override // public void handleMessage(Message<?> message) throws MessagingException { // // do something // } // };
到這里我們其實(shí)已經(jīng)可以接受到來(lái)自mqtt的消息了
接下來(lái)配置向mqtt發(fā)送消息
配置 出站處理器
// 出站處理器 @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory factory){ MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return handler; }
這個(gè) 出站處理器 在我看來(lái)就是讓別人 (MqttPahoMessageHandler
)處理了, 我就不處理了,我只管我要發(fā)送什么,至于怎么發(fā)送,由MqttPahoMessageHandler
來(lái)完成
接下來(lái)我們定義一個(gè)接口即可
import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; /** * MqttGateway * * @author hengzi * @date 2022/8/23 */ @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MqttGateway { void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String data); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) Integer Qos, String data); }
我們直接調(diào)用這個(gè)接口就可以向mqtt 發(fā)送數(shù)據(jù)
到目前為止,整個(gè)配置文件長(zhǎng)這樣:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * MqttConfig * * @author hengzi * @date 2022/8/23 */ @Configuration public class MqttConfig { /** * 以下屬性將在配置文件中讀取 **/ @Autowired private MqttProperties mqttProperties; //Mqtt 客戶端工廠 @Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } // Mqtt 管道適配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); } // 消息生產(chǎn)者 @Bean public MessageProducer mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter){ adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); //入站投遞的通道 adapter.setOutputChannel(mqttInboundChannel()); adapter.setQos(1); return adapter; } // 出站處理器 @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound(MqttPahoClientFactory factory){ MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return handler; } @Bean //使用ServiceActivator 指定接收消息的管道為 mqttInboundChannel,投遞到mqttInboundChannel管道中的消息會(huì)被該方法接收并執(zhí)行 @ServiceActivator(inputChannel = "mqttInboundChannel") public MessageHandler handleMessage() { return mqttMessageHandle; } //出站消息管道, @Bean public MessageChannel mqttOutboundChannel(){ return new DirectChannel(); } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ return new DirectChannel(); } }
處理消息的 MqttMessageHandle
@Component public class MqttMessageHandle implements MessageHandler { @Override public void handleMessage(Message<?> message) throws MessagingException { } }
在進(jìn)一步了解之后,發(fā)現(xiàn)可以優(yōu)化的地方,比如channel 的類型是有很多種的, 這里使用的DirectChannel
,是Spring Integration
默認(rèn)的消息通道,它將消息發(fā)送給為一個(gè)訂閱者,然后阻礙發(fā)送直到消息被接收,傳輸方式都是同步的方式,都是由一個(gè)線程來(lái)運(yùn)行的.
這里我們可以將入站channel
改成 ExecutorChannel
一個(gè)可以使用多線程的channel
@Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 最大可創(chuàng)建的線程數(shù) int maxPoolSize = 200; executor.setMaxPoolSize(maxPoolSize); // 核心線程池大小 int corePoolSize = 50; executor.setCorePoolSize(corePoolSize); // 隊(duì)列最大長(zhǎng)度 int queueCapacity = 1000; executor.setQueueCapacity(queueCapacity); // 線程池維護(hù)線程所允許的空閑時(shí)間 int keepAliveSeconds = 300; executor.setKeepAliveSeconds(keepAliveSeconds); // 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } // 入站消息管道 @Bean public MessageChannel mqttInboundChannel(){ // 用線程池 return new ExecutorChannel(mqttThreadPoolTaskExecutor()); }
到這里其實(shí)可以運(yùn)行了.
但是這樣配置其實(shí)還是有點(diǎn)多, 有點(diǎn)亂, 于是我查找官網(wǎng), f發(fā)現(xiàn)一種更簡(jiǎn)單的配置方法 叫 Java DSL
我們參考官網(wǎng),稍微改一下,使用 DSL的方式進(jìn)行配置:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * MqttConfigV2 * * @author hengzi * @date 2022/8/24 */ @Configuration public class MqttConfigV2 { @Autowired private MqttProperties mqttProperties; @Autowired private MqttMessageHandle mqttMessageHandle; //Mqtt 客戶端工廠 所有客戶端從這里產(chǎn)生 @Bean public MqttPahoClientFactory mqttPahoClientFactory(){ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getHostUrl().split(",")); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); factory.setConnectionOptions(options); return factory; } // Mqtt 管道適配器 @Bean public MqttPahoMessageDrivenChannelAdapter adapter(MqttPahoClientFactory factory){ return new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getInClientId(),factory,mqttProperties.getDefaultTopic().split(",")); } // 消息生產(chǎn)者 (接收,處理來(lái)自mqtt的消息) @Bean public IntegrationFlow mqttInbound(MqttPahoMessageDrivenChannelAdapter adapter) { adapter.setCompletionTimeout(5000); adapter.setQos(1); return IntegrationFlows.from( adapter) .channel(new ExecutorChannel(mqttThreadPoolTaskExecutor())) .handle(mqttMessageHandle) .get(); } @Bean public ThreadPoolTaskExecutor mqttThreadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); // 最大可創(chuàng)建的線程數(shù) int maxPoolSize = 200; executor.setMaxPoolSize(maxPoolSize); // 核心線程池大小 int corePoolSize = 50; executor.setCorePoolSize(corePoolSize); // 隊(duì)列最大長(zhǎng)度 int queueCapacity = 1000; executor.setQueueCapacity(queueCapacity); // 線程池維護(hù)線程所允許的空閑時(shí)間 int keepAliveSeconds = 300; executor.setKeepAliveSeconds(keepAliveSeconds); // 線程池對(duì)拒絕任務(wù)(無(wú)線程可用)的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } // 出站處理器 (向 mqtt 發(fā)送消息) @Bean public IntegrationFlow mqttOutboundFlow(MqttPahoClientFactory factory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttProperties.getOutClientId(),factory); handler.setAsync(true); handler.setConverter(new DefaultPahoMessageConverter()); handler.setDefaultTopic(mqttProperties.getDefaultTopic().split(",")[0]); return IntegrationFlows.from( "mqttOutboundChannel").handle(handler).get(); } }
這樣看起來(lái)真的簡(jiǎn)單多了, 頭也沒(méi)那么大了, 我要是早知道多好.
好了以上就是配置相關(guān)的, 到這里其實(shí)是已經(jīng)完成springboot 與 mqtt 的整合了.
但其實(shí)我一直有個(gè)想法, 就是我們接收的消息 都是在 handleMessage
這個(gè)方法里面執(zhí)行的,
@Override public void handleMessage(Message<?> message) throws MessagingException { }
所以我就有了一個(gè)想法, 能不能根據(jù) 我訂閱的主題,在不同的方法執(zhí)行, 對(duì)于這個(gè)問(wèn)題,其實(shí)你用if ... else ...
也能實(shí)現(xiàn), 但很明顯,如果我訂閱的主題很多的話, 那寫(xiě)起來(lái)就很頭痛了.
對(duì)于這個(gè)問(wèn)題,有兩種思路, 一個(gè)是添加Spring Integration
的路由 router
,根據(jù)不同topic路由到不同的channel
, 這個(gè)我也知道能不能實(shí)現(xiàn), 我這里就不討論了.
第二種是, 我也不知道名字改如何叫, 我是參考了 spring
的 @Controller
的設(shè)計(jì), 暫且叫他注解模式.
眾所周知,我們的接口都是在類上加 @Controller
這個(gè)注解, 就代表這個(gè)類是 http 接口, 再在方法加上 @RequestMapping
就能實(shí)現(xiàn)不同的 url 調(diào)用不同的方法.
參數(shù)這個(gè)設(shè)計(jì) 我們?cè)陬惿厦婕?@MqttService
就代表這個(gè)類是專門(mén)處理mqtt消息的服務(wù)類
同時(shí) 在這個(gè)類的方法上 加上 @MqttTopic
就代表 這個(gè)主題由這個(gè)方法處理.
OK, 理論有了,接下來(lái)就是 實(shí)踐.
先定義 兩個(gè)注解
import org.springframework.core.annotation.AliasFor; import org.springframework.stereotype.Component; import java.lang.annotation.*; @Documented @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface MqttService { @AliasFor( annotation = Component.class ) String value() default ""; }
加上 @Component
注解 spring就會(huì)掃描, 并注冊(cè)到IOC容器里
import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface MqttTopic { /** * 主題名字 */ String value() default ""; }
參考 @RequestMapping
我們使用起來(lái)應(yīng)該是這樣的:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; /** * MqttTopicHandle * * @author hengzi * @date 2022/8/24 */ @MqttService public class MqttTopicHandle { public static final Logger log = LoggerFactory.getLogger(MqttTopicHandle.class); // 這里的 # 號(hào)是通配符 @MqttTopic("test/#") public void test(Message<?> message){ log.info("test="+message.getPayload()); } // 這里的 + 號(hào)是通配符 @MqttTopic("topic/+/+/up") public void up(Message<?> message){ log.info("up="+message.getPayload()); } // 注意 你必須先訂閱 @MqttTopic("topic/1/2/down") public void down(Message<?> message){ log.info("down="+message.getPayload()); } }
OK 接下來(lái)就是實(shí)現(xiàn)這樣的使用
分析 :
當(dāng)我們收到消息時(shí), 我們從IOC容器中 找到所有 帶 @MqttService
注解的類
然后 遍歷這些類, 找到帶有 @MqttTopic
的方法
接著 把 @MqttTopic
注解的的值 與 接受到的topic 進(jìn)行對(duì)比
如果一致則執(zhí)行這個(gè)方法
廢話少說(shuō), 上代碼
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; /** * MessageHandleService * * @author hengzi * @date 2022/8/24 */ @Component public class MqttMessageHandle implements MessageHandler { public static final Logger log = LoggerFactory.getLogger(MqttMessageHandle.class); // 包含 @MqttService注解 的類(Component) public static Map<String, Object> mqttServices; /** * 所有mqtt到達(dá)的消息都會(huì)在這里處理 * 要注意這個(gè)方法是在線程池里面運(yùn)行的 * @param message message */ @Override public void handleMessage(Message<?> message) throws MessagingException { getMqttTopicService(message); } public Map<String, Object> getMqttServices(){ if(mqttServices==null){ mqttServices = SpringUtils.getBeansByAnnotation(MqttService.class); } return mqttServices; } public void getMqttTopicService(Message<?> message){ // 在這里 我們根據(jù)不同的 主題 分發(fā)不同的消息 String receivedTopic = message.getHeaders().get("mqtt_receivedTopic",String.class); if(receivedTopic==null || "".equals(receivedTopic)){ return; } for(Map.Entry<String, Object> entry : getMqttServices().entrySet()){ // 把所有帶有 @MqttService 的類遍歷 Class<?> clazz = entry.getValue().getClass(); // 獲取他所有方法 Method[] methods = clazz.getDeclaredMethods(); for ( Method method: methods ){ if (method.isAnnotationPresent(MqttTopic.class)){ // 如果這個(gè)方法有 這個(gè)注解 MqttTopic handleTopic = method.getAnnotation(MqttTopic.class); if(isMatch(receivedTopic,handleTopic.value())){ // 并且 這個(gè) topic 匹配成功 try { method.invoke(SpringUtils.getBean(clazz),message); return; } catch (IllegalAccessException e) { e.printStackTrace(); log.error("代理炸了"); } catch (InvocationTargetException e) { log.error("執(zhí)行 {} 方法出現(xiàn)錯(cuò)誤",handleTopic.value(),e); } } } } } } /** * mqtt 訂閱的主題與我實(shí)際的主題是否匹配 * @param topic 是實(shí)際的主題 * @param pattern 是我訂閱的主題 可以是通配符模式 * @return 是否匹配 */ public static boolean isMatch(String topic, String pattern){ if((topic==null) || (pattern==null) ){ return false; } if(topic.equals(pattern)){ // 完全相等是肯定匹配的 return true; } if("#".equals(pattern)){ // # 號(hào)代表所有主題 肯定匹配的 return true; } String[] splitTopic = topic.split("/"); String[] splitPattern = pattern.split("/"); boolean match = true; // 如果包含 # 則只需要判斷 # 前面的 for (int i = 0; i < splitPattern.length; i++) { if(!"#".equals(splitPattern[i])){ // 不是# 號(hào) 正常判斷 if(i>=splitTopic.length){ // 此時(shí)長(zhǎng)度不相等 不匹配 match = false; break; } if(!splitTopic[i].equals(splitPattern[i]) && !"+".equals(splitPattern[i])){ // 不相等 且不等于 + match = false; break; } } else { // 是# 號(hào) 肯定匹配的 break; } } return match; } }
工具類 SpringUtils
import org.springframework.aop.framework.AopContext; import org.springframework.beans.BeansException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.util.Map; /** * spring工具類 方便在非spring管理環(huán)境中獲取bean * */ @Component public final class SpringUtils implements BeanFactoryPostProcessor, ApplicationContextAware { /** Spring應(yīng)用上下文環(huán)境 */ private static ConfigurableListableBeanFactory beanFactory; private static ApplicationContext applicationContext; public static Map<String, Object> getBeansByAnnotation(Class clsName) throws BeansException{ return beanFactory.getBeansWithAnnotation(clsName); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { SpringUtils.beanFactory = beanFactory; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringUtils.applicationContext = applicationContext; } /** * 獲取對(duì)象 * * @param name * @return Object 一個(gè)以所給名字注冊(cè)的bean的實(shí)例 * @throws org.springframework.beans.BeansException * */ @SuppressWarnings("unchecked") public static <T> T getBean(String name) throws BeansException { return (T) beanFactory.getBean(name); } /** * 獲取類型為requiredType的對(duì)象 * * @param clz * @return * @throws org.springframework.beans.BeansException * */ public static <T> T getBean(Class<T> clz) throws BeansException { T result = (T) beanFactory.getBean(clz); return result; } /** * 如果BeanFactory包含一個(gè)與所給名稱匹配的bean定義,則返回true * * @param name * @return boolean */ public static boolean containsBean(String name) { return beanFactory.containsBean(name); } /** * 判斷以給定名字注冊(cè)的bean定義是一個(gè)singleton還是一個(gè)prototype。 如果與給定名字相應(yīng)的bean定義沒(méi)有被找到,將會(huì)拋出一個(gè)異常(NoSuchBeanDefinitionException) * * @param name * @return boolean * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException { return beanFactory.isSingleton(name); } /** * @param name * @return Class 注冊(cè)對(duì)象的類型 * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static Class<?> getType(String name) throws NoSuchBeanDefinitionException { return beanFactory.getType(name); } /** * 如果給定的bean名字在bean定義中有別名,則返回這些別名 * * @param name * @return * @throws org.springframework.beans.factory.NoSuchBeanDefinitionException * */ public static String[] getAliases(String name) throws NoSuchBeanDefinitionException { return beanFactory.getAliases(name); } /** * 獲取aop代理對(duì)象 * * @param invoker * @return */ @SuppressWarnings("unchecked") public static <T> T getAopProxy(T invoker) { return (T) AopContext.currentProxy(); } /** * 獲取當(dāng)前的環(huán)境配置,無(wú)配置返回null * * @return 當(dāng)前的環(huán)境配置 */ public static String[] getActiveProfiles() { return applicationContext.getEnvironment().getActiveProfiles(); } }
OK, 大功告成. 終于舒服了, 終于不用寫(xiě)if...else...
了, 個(gè)人感覺(jué)這樣處理起來(lái)會(huì)更加優(yōu)雅. 寫(xiě)代碼最重要是什么, 是優(yōu)雅~
以上!
參考文章:
使用 Spring integration 在Springboot中集成Mqtt
Spring Integration(一)概述
附:
動(dòng)態(tài)添加主題方式:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.stereotype.Service; import java.util.Arrays; /** * MqttService * * @author hengzi * @date 2022/8/25 */ @Service public class MqttService { @Autowired private MqttPahoMessageDrivenChannelAdapter adapter; public void addTopic(String topic) { addTopic(topic, 1); } public void addTopic(String topic,int qos) { String[] topics = adapter.getTopic(); if(!Arrays.asList(topics).contains(topic)){ adapter.addTopic(topic,qos); } } public void removeTopic(String topic) { adapter.removeTopic(topic); } }
直接調(diào)用就行
以上就是關(guān)于“springboot如何整合mqtt”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。