溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

發(fā)布時(shí)間:2021-06-22 14:34:35 來(lái)源:億速云 閱讀:183 作者:chen 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“RabbitMQ用多路由,多隊(duì)列來(lái)破除流控”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“RabbitMQ用多路由,多隊(duì)列來(lái)破除流控”吧!

流控機(jī)制是我們?cè)谑褂肦abbitMQ最頭疼的問(wèn)題,一旦并發(fā)激增時(shí),消費(fèi)者消費(fèi)隊(duì)列消息就像滴水一樣慢。

現(xiàn)在我們下單后,需要給通知中心發(fā)送消息,讓通知中心通知服務(wù)商收取訂單,并確認(rèn)提供服務(wù)。

我們先給Order接口添加一個(gè)發(fā)送消息的方法。

public interface Order {public void makeOrder(Order order);    public OrderSuccessResult getResult(Order order);    public void postOrder(Order order);}

實(shí)現(xiàn)類實(shí)現(xiàn)該方法

@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id;    @NonNull    private String code;    @NonNull    private Store store;    @NonNull    private ProviderService service;    @NonNull    private Car car;    @NonNull    private Date serviceDate;    @NonNull    private String contact;    @NonNull    private String contactTel;    private AppUser user;    @NonNull    private String content;    private int status;    private Date createDate;    @Override    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);        IdService idService = SpringBootUtil.getBean(IdService.class);        ((ServiceOrder)order).setId(idService.genId());        ((ServiceOrder)order).setCode(getCodeInfo(idService));        AppUser loginAppUser = AppUserUtil.getLoginAppUser();        AppUser user = new AppUser();        user.setId(loginAppUser.getId());        user.setUsername(loginAppUser.getUsername());        ((ServiceOrder)order).setUser(user);        ((ServiceOrder)order).setStatus(1);        ((ServiceOrder)order).setCreateDate(new Date());        serviceOrderDao.save((ServiceOrder) order);    }@Override    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();        return this.orderSuccessResult.getResult(order);    }@Override    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);        CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,                        OwnerCarCenterMq.ROUTING_KEY_ORDER,                        order)
        );    }private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());        flow = flow.substring(14,flow.length());        String pre = DateUtils.format(new Date(), DateUtils.pattern9);        return pre + flow;    }
}

其中我們定義了這么一組隊(duì)列名,交換機(jī),和路由

public interface OwnerCarCenterMq {/**     * 隊(duì)列名     */    String ORDER_QUEUE = "order";    /**     * 服務(wù)系統(tǒng)exchange名     */    String MQ_EXCHANGE_ORDER = "order.topic.exchange";    /**     * 服務(wù)添加routing key     */    String ROUTING_KEY_ORDER = "post.order";}

為了避免流控,我們定義了10個(gè)隊(duì)列,并全部綁定到一個(gè)交換機(jī)上。

@Configurationpublic class RabbitmqConfig {   @Bean   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);         queues.add(queue);      }      return queues;   }   @Bean   public TopicExchange orderExchange() {      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);   }   @Bean   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);         bindings.add(binding);      }      return bindings;   }
}

重新封裝消息提供者,每次發(fā)送都隨機(jī)選取一個(gè)路由來(lái)進(jìn)行發(fā)送。

@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired    private RabbitTemplate rabbitTemplate;    public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content);        this.rabbitTemplate.setMandatory(true);        this.rabbitTemplate.setConfirmCallback(this);        this.rabbitTemplate.setReturnCallback(this);        ThreadLocalRandom random = ThreadLocalRandom.current();        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));    }/**     * 確認(rèn)后回調(diào):     * @param correlationData     * @param ack     * @param cause     */    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause);        } else {log.info("send ack success");        }
    }/**     * 失敗后return回調(diào):     *     * @param message     * @param replyCode     * @param replyText     * @param exchange     * @param routingKey     */    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);    }/**     * 對(duì)消息對(duì)象進(jìn)行二進(jìn)制序列化     * @param o     * @return     */    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();        ByteArrayOutputStream stream = new ByteArrayOutputStream();        Output output = new Output(stream);        kryo.writeObject(output, o);        output.close();        return stream.toByteArray();    }
}

我們可以看到在ServiceOrder里,我們是通過(guò)異步來(lái)進(jìn)行發(fā)送到。

Controller如下

@Slf4j@RestControllerpublic class OrderController {private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();    private ThreadLocal<Order> orderService = new ThreadLocal<>();    @Autowired    private OrderBean orderBean;    @Transactional    @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr);        Order order = setOrderFactory(orderStr,type);        orderService.get().makeOrder(order);        orderService.get().postOrder(order);        return Result.success(orderService.get().getResult(order));    }/**     * 判斷是哪一種類型的訂單來(lái)獲取哪一種類型的具體訂單工廠     * @param orderStr     * @return     */    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);        Object order = JSONObject.parseObject(orderStr, classType);//        if (orderStr.contains("service")) {//            order = JSON.parseObject(orderStr, ServiceOrder.class);//        }else if (orderStr.contains("product")) {//            order = JSON.parseObject(orderStr, ProductOrder.class);//        }        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));//        if (order instanceof ServiceOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));//        }else if (order instanceof ProductOrder) {//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));//        }        orderService.set(orderFactory.get().getOrder());        return (Order) order;    }
}

最后是在我們的通知中心模塊接收消息,同時(shí)對(duì)這10個(gè)隊(duì)列實(shí)行監(jiān)控

@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();    @RabbitHandler    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告訴服務(wù)器收到這條消息 已經(jīng)被我消費(fèi)了 可以在隊(duì)列刪掉;否則消息服務(wù)器以為這條消息沒(méi)處理掉 后續(xù)還會(huì)在發(fā)            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            ServiceOrder order = unSerialize(data);            this.serviceOrders.add(order);            log.info(String.valueOf(order));        } catch (IOException e) {
            e.printStackTrace();            //丟棄這條消息            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);            log.info("receiver fail");        }
    }/**     * 反序列化     * @param data     * @return     */    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;        try {
            Kryo kryo = new Kryo();            input = new Input(new ByteArrayInputStream(data));            return kryo.readObject(input,ServiceOrder.class);        }finally {
            input.close();        }
    }
}

項(xiàng)目啟動(dòng)后,我們可以看到rabbitmq的情況如下

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

現(xiàn)我們來(lái)對(duì)其進(jìn)行壓測(cè),啟動(dòng)Jmeter,我們使用1000線程來(lái)進(jìn)行壓測(cè)測(cè)試。各配置如下

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

保存文件上傳服務(wù)器,因?yàn)楸救耸侨A為云的服務(wù)器,故在服務(wù)器上進(jìn)行壓測(cè),不進(jìn)行遠(yuǎn)程壓測(cè)

在服務(wù)器的jmeter的bin目錄下輸入

./jmeter -n -t model/rabbit.jmx -l log.jtl

這里-n為不啟動(dòng)圖形界面,-t使用我們上傳的配置文件,-l記錄日志

壓測(cè)結(jié)果如下

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

我們?cè)趬簻y(cè)過(guò)程中來(lái)看一下rabbitmq的UI界面

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

RabbitMQ用多路由,多隊(duì)列來(lái)破除流控

消費(fèi)基本上是實(shí)時(shí)的,沒(méi)有出現(xiàn)流控積壓現(xiàn)象。

到此,相信大家對(duì)“RabbitMQ用多路由,多隊(duì)列來(lái)破除流控”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI