溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

SpringBoot如何整合RabbitMQ實(shí)現(xiàn)死信交換機(jī)

發(fā)布時(shí)間:2022-06-13 09:52:19 來源:億速云 閱讀:158 作者:zzz 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“SpringBoot如何整合RabbitMQ實(shí)現(xiàn)死信交換機(jī)”的有關(guān)知識,在實(shí)際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

環(huán)境

Windows10,IDEA,otp_win64_25.0,rabbitmq-server-3.10.4
1.雙擊C:\Program Files\RabbitMQ Server\rabbitmq_server-3.10.4\sbin\rabbitmq-server.bat啟動MQ服務(wù)
2.然后訪問http://localhost:15672/,默認(rèn)賬號密碼均為guest,
3.手動添加一個虛擬主機(jī)為admin_host,手動創(chuàng)建一個用戶賬號密碼均為admin

pom.xml

        <!-- RabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.0</version>
        </dependency>

配置

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin
    virtual-host: admin_host
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true    #開啟失敗重試
          max-attempts: 3    #最大重試次數(shù)
          initial-interval: 1000  #重試間隔時(shí)間 毫秒

配置文件

RabbitConfig

package com.example.rabitmqdemo.mydemo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;


/**
 * Broker:它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸,
 * Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個隊(duì)列。
 * Queue:消息的載體,每個消息都會被投到一個或多個隊(duì)列。
 * Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來.
 * Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進(jìn)行消息投遞。
 * vhost:虛擬主機(jī),一個broker里可以有多個vhost,用作不同用戶的權(quán)限分離。
 * Producer:消息生產(chǎn)者,就是投遞消息的程序.
 * Consumer:消息消費(fèi)者,就是接受消息的程序.
 * Channel:消息通道,在客戶端的每個連接里,可建立多個channel.
 */
@Slf4j
@Component
public class RabbitConfig {
    //業(yè)務(wù)交換機(jī)
    public static final String EXCHANGE_PHCP = "phcp";
    //業(yè)務(wù)隊(duì)列1
    public static final String QUEUE_COMPANY = "company";
    //業(yè)務(wù)隊(duì)列1的key
    public static final String ROUTINGKEY_COMPANY = "companyKey";
    //業(yè)務(wù)隊(duì)列2
    public static final String QUEUE_PROJECT = "project";
    //業(yè)務(wù)隊(duì)列2的key
    public static final String ROUTINGKEY_PROJECT = "projectKey";

    //死信交換機(jī)
    public static final String EXCHANGE_PHCP_DEAD = "phcp_dead";
    //死信隊(duì)列1
    public static final String QUEUE_COMPANY_DEAD = "company_dead";
    //死信隊(duì)列2
    public static final String QUEUE_PROJECT_DEAD = "project_dead";
    //死信隊(duì)列1的key
    public static final String ROUTINGKEY_COMPANY_DEAD = "companyKey_dead";
    //死信隊(duì)列2的key
    public static final String ROUTINGKEY_PROJECT_DEAD = "projectKey_dead";


//    /**
//     * 解決重復(fù)確認(rèn)報(bào)錯問題,如果沒有報(bào)錯的話,就不用啟用這個
//     *
//     * @param connectionFactory
//     * @return
//     */
//    @Bean
//    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConnectionFactory(connectionFactory);
//        factory.setMessageConverter(new Jackson2JsonMessageConverter());
//        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//        return factory;
//    }

    /**
     * 聲明業(yè)務(wù)交換機(jī)
     * 1. 設(shè)置交換機(jī)類型
     * 2. 將隊(duì)列綁定到交換機(jī)
     * FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無routingkey的概念
     * HeadersExchange :通過添加屬性key-value匹配
     * DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
     * TopicExchange:多關(guān)鍵字匹配
     */
    @Bean("exchangePhcp")
    public DirectExchange exchangePhcp() {
        return new DirectExchange(EXCHANGE_PHCP);
    }

     * 聲明死信交換機(jī)
    @Bean("exchangePhcpDead")
    public DirectExchange exchangePhcpDead() {
        return new DirectExchange(EXCHANGE_PHCP_DEAD);

     * 聲明業(yè)務(wù)隊(duì)列1
     *
     * @return
    @Bean("queueCompany")
    public Queue queueCompany() {
        Map<String,Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange",EXCHANGE_PHCP_DEAD);
        //綁定該隊(duì)列到死信交換機(jī)的隊(duì)列1
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_COMPANY_DEAD);
        return QueueBuilder.durable(QUEUE_COMPANY).withArguments(arguments).build();
     * 聲明業(yè)務(wù)隊(duì)列2
    @Bean("queueProject")
    public Queue queueProject() {
        //綁定該隊(duì)列到死信交換機(jī)的隊(duì)列2
        arguments.put("x-dead-letter-routing-key",ROUTINGKEY_PROJECT_DEAD);
        return QueueBuilder.durable(QUEUE_PROJECT).withArguments(arguments).build();

     * 聲明死信隊(duì)列1
    @Bean("queueCompanyDead")
    public Queue queueCompanyDead() {
        return new Queue(QUEUE_COMPANY_DEAD);
     * 聲明死信隊(duì)列2
    @Bean("queueProjectDead")
    public Queue queueProjectDead() {
        return new Queue(QUEUE_PROJECT_DEAD);

     * 綁定業(yè)務(wù)隊(duì)列1和業(yè)務(wù)交換機(jī)
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindingQueueCompany(@Qualifier("queueCompany") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY);

     * 綁定業(yè)務(wù)隊(duì)列2和業(yè)務(wù)交換機(jī)
    public Binding bindingQueueProject(@Qualifier("queueProject") Queue queue, @Qualifier("exchangePhcp") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT);

     * 綁定死信隊(duì)列1和死信交換機(jī)
    public Binding bindingQueueCompanyDead(@Qualifier("queueCompanyDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_COMPANY_DEAD);

     * 綁定死信隊(duì)列2和死信交換機(jī)
    public Binding bindingQueueProjectDead(@Qualifier("queueProjectDead") Queue queue, @Qualifier("exchangePhcpDead") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitConfig.ROUTINGKEY_PROJECT_DEAD);
}

生產(chǎn)者

RabbltProducer

package com.example.rabitmqdemo.mydemo.producer;
import com.example.rabitmqdemo.mydemo.config.RabbitConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Component
@Slf4j
public class RabbltProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 初始化消息確認(rèn)函數(shù)
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
    }
    /**
     * 發(fā)送消息服務(wù)器確認(rèn)函數(shù)
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息發(fā)送成功" + correlationData);
        } else {
            System.out.println("消息發(fā)送失敗:" + cause);
        }
    }
    /**
     * 消息發(fā)送失敗,消息回調(diào)函數(shù)
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = new String(returnedMessage.getMessage().getBody());
        System.out.println("消息發(fā)送失敗:" + str);
    }
    /**
     * 處理消息發(fā)送到隊(duì)列1
     * @param str
     */
    public void sendCompany(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_COMPANY,str,correlationData);
    }
    /**
     * 處理消息發(fā)送到隊(duì)列2
     * @param str
     */
    public void sendProject(String str){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("application/json");
        Message message = new Message(str.getBytes(StandardCharsets.UTF_8),messageProperties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,message,correlationData);
        //也可以用下面的方式
        //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //this.rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_PHCP,RabbitConfig.ROUTINGKEY_PROJECT,str,correlationData);
    }
}

業(yè)務(wù)消費(fèi)者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * 監(jiān)聽業(yè)務(wù)交換機(jī)
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumer {
    /**
     * 監(jiān)聽業(yè)務(wù)隊(duì)列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company")
    public void company(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("處理消息"+s);
            //下面兩行是嘗試手動拋出異常,用來測試重試次數(shù)和發(fā)送到死信交換機(jī)
            //String str = null;
            //str.split("1");
            //處理成功,確認(rèn)應(yīng)答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("處理消息時(shí)發(fā)生異常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("異常重試次數(shù)已到達(dá)設(shè)置次數(shù),將發(fā)送到死信交換機(jī)");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即將返回隊(duì)列處理重試");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
    /**
     * 監(jiān)聽業(yè)務(wù)隊(duì)列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project")
    public void project(Message message, Channel channel) throws IOException {
        try{
            System.out.println("次數(shù)" + message.getMessageProperties().getDeliveryTag());
            channel.basicQos(1);
            Thread.sleep(2000);
            String s = new String(message.getBody());
            log.info("處理消息"+s);
            //下面兩行是嘗試手動拋出異常,用來測試重試次數(shù)和發(fā)送到死信交換機(jī)
            //String str = null;
            //str.split("1");
            //處理成功,確認(rèn)應(yīng)答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("處理消息時(shí)發(fā)生異常:"+e.getMessage());
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if(redelivered){
                log.error("異常重試次數(shù)已到達(dá)設(shè)置次數(shù),將發(fā)送到死信交換機(jī)");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            }else {
                log.error("消息即將返回隊(duì)列處理重試");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

死信消費(fèi)者

RabbitConsumer

package com.example.rabitmqdemo.mydemo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * 監(jiān)聽死信交換機(jī)
 * @author JeWang
 */
@Component
@Slf4j
public class RabbitConsumerDead {
    /**
     * 處理死信隊(duì)列1
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "company_dead")
    public void company_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("處理死信"+s);
            //在此處記錄到數(shù)據(jù)庫、報(bào)警之類的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收異常:"+e.getMessage());
        }
    }
    /**
     * 處理死信隊(duì)列2
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "project_dead")
    public void project_dead(Message message, Channel channel) throws IOException {
        try{
            channel.basicQos(1);
            String s = new String(message.getBody());
            log.info("處理死信"+s);
            //在此處記錄到數(shù)據(jù)庫、報(bào)警之類的操作
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("接收異常:"+e.getMessage());
        }
    }
}

測試

MqController

package com.example.rabitmqdemo.mydemo.controller;
import com.example.rabitmqdemo.mydemo.producer.RabbltProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RequestMapping("/def")
@RestController
@Slf4j
public class MsgController {
    @Resource
    private RabbltProducer rabbltProducer;
    
    @RequestMapping("/handleCompany")
    public void handleCompany(@RequestBody String jsonStr){
        rabbltProducer.sendCompany(jsonStr);
    }
}

“SpringBoot如何整合RabbitMQ實(shí)現(xiàn)死信交換機(jī)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI