溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RocketMQ普通消息同步發(fā)送怎么實(shí)現(xiàn)

發(fā)布時(shí)間:2022-08-23 11:17:37 來源:億速云 閱讀:124 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“RocketMQ普通消息同步發(fā)送怎么實(shí)現(xiàn)”的有關(guān)知識(shí),在實(shí)際案例的操作過程中,不少人都會(huì)遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

普通消息同步發(fā)送

同步消息是指發(fā)送出消息后,同步等待,直到接收到Broker發(fā)送成功的響應(yīng)才會(huì)繼續(xù)發(fā)送下一個(gè)消息。這個(gè)方式可以確保消息發(fā)送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發(fā)送方式
        SendResult send = producer.send(msg);
        //確認(rèn)返回
        System.out.println(send);
    }
    //關(guān)閉producer
    producer.shutdown();
}

普通消息異步發(fā)送

異步消息發(fā)送方在發(fā)送了一條消息后,不等接收方發(fā)回響應(yīng),接著進(jìn)行第二條消息發(fā)送。發(fā)送方通過回調(diào)接口的方式接收服務(wù)器響應(yīng),并對(duì)響應(yīng)結(jié)果進(jìn)行處理。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback會(huì)接收異步返回結(jié)果的回調(diào)
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是過早關(guān)閉producer,會(huì)拋出The producer service state not OK, SHUTDOWN_ALREADY的錯(cuò)
    Thread.sleep(10000);
    //關(guān)閉producer
    producer.shutdown();
}

普通消息單向發(fā)送

單項(xiàng)發(fā)送不關(guān)心發(fā)送的結(jié)果,只發(fā)送請(qǐng)求不等待應(yīng)答。發(fā)送消息耗時(shí)極短。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息生產(chǎn)者對(duì)象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //設(shè)置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //啟動(dòng)Producer實(shí)例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("這是第"+i+"條消息。").getBytes(StandardCharsets.UTF_8));
        //同步發(fā)送方式
        producer.sendOneway(msg);
    }
    //關(guān)閉producer
    producer.shutdown();
}

集群消費(fèi)模式

消費(fèi)者采用負(fù)載均衡的方式消費(fèi)消息,同一個(gè)Group下的多個(gè)Consumer共同消費(fèi)Queue里的Message,每個(gè)Consumer處理的消息不同。

一個(gè)Consumer Group中的各個(gè)Consumer實(shí)例分共同消費(fèi)消息,即一條消息只會(huì)投遞到一個(gè)Group下面的一個(gè)實(shí)例,并且只消費(fèi)一遍。

例如某個(gè)Topic有3個(gè)隊(duì)列,其中一個(gè)Consumer Group 有 3 個(gè)實(shí)例,那么每個(gè)實(shí)例只消費(fèi)其中的1個(gè)隊(duì)列。集群消費(fèi)模式是消費(fèi)者默認(rèn)的消費(fèi)方式。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息消費(fèi)者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動(dòng)消費(fèi)者實(shí)例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

廣播消費(fèi)模式

廣播消費(fèi)模式中把消息對(duì)一個(gè)Group下的各個(gè)Consumer實(shí)例都投遞一遍。也就是說消息也會(huì)被 Group 中的每個(gè)Consumer都消費(fèi)一次。

實(shí)際上,是一個(gè)消費(fèi)組下的每個(gè)消費(fèi)者實(shí)例都獲取到了topic下面的每個(gè)Message Queue去拉取消費(fèi)。所以消息會(huì)投遞到每個(gè)消費(fèi)者實(shí)例。

public static void main(String[] args) throws Exception {
    //實(shí)例化消息消費(fèi)者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //訂閱topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注冊(cè)回調(diào)實(shí)現(xiàn)類來處理從broker拉取回來的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 標(biāo)記該消息已經(jīng)被成功消費(fèi)
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 啟動(dòng)消費(fèi)者實(shí)例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

“RocketMQ普通消息同步發(fā)送怎么實(shí)現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI