溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么在springboot中利用Redis實(shí)現(xiàn)一個(gè)發(fā)布訂閱功能

發(fā)布時(shí)間:2021-01-29 15:58:48 來源:億速云 閱讀:520 作者:Leah 欄目:開發(fā)技術(shù)

怎么在springboot中利用Redis實(shí)現(xiàn)一個(gè)發(fā)布訂閱功能?針對(duì)這個(gè)問題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡(jiǎn)單易行的方法。

一、背景

單機(jī)節(jié)點(diǎn)下,WebSocket連接成功后,可以直接發(fā)送消息。而多節(jié)點(diǎn)下,連接時(shí)通過nginx會(huì)代理到不同節(jié)點(diǎn)。

假設(shè)一開始用戶連接了node1的socket服務(wù)。觸發(fā)消息發(fā)送的條件的時(shí)候也通過nginx進(jìn)行代理,假如代理轉(zhuǎn)到了node2節(jié)點(diǎn)上,那么node2節(jié)點(diǎn)的socket服務(wù)就發(fā)送不了消息,因?yàn)橐婚_始用戶注冊(cè)的是node1節(jié)點(diǎn)。這就導(dǎo)致了消息發(fā)送失敗。

怎么在springboot中利用Redis實(shí)現(xiàn)一個(gè)發(fā)布訂閱功能

為了解決這一方案,消息發(fā)送時(shí),就需要一個(gè)中間件來記錄,這樣,三個(gè)節(jié)點(diǎn)都可以獲取消息,然后在根據(jù)條件進(jìn)行消息推送。

二、解決方案(springboot 基于 Redis發(fā)布訂閱)

1、依賴

<!-- redis -->    
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- websocket --> 
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、創(chuàng)建業(yè)務(wù)處理類 Demo.class,該類可以實(shí)現(xiàn)MessageListener接口后重寫onMessage方法,也可以不實(shí)現(xiàn),自己寫方法。

import com.alibaba.fastjson.JSON;
import com.dy.service.impl.OrdersServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
 
import java.util.HashMap;
 
/**
 * @program: 
 * @description: redis消息訂閱-業(yè)務(wù)處理
 * @author: zhang yi
 * @create: 2021-01-25 16:46
 */
@Component
public class Demo implements MessageListener {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  @Override
  public void onMessage(Message message, byte[] pattern) {
    logger.info("消息訂閱成功---------");
    logger.info("內(nèi)容:"+message.getBody());
    logger.info("交換機(jī):"+message.getChannel());
  }
}

3、創(chuàng)建PubSubConfig配置類

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
/**
 * @program: 
 * @description: redis發(fā)布訂閱配置
 * @author: zhang yi
 * @create: 2021-01-25 16:49
 */
@Configuration
@EnableCaching
public class PubSubConfig {
  Logger logger = LoggerFactory.getLogger(this.getClass());
 
  //如果是多個(gè)交換機(jī),則參數(shù)為(RedisConnectionFactory connectionFactory,
  //              MessageListenerAdapter listenerAdapter,
  //              MessageListenerAdapter listenerAdapter2)
  @Bean
  RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                      MessageListenerAdapter listenerAdapter) {
 
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    // 可以添加多個(gè) messageListener,配置不同的交換機(jī)
    container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo"));
    //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2"));
    return container;
  }
 
  /**
   * 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法
   * @param demo 第一步的業(yè)務(wù)處理類
   * @return
   */
  @Bean
  MessageListenerAdapter listenerAdapter(Demo demo) {
    logger.info("----------------消息監(jiān)聽器加載成功----------------");
    // onMessage 就是方法名,基于反射調(diào)用
    return new MessageListenerAdapter(demo, "onMessage");
  }
 
  /**
   * 多個(gè)交換機(jī)就多寫一個(gè)
   * @param subCheckOrder
   * @return
   */
  //@Bean
  //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) {
  //  logger.info("----------------消息監(jiān)聽器加載成功----------------");
  //  return new MessageListenerAdapter(subCheckOrder, "onMessage");
  //}
 
  @Bean
  StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
    return new StringRedisTemplate(connectionFactory);
  }
}

4、消息發(fā)布

@Autowired
private RedisTemplate<String, Object> redisTemplate;
 
redisTemplate.convertAndSend("channel:demo", "我是內(nèi)容");

三、具體用法

  • socket連接成功。

  • socket消息推送時(shí),把信息發(fā)布到redis中。socket服務(wù)訂閱redis的消息,訂閱成功后進(jìn)行推送。集群下的socket都能訂閱到消息,但是只有之前連接成功的節(jié)點(diǎn)能推送成功,其余的無法推送。

關(guān)于怎么在springboot中利用Redis實(shí)現(xiàn)一個(gè)發(fā)布訂閱功能問題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識(shí)。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI