溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

發(fā)布時(shí)間:2021-11-25 13:12:54 來源:億速云 閱讀:193 作者:柒染 欄目:開發(fā)技術(shù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

一、什么是消息中間件

消息中間件顧名思義實(shí)現(xiàn)的就是在兩個(gè)系統(tǒng)或兩個(gè)客戶端之間進(jìn)行消息傳送

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

二、什么是ActiveMQ

ActiveMQ是一種開源的基于JMS(Java Message Servie)規(guī)范的一種消息中間件的實(shí)現(xiàn),ActiveMQ的設(shè)計(jì)目標(biāo)是提供標(biāo)準(zhǔn)的,面向消息的,能夠跨越多語(yǔ)言和多系統(tǒng)的應(yīng)用集成消息通信中間件。

三、什么時(shí)候需要用ActiveMQ

ActiveMQ常被應(yīng)用與系統(tǒng)業(yè)務(wù)的解耦,異步消息的推送,增加系統(tǒng)并發(fā)量,提高用戶體驗(yàn)。例如以我在工作中的使用,在比較耗時(shí)且異步的遠(yuǎn)程開鎖操作時(shí)

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

四、如何使用ActiveMQ

1.AcitveMQ的數(shù)據(jù)傳送流程

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

2.ActiveMQ的兩種消息傳遞類型

(1)點(diǎn)對(duì)點(diǎn)傳輸,即一個(gè)生產(chǎn)者對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者向broke推送數(shù)據(jù),數(shù)據(jù)存儲(chǔ)在broke的一個(gè)隊(duì)列中,當(dāng)消費(fèi)者接受該條隊(duì)列里的數(shù)據(jù)。

(2)基于發(fā)布/訂閱模式的傳輸,即根據(jù)訂閱話題來接收相應(yīng)數(shù)據(jù),一個(gè)生產(chǎn)者可向多個(gè)消費(fèi)者推送數(shù)據(jù),與MQTT協(xié)議的實(shí)現(xiàn)是類似的,對(duì)MQTT協(xié)議有興趣的可跳轉(zhuǎn)到https://www.cnblogs.com/xiguadadage/p/11216463.html

兩種消息傳遞類型的不同,點(diǎn)對(duì)點(diǎn)傳輸消費(fèi)者可以接收到在連接之前生產(chǎn)者所推送的數(shù)據(jù),而基于發(fā)布/訂閱模式的傳輸方式消費(fèi)者只能接收到連接之后生產(chǎn)者推送的數(shù)據(jù)。

3.ActiveMQ的安裝與啟動(dòng)

(1)官網(wǎng)下載對(duì)應(yīng)服務(wù)器版本

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

(2)解壓后進(jìn)入apache-activemq-5.15.9/bin目錄

(3)執(zhí)行./activemq start啟動(dòng)ActiveMQ

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

(4)瀏覽器輸入ActiveMQ啟動(dòng)的服務(wù)器ip:8161便可進(jìn)入web界面,點(diǎn)擊Manage ActiveMQ broker可以查看消息推送的狀態(tài),默認(rèn)賬號(hào)密碼為admin,admin

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

(5)啟動(dòng)錯(cuò)誤分析

進(jìn)入/root/apache-activemq-5.15.9/data目錄查看activemq.log文件,根據(jù)錯(cuò)誤提示信息修改,例如端口號(hào)被占用等。

4.ActiveMQ的代碼測(cè)試

(1)構(gòu)建maven項(xiàng)目,引入依賴

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

(2)生產(chǎn)者類

/**
 * @Description 生產(chǎn)者
 * @Date 2019/7/20
 * @Created by yqh
 */
public class MyProducer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會(huì)話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識(shí)隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù)
        Destination destination = session.createQueue("myQueue");
        // 創(chuàng)建一個(gè)生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        // 向隊(duì)列推送10個(gè)文本消息數(shù)據(jù)
        for (int i = 1 ; i <= 10 ; i++){
            // 創(chuàng)建文本消息
            TextMessage message = session.createTextMessage("第" + i + "個(gè)文本消息");
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已發(fā)送的消息:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }

}

運(yùn)行結(jié)果:

已發(fā)送的消息:第1個(gè)文本消息

已發(fā)送的消息:第2個(gè)文本消息

已發(fā)送的消息:第3個(gè)文本消息

已發(fā)送的消息:第4個(gè)文本消息

已發(fā)送的消息:第5個(gè)文本消息

已發(fā)送的消息:第6個(gè)文本消息

已發(fā)送的消息:第7個(gè)文本消息

已發(fā)送的消息:第8個(gè)文本消息

已發(fā)送的消息:第9個(gè)文本消息

已發(fā)送的消息:第10個(gè)文本消息

測(cè)試查看web后臺(tái)顯示,有10條消息在隊(duì)列中等待消費(fèi)

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

(3)消費(fèi)者類

/**
 * @Description 消費(fèi)者類
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會(huì)話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識(shí)隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù)
        Destination destination = session.createQueue("myQueue");
        // 創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);
        // 創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費(fèi)的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

測(cè)試結(jié)果:

消費(fèi)的消息:第1個(gè)文本消息

消費(fèi)的消息:第2個(gè)文本消息

消費(fèi)的消息:第3個(gè)文本消息

消費(fèi)的消息:第4個(gè)文本消息

消費(fèi)的消息:第5個(gè)文本消息

消費(fèi)的消息:第6個(gè)文本消息

消費(fèi)的消息:第7個(gè)文本消息

消費(fèi)的消息:第8個(gè)文本消息

消費(fèi)的消息:第9個(gè)文本消息

消費(fèi)的消息:第10個(gè)文本消息

web后臺(tái)顯示有一個(gè)消費(fèi)者處于連接狀態(tài),且已消費(fèi)了10個(gè)message,而該條隊(duì)列已沒有message待消費(fèi)了

如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用

(4)當(dāng)我們運(yùn)行兩個(gè)消費(fèi)者類,消息又是怎么被消費(fèi)的呢?是兩個(gè)消費(fèi)者都能收到生產(chǎn)者生產(chǎn)的message,還是只有其中一個(gè)消費(fèi)者能消費(fèi)呢?

我們先運(yùn)行兩個(gè)消費(fèi)者,在運(yùn)行一個(gè)生產(chǎn)者對(duì)目標(biāo)隊(duì)列生產(chǎn)10個(gè)message,會(huì)發(fā)現(xiàn)有以下情況

// Consumer1控制臺(tái)

消費(fèi)的消息:第1個(gè)文本消息

消費(fèi)的消息:第3個(gè)文本消息

消費(fèi)的消息:第5個(gè)文本消息

消費(fèi)的消息:第7個(gè)文本消息

消費(fèi)的消息:第9個(gè)文本消息

// Consumer2控制臺(tái)

消費(fèi)的消息:第2個(gè)文本消息

消費(fèi)的消息:第4個(gè)文本消息

消費(fèi)的消息:第6個(gè)文本消息

消費(fèi)的消息:第8個(gè)文本消息

消費(fèi)的消息:第10個(gè)文本消息

即隊(duì)列中的數(shù)據(jù)會(huì)平均的分給每一個(gè)消費(fèi)者消費(fèi),且每一條數(shù)據(jù)只能被消費(fèi)一次

(5)以上是基于隊(duì)列點(diǎn)對(duì)點(diǎn)的傳輸類型,以下是基于發(fā)布/訂閱模式傳輸?shù)念愋蜏y(cè)試

/**
 * @Description 基于發(fā)布/訂閱模式傳輸類型的生產(chǎn)者測(cè)試
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyProducerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會(huì)話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識(shí)隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù)
        Destination destination = session.createTopic("topicTest");
        // 創(chuàng)建一個(gè)生產(chǎn)者
        MessageProducer producer = session.createProducer(destination);
        // 向隊(duì)列推送10個(gè)文本消息數(shù)據(jù)
        for (int i = 1 ; i <= 10 ; i++){
            // 創(chuàng)建文本消息
            TextMessage message = session.createTextMessage("第" + i + "個(gè)文本消息");
            //發(fā)送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已發(fā)送的消息:" + message.getText());
        }
        //關(guān)閉連接
        connection.close();
    }

}
/**
 * @Description 基于發(fā)布/訂閱模式傳輸類型的消費(fèi)者測(cè)試
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 創(chuàng)建連接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 創(chuàng)建連接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打開連接
        connection.start();
        // 創(chuàng)建會(huì)話
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 創(chuàng)建隊(duì)列目標(biāo),并標(biāo)識(shí)隊(duì)列名稱,消費(fèi)者根據(jù)隊(duì)列名稱接收數(shù)據(jù)
        Destination destination = session.createTopic("topicTest");
        // 創(chuàng)建消費(fèi)者
        MessageConsumer consumer = session.createConsumer(destination);
        // 創(chuàng)建消費(fèi)的監(jiān)聽
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消費(fèi)的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

現(xiàn)在如果我們先啟動(dòng)生產(chǎn)者,再啟動(dòng)消費(fèi)者,會(huì)發(fā)現(xiàn)消費(fèi)者是無法接收到之前生產(chǎn)者之前所生產(chǎn)的數(shù)據(jù),只有消費(fèi)者先啟動(dòng),再讓生產(chǎn)者消費(fèi)才可以正常接收數(shù)據(jù),這也是發(fā)布/訂閱的主題模式與點(diǎn)對(duì)點(diǎn)的隊(duì)列模式的一個(gè)明顯區(qū)別。

而如果啟動(dòng)兩個(gè)消費(fèi)者,那么每一個(gè)消費(fèi)者都能完整的接收到生產(chǎn)者生產(chǎn)的數(shù)據(jù),即每一條數(shù)據(jù)都被消費(fèi)了兩次,這是發(fā)布/訂閱的主題模式與點(diǎn)對(duì)點(diǎn)的隊(duì)列模式的另一個(gè)明顯區(qū)別。

上述就是小編為大家分享的如何進(jìn)行ActiveMQ的簡(jiǎn)單入門與使用了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI