溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

簡單了解如何在spring中使用RabbitMQ

發(fā)布時(shí)間:2020-09-24 11:56:15 來源:腳本之家 閱讀:365 作者:Runtimeing 欄目:編程語言

這篇文章主要介紹了簡單了解如何在spring中使用RabbitMQ,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下

常見的消息中間件產(chǎn)品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。

(2)RabbitMQ

AMQP協(xié)議的領(lǐng)導(dǎo)實(shí)現(xiàn),支持多種場(chǎng)景。淘寶的MySQL集群內(nèi)部有使用它進(jìn)行通訊,OpenStack開源云平臺(tái)的通信組件,最先在金融行業(yè)得到運(yùn)用。我們?cè)诒敬握n程中介紹 RabbitMQ的使用。

(3)ZeroMQ

史上最快的消息隊(duì)列系統(tǒng)

(4)Kafka

Apache下的一個(gè)子項(xiàng)目 。特點(diǎn):高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng)。適合處理海量數(shù)據(jù)。

(5)RocketMQ 阿里巴巴

消息中間件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。對(duì)于消息中間件,常見的角色大致也就有Producer(生產(chǎn)者)、Consumer(消費(fèi)者)。

消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削鋒等問題,實(shí)現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。

​ Spring-amqp是對(duì)AMQP協(xié)議的抽象實(shí)現(xiàn),而spring-rabbit 是對(duì)協(xié)議的具體實(shí)現(xiàn),也是目前的唯一實(shí)現(xiàn)。底層使用的就是RabbitMQ。

已經(jīng)配置好了ssm的開發(fā)環(huán)境

1.導(dǎo)入依賴

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.3</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.1.3.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.5</version>
  </dependency>
</dependencies>

2.編寫生產(chǎn)者

2.1配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
  http://www.springframework.org/schema/context
  http://www.springframework.org/schema/context/spring-context.xsd">

  <context:component-scan base-package="cn.test.rabbitmq.spring"/>

<!-- 配置連接工廠 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
              host="127.0.0.1" port="5672" username="saas" password="saas" />
<!-- 定義mq管理 -->
<rabbit:admin connection-factory="connectionFactory" />

<!-- 聲明隊(duì)列 -->
<rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

<!-- 定義交換機(jī)綁定隊(duì)列(路由模式) -->
<rabbit:direct-exchange name="spring.test.exchange">
  <rabbit:bindings>
    <rabbit:binding queue="spring.test.queue" key="user.insert" />
  </rabbit:bindings>
</rabbit:direct-exchange>
<!-- 定義交換機(jī)綁定隊(duì)列(路由模式)使用匹配符
<rabbit:topic-exchange id="springTestExchange" name="spring.test.exchange">
  <rabbit:bindings>
    <rabbit:binding queue="spring.test.queue" pattern="#.#" />
  </rabbit:bindings>
</rabbit:topic-exchange>
-->
<!-- 消息對(duì)象json轉(zhuǎn)換類 -->
<bean id="jsonMessageConverter"
   class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />

<!-- 定義模版 -->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"
         exchange="spring.test.exchange"
         message-converter="jsonMessageConverter"/>

</beans>

2.2 發(fā)送方代碼

這里是往RabbitMQ隊(duì)列中放入任務(wù),讓消費(fèi)者去取

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqSender {
  @Autowired
  private AmqpTemplate amqpTemplate;
  public void sendMessage(){
    //根據(jù)key發(fā)送到對(duì)應(yīng)的隊(duì)列
    amqpTemplate.convertAndSend("user.insert","spring整合RabbitMQ消息");
    System.out.println("發(fā)送成功........");
  }
}

2.3 測(cè)試代碼

package cn.test.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-send.xml")
public class MqSendDemo {

  @Autowired
  private MqSender mqSender;
  @Test
  public void test(){
    //根據(jù)key發(fā)送到對(duì)應(yīng)的隊(duì)列
    mqSender.sendMessage();
  }
}

3.編寫消費(fèi)者

3.1 配置文件

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans.xsd">


  <!-- 配置連接工廠 -->
  <rabbit:connection-factory id="connectionFactory" virtual-host="/saas"
                host="127.0.0.1" port="5672" username="saas" password="saas" />
  <!-- 定義mq管理 -->
  <rabbit:admin connection-factory="connectionFactory" />

  <!-- 聲明隊(duì)列 -->
  <rabbit:queue name="spring.test.queue" auto-declare="true" durable="true" />

  <!-- 定義消費(fèi)者 -->
  <bean id="testMqListener" class="cn.test.rabbitmq.spring.MqListener" />

  <!-- 定義消費(fèi)者監(jiān)聽隊(duì)列 -->
  <rabbit:listener-container
      connection-factory="connectionFactory">
    <rabbit:listener ref="testMqListener" queues="spring.test.queue" />
  </rabbit:listener-container>

</beans>

3.2 監(jiān)聽代碼

package cn.test.rabbitmq.spring;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

public class MqListener implements MessageListener {

  public void onMessage(Message message) {
    try {
      System.out.println(message.getBody());
      String ms = new String(message.getBody(), "UTF-8");
      System.out.println(ms);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

3.3 測(cè)試代碼

package cn.itcast.rabbitmq.spring;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-receive.xml")
public class MqReceiveDemo {

  @Test
  public void test(){
   //等待隊(duì)列中放入任務(wù),如果有任務(wù),立即消費(fèi)任務(wù)
    while (true){
    }
  }
}

以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI