您好,登錄后才能下訂單哦!
RocketMQ如何快速入門(mén),相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
本章簡(jiǎn)單講講RocketMQ的入門(mén)操作,消息發(fā)送和消息接收。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency>
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { //構(gòu)建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("測(cè)試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
查看結(jié)果
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內(nèi)容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看結(jié)果
看到消費(fèi)的結(jié)果大家可能有疑問(wèn),我們生產(chǎn)消息的時(shí)候是按照順序生產(chǎn)的消息,消費(fèi)時(shí)候?yàn)槭裁床皇琼樞蛳M(fèi)下來(lái)的。
MQ消息的無(wú)序性,每個(gè)主題對(duì)應(yīng)多個(gè)隊(duì)列,生產(chǎn)消息時(shí)是根據(jù)算法放置不同的隊(duì)列中,消費(fèi)則就是無(wú)序了(有序消息后面討論)
也有可能出現(xiàn)一條消息被消費(fèi)了多次,RocketMQ的目標(biāo)就是不丟數(shù)據(jù),<u>每條消息至少發(fā)送一次</u>,內(nèi)部通過(guò)ACK的確認(rèn)機(jī)制實(shí)現(xiàn)的后面會(huì)重點(diǎn)討論
為了方便的查看消息的詳情我們可以通過(guò)消息的管控臺(tái)更好的管理和查看消息詳情,當(dāng)然我們也可以通過(guò)后臺(tái)的提供的命令來(lái)為運(yùn)維提供更多的管理。
RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
可以直接下載到本地之后通過(guò)mavne進(jìn)行編譯獲取jar,該項(xiàng)目是SpringBoot項(xiàng)目
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
丟到linux服務(wù)器上啟動(dòng)
(1)啟動(dòng)時(shí)設(shè)置具體的RocketMQ的參數(shù)
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876
(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據(jù)自己的NamesrvAddr進(jìn)行修改rocketmq.config.namesrvAddr的值,默認(rèn)端口12581
瀏覽器登錄查看控制臺(tái)信息
查看RocketMQ集群的節(jié)點(diǎn)信息
根據(jù)主題時(shí)間段查詢消息
查看某條消息的具體信息
管控臺(tái)提供了很多運(yùn)維功能能極大的提高我們的運(yùn)維效率,里面的功能包括創(chuàng)建主題、修改主題、發(fā)送消息、對(duì)消費(fèi)者的信息進(jìn)行查看等功能我們不一一介紹,可以簡(jiǎn)單的了解使用。
看完上述內(nèi)容,你們掌握RocketMQ如何快速入門(mén)的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。