您好,登錄后才能下訂單哦!
小編給大家分享一下JMS是什么意思,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
基于之前一篇“一個(gè)故事告訴你什么是消息隊(duì)列”,了解了消息隊(duì)列的使用場(chǎng)景以及相關(guān)的特性。本文主要講述消息服務(wù)在 JAVA 中的使用。
市面上的有關(guān)消息隊(duì)列的技術(shù)選型非常多,如果我們的代碼框架要支持不同的消息實(shí)現(xiàn),在保證框架具有較高擴(kuò)展性的前提下,我們勢(shì)必要進(jìn)行一定的封裝。
在 JAVA 中,大可不必如此。因?yàn)?JAVA 已經(jīng)制定了一套標(biāo)準(zhǔn)的 JMS 規(guī)范。該規(guī)范定義了一套通用的接口和相關(guān)語(yǔ)義,提供了諸如持久、驗(yàn)證和事務(wù)的消息服務(wù),其最主要的目的是允許Java應(yīng)用程序訪問現(xiàn)有的消息中間件。就和 JDBC 一樣。
在介紹具體的使用之前,先簡(jiǎn)單介紹一下 JMS 的一些基本知識(shí)。這里我打算分為 3 部分來介紹,即 消息隊(duì)列(MQ)的連接、消息發(fā)送與消息接收。
這里我們的技術(shù)選型是 SpringBoot、JMS、ActiveMQ
為了更好的理解 JMS,這里沒有使用 SpringBoot 零配置來搭建項(xiàng)目
使用 MQ 的第一步一定是先連接 MQ。因?yàn)檫@里使用的是 JMS 規(guī)范,對(duì)于任何遵守 JMS 規(guī)范的 MQ 來說,都會(huì)實(shí)現(xiàn)相應(yīng)的ConnectionFactory接口,因此我們只需要?jiǎng)?chuàng)建一個(gè)ConnectionFactory工廠類,由它來實(shí)現(xiàn) MQ 的連接,以及封裝一系列特性的 MQ 參數(shù)。
例子:這里我們以 ActiveMQ 為例,
maven 依賴:
org.springframework.bootspring-boot-starter-parent1.5.3.RELEASE org.springframework.boot spring-boot-starter-activemq
創(chuàng)建 ActiveMQ 連接工廠:
@Bean public ConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(ActiveMQ_URL); connectionFactory.setUserName(ActiveMQ_USER); connectionFactory.setPassword(ActiveMQ_PASSWORD); return connectionFactory; }
關(guān)于消息的發(fā)送,是通過 JMS 核心包中的JmsTemplate類來實(shí)現(xiàn)的,它簡(jiǎn)化了 JMS 的使用,因?yàn)樵诎l(fā)送或同步接收消息時(shí)它幫我們處理了資源的創(chuàng)建和釋放。從它的作用也不難推測(cè)出,它需要引用我們上面創(chuàng)建的連接工廠,具體代碼如下:
@Bean public JmsTemplate jmsQueueTemplate(){ return new JmsTemplate(connectionFactory()); }
JmsTemplate創(chuàng)建完成后,我們就可以調(diào)用它的方法來發(fā)送消息了。這里有兩個(gè)概念需要注意:
代碼示例:
@Autowired private JmsTemplate jmsQueueTemplate; /** * 發(fā)送原始消息 Message */ public void send(){ jmsQueueTemplate.send("queue1", new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage("我是原始消息"); } }); }
優(yōu)化:當(dāng)然,我們不用每次都通過MessageCreator匿名類的方式來創(chuàng)建Message對(duì)象,JmsTemplate類中提供了對(duì)象實(shí)體自動(dòng)轉(zhuǎn)換為Message對(duì)象的方法,convertAndSend(String destinationName, final Object message)。
優(yōu)化代碼示例:
/** * 發(fā)送消息自動(dòng)轉(zhuǎn)換成原始消息 */ public void convertAndSend(){ jmsQueueTemplate.convertAndSend("queue1", "我是自動(dòng)轉(zhuǎn)換的消息"); }
注:關(guān)于消息轉(zhuǎn)換,還可以通過實(shí)現(xiàn)MessageConverter接口來自定義轉(zhuǎn)換內(nèi)容
講完了消息發(fā)送,我們最后來說說消息是如何接收的。消息既然是以Message對(duì)象的形式發(fā)送到指定的目的地,那么消息的接收勢(shì)必會(huì)去指定的目的地上去接收消息。這里采用的是監(jiān)聽者的方式來監(jiān)聽指定地點(diǎn)的消息,采用注解@JmsListener來設(shè)置監(jiān)聽方法。
代碼示例:
@Component public class Listener1 { @JmsListener(destination = "queue1") public void receive(String msg){ System.out.println("監(jiān)聽到的消息內(nèi)容為: " + msg); } }
有了監(jiān)聽的目標(biāo)和方法后,監(jiān)聽器還得和 MQ 關(guān)聯(lián)起來,這樣才能運(yùn)作起來。這里的監(jiān)聽器可能不止一個(gè),如果每個(gè)都要和 MQ 建立連接,肯定不太合適。所以需要一個(gè)監(jiān)聽容器工廠的概念,即接口JmsListenerContainerFactory,它會(huì)引用上面創(chuàng)建好的與 MQ 的連接工廠,由它來負(fù)責(zé)接收消息以及將消息分發(fā)給指定的監(jiān)聽器。當(dāng)然也包括事務(wù)管理、資源獲取與釋放和異常轉(zhuǎn)換等。
代碼示例:
@Bean public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //設(shè)置連接數(shù) factory.setConcurrency("3-10"); //重連間隔時(shí)間 factory.setRecoveryInterval(1000L); return factory; }
代碼地址:https://github.com/jasonGeng88/springboot-jms
對(duì) JMS 有了基本的理解后,我們就來在具體的場(chǎng)景中使用一下。
首先,我們需要先啟動(dòng) ActiveMQ,這里我們以 Docker 容器化的方式進(jìn)行啟動(dòng)。
啟動(dòng)命令:
docker run -d -p 8161:8161 -p 61616:61616 --name activemq webcenter/activemq
啟動(dòng)成功后,在 ActiveMQ 可視化界面查看效果(http://localhost:8161):
點(diǎn)對(duì)點(diǎn)模式(單消費(fèi)者)
下面介紹消息隊(duì)列中最常用的一種場(chǎng)景,即點(diǎn)對(duì)點(diǎn)模式?;靖拍钊缦拢?/p>
代碼實(shí)現(xiàn)(為簡(jiǎn)化代碼,部分代碼沿用上面所述): 啟動(dòng)文件(Application.java)
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊(duì)列的模板類 * connectionFactory() 為 ActiveMQ 連接工廠 */ @Bean public JmsTemplate jmsQueueTemplate(){ return new JmsTemplate(connectionFactory()); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
注解@EnableJms設(shè)置在@Configuration類上,用來聲明對(duì) JMS 注解的支持。
@Component public class PtpProducer { @Autowired private JmsTemplate jmsQueueTemplate; /** * 發(fā)送消息自動(dòng)轉(zhuǎn)換成原始消息 */ public void convertAndSend(){ jmsQueueTemplate.convertAndSend("ptp", "我是自動(dòng)轉(zhuǎn)換的消息"); } }
@RestController @RequestMapping(value = "/ptp") public class PtpController { @Autowired private PtpProducer ptpProducer; @RequestMapping(value = "/convertAndSend") public Object convertAndSend(){ ptpProducer.convertAndSend(); return "success"; } }
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊(duì)列的監(jiān)聽容器工廠 */ @Bean(name = "jmsQueueListenerCF") public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); //設(shè)置連接數(shù) factory.setConcurrency("3-10"); //重連間隔時(shí)間 factory.setRecoveryInterval(1000L); return factory; } ... }
@Component public class PtpListener1 { /** * 消息隊(duì)列監(jiān)聽器 * destination 隊(duì)列地址 * containerFactory 監(jiān)聽器容器工廠, 若存在2個(gè)以上的監(jiān)聽容器工廠,需進(jìn)行指定 */ @JmsListener(destination = "ptp", containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("點(diǎn)對(duì)點(diǎn)模式1: " + msg); } }
啟動(dòng)項(xiàng)目啟動(dòng)后,通過 REST 接口的方式來調(diào)用消息生產(chǎn)者發(fā)送消息,請(qǐng)求如下:
curl -XGET 127.0.0.1:8080/ptp/convertAndSend
消費(fèi)者控制臺(tái)信息:
ActiveMQ 控制臺(tái)信息:
列表說明:
基于上面一個(gè)消費(fèi)者消費(fèi)的模式,因?yàn)樯a(chǎn)者可能會(huì)有很多,同時(shí)像某個(gè)隊(duì)列發(fā)送消息,這時(shí)一個(gè)消費(fèi)者可能會(huì)成為瓶頸。所以需要多個(gè)消費(fèi)者來分?jǐn)傁M(fèi)壓力(消費(fèi)線程池能解決一定壓力,但畢竟在單機(jī)上,做不到分布式分布,所以多消費(fèi)者是有必要的),也就產(chǎn)生了下面的場(chǎng)景。
添加新的監(jiān)聽器
@Component public class PtpListener2 { @JmsListener(destination = Constant.QUEUE_NAME, containerFactory = "jmsQueueListenerCF") public void receive(String msg){ System.out.println("點(diǎn)對(duì)點(diǎn)模式2: " + msg); } }
演示 這里我們發(fā)起 10 次請(qǐng)求,來觀察消費(fèi)者的消費(fèi)情況:
這里因?yàn)楸O(jiān)聽容器設(shè)置了線程池的緣故,在實(shí)際消費(fèi)過程中,監(jiān)聽器消費(fèi)的順序會(huì)有所差異。
發(fā)布訂閱模式
除了點(diǎn)對(duì)點(diǎn)模式,發(fā)布訂閱模式也是消息隊(duì)列中常見的一種使用。試想一下,有一個(gè)即時(shí)聊天群,你在群里發(fā)送一條消息。所有在這個(gè)群里的人(即訂閱了該群的人),都會(huì)收到你發(fā)送的信息。
基本概念:
代碼實(shí)現(xiàn) 修改 JmsTemplate 模板類,使其支持發(fā)布訂閱功能
@SpringBootApplication @EnableJms public class Application { ... @Bean public JmsTemplate jmsTopicTemplate(){ JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory()); jmsTemplate.setPubSubDomain(true); return jmsTemplate; } ... }
消息生產(chǎn)者(PubSubProducer.java)
@Component public class PtpProducer { @Autowired private JmsTemplate jmsTopicTemplate; public void convertAndSend(){ jmsTopicTemplate.convertAndSend("topic", "我是自動(dòng)轉(zhuǎn)換的消息"); } }
生產(chǎn)者調(diào)用類(PubSubController.java)
@RestController @RequestMapping(value = "/pubsub") public class PtpController { @Autowired private PubSubProducer pubSubProducer; @RequestMapping(value = "/convertAndSend") public String convertAndSend(){ pubSubProducer.convertAndSend(); return "success"; } }
修改 DefaultJmsListenerContainerFactory 類,使其支持發(fā)布訂閱功能
@SpringBootApplication @EnableJms public class Application { ... /** * JMS 隊(duì)列的監(jiān)聽容器工廠 */ @Bean(name = "jmsTopicListenerCF") public DefaultJmsListenerContainerFactory jmsTopicListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrency("1"); factory.setPubSubDomain(true); return factory; } ... }
消息監(jiān)聽器(這里設(shè)置2個(gè)訂閱者)
@Component public class PubSubListener1 { @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("訂閱者1 - " + msg); } } @Component public class PubSubListener2 { @JmsListener(destination = "topic", containerFactory = "jmsTopicListenerCF") public void receive(String msg){ System.out.println("訂閱者2 - " + msg); } }
演示
curl -XGET 127.0.0.1:8080/pubSub/convertAndSend
消費(fèi)者控制臺(tái)信息:
ActiveMQ 控制臺(tái)信息:
看完了這篇文章,相信你對(duì)“JMS是什么意思”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。