您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“RocketMQ普通消息同步發(fā)送怎么實(shí)現(xiàn)”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
同步消息是指發(fā)送出消息后,同步等待,直到接收到Broker發(fā)送成功的響應(yīng)才會(huì)繼續(xù)發(fā)送下一個(gè)消息。這個(gè)方式可以確保消息發(fā)送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發(fā)送方式 SendResult send = producer.send(msg); //確認(rèn)返回 System.out.println(send); } //關(guān)閉producer producer.shutdown(); }
異步消息發(fā)送方在發(fā)送了一條消息后,不等接收方發(fā)回響應(yīng),接著進(jìn)行第二條消息發(fā)送。發(fā)送方通過回調(diào)接口的方式接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //SendCallback會(huì)接收異步返回結(jié)果的回調(diào) producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { throwable.printStackTrace(); } }); } //若是過早關(guān)閉producer,會(huì)拋出The producer service state not OK, SHUTDOWN_ALREADY的錯(cuò) Thread.sleep(10000); //關(guān)閉producer producer.shutdown(); }
單項(xiàng)發(fā)送不關(guān)心發(fā)送的結(jié)果,只發(fā)送請(qǐng)求不等待應(yīng)答。發(fā)送消息耗時(shí)極短。
public static void main(String[] args) throws Exception { //實(shí)例化消息生產(chǎn)者對(duì)象 DefaultMQProducer producer = new DefaultMQProducer("group_luke"); //設(shè)置NameSever地址 producer.setNamesrvAddr("127.0.0.1:9876"); //啟動(dòng)Producer實(shí)例 producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8)); //同步發(fā)送方式 producer.sendOneway(msg); } //關(guān)閉producer producer.shutdown(); }
消費(fèi)者采用負(fù)載均衡的方式消費(fèi)消息,同一個(gè)Group下的多個(gè)Consumer共同消費(fèi)Queue里的Message,每個(gè)Consumer處理的消息不同。
一個(gè)Consumer Group中的各個(gè)Consumer實(shí)例分共同消費(fèi)消息,即一條消息只會(huì)投遞到一個(gè)Group下面的一個(gè)實(shí)例,并且只消費(fèi)一遍。
例如某個(gè)Topic有3個(gè)隊(duì)列,其中一個(gè)Consumer Group 有 3 個(gè)實(shí)例,那么每個(gè)實(shí)例只消費(fèi)其中的1個(gè)隊(duì)列。集群消費(fèi)模式是消費(fèi)者默認(rèn)的消費(fèi)方式。
public static void main(String[] args) throws Exception { //實(shí)例化消息消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.CLUSTERING); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 consumer.start(); System.out.printf("Consumer Started.%n"); }
廣播消費(fèi)模式中把消息對(duì)一個(gè)Group下的各個(gè)Consumer實(shí)例都投遞一遍。也就是說消息也會(huì)被 Group 中的每個(gè)Consumer都消費(fèi)一次。
實(shí)際上,是一個(gè)消費(fèi)組下的每個(gè)消費(fèi)者實(shí)例都獲取到了topic下面的每個(gè)Message Queue去拉取消費(fèi)。所以消息會(huì)投遞到每個(gè)消費(fèi)者實(shí)例。
public static void main(String[] args) throws Exception { //實(shí)例化消息消費(fèi)者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke"); //指定nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //訂閱topic,"*"表示所有tag consumer.subscribe("topic_luke","*"); consumer.setMessageModel(MessageModel.BROADCASTING); // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @SneakyThrows @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } // 標(biāo)記該消息已經(jīng)被成功消費(fèi) return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啟動(dòng)消費(fèi)者實(shí)例 consumer.start(); System.out.printf("Consumer Started.%n"); }
“RocketMQ普通消息同步發(fā)送怎么實(shí)現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。