溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot2如何整合Redis哨兵集群 實(shí)現(xiàn)消息隊(duì)列場(chǎng)景

發(fā)布時(shí)間:2021-11-20 15:33:11 來源:億速云 閱讀:181 作者:小新 欄目:編程語言

這篇文章主要介紹了SpringBoot2如何整合Redis哨兵集群 實(shí)現(xiàn)消息隊(duì)列場(chǎng)景,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

一、Redis集群簡(jiǎn)介

1、RedisCluster概念

Redis的分布式解決方案,在3.0版本后推出的方案,有效地解決了Redis分布式的需求,當(dāng)一個(gè)服務(wù)宕機(jī)可以快速的切換到另外一個(gè)服務(wù)。redis cluster主要是針對(duì)海量數(shù)據(jù)+高并發(fā)+高可用的場(chǎng)景。

二、與SpringBoot2.0整合

1、核心依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${redis-client.version}</version>
</dependency>

2、核心配置

spring:
  # Redis 集群
  redis:
    sentinel:
      # sentinel 配置
      master: mymaster
      nodes: 192.168.0.127:26379
      maxTotal: 60
      minIdle: 10
      maxWaitMillis: 10000
      testWhileIdle: true
      testOnBorrow: true
      testOnReturn: false
      timeBetweenEvictionRunsMillis: 10000

3、參數(shù)渲染類

@ConfigurationProperties(prefix = "spring.redis.sentinel")
public class RedisParam {
    private String nodes ;
    private String master ;
    private Integer maxTotal ;
    private Integer minIdle ;
    private Integer maxWaitMillis ;
    private Integer timeBetweenEvictionRunsMillis ;
    private boolean testWhileIdle ;
    private boolean testOnBorrow ;
    private boolean testOnReturn ;
    // 省略GET和SET方法
}

4、集群配置文件

@Configuration
@EnableConfigurationProperties(RedisParam.class)
public class RedisPool {
    @Resource
    private RedisParam redisParam ;
    @Bean("jedisSentinelPool")
    public JedisSentinelPool getRedisPool (){
        Set<String> sentinels = new HashSet<>();
        sentinels.addAll(Arrays.asList(redisParam.getNodes().split(",")));
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(redisParam.getMaxTotal());
        poolConfig.setMinIdle(redisParam.getMinIdle());
        poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis());
        poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle());
        poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow());
        poolConfig.setTestOnReturn(redisParam.isTestOnReturn());
        poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis());
        JedisSentinelPool redisPool = new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig);
        return redisPool;
    }
    @Bean
    SpringUtil springUtil() {
        return new SpringUtil();
    }
    @Bean
    RedisListener redisListener() {
        return new RedisListener();
    }
}

5、配置Redis模板類

@Configuration
public class RedisConfig {
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(factory);
        return stringRedisTemplate;
    }
}

三、模擬隊(duì)列場(chǎng)景案例

生產(chǎn)者消費(fèi)者模式:客戶端監(jiān)聽消息隊(duì)列,消息達(dá)到,消費(fèi)者馬上消費(fèi),如果消息隊(duì)列里面沒有消息,那么消費(fèi)者就繼續(xù)監(jiān)聽。基于Redis的LPUSH(BLPUSH)把消息入隊(duì),用 RPOP(BRPOP)獲取消息的模式。

1、加鎖解鎖工具

@Component
public class RedisLock {
    private static String keyPrefix = "RedisLock:";
    @Resource
    private JedisSentinelPool jedisSentinelPool;
    public boolean addLock(String key, long expire) {
        Jedis jedis = null;
        try {
            jedis = jedisSentinelPool.getResource();
            /*
             * nxxx的值只能取NX或者XX,如果取NX,則只有當(dāng)key不存在是才進(jìn)行set,如果取XX,則只有當(dāng)key已經(jīng)存在時(shí)才進(jìn)行set
             * expx的值只能取EX或者PX,代表數(shù)據(jù)過期時(shí)間的單位,EX代表秒,PX代表毫秒。
             */
            String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire);
            return value != null;
        } catch (Exception e){
            e.printStackTrace();
        }finally {
            if (jedis != null) jedis.close();
        }
        return false;
    }
    public void removeLock(String key) {
        Jedis jedis = null;
        try {
            jedis = jedisSentinelPool.getResource();
            jedis.del(keyPrefix + key);
        } finally {
            if (jedis != null) jedis.close();
        }
    }
}

2、消息消費(fèi)

1)封裝接口

public interface RedisHandler  {
    /**
     * 隊(duì)列名稱
     */
    String queueName();
    /**
     * 隊(duì)列消息內(nèi)容
     */
    String consume (String msgBody);
}

2)接口實(shí)現(xiàn)

@Component
public class LogAListen implements RedisHandler {
    private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class) ;
    @Resource
    private RedisLock redisLock;
    @Override
    public String queueName() {
        return "LogA-key";
    }
    @Override
    public String consume(String msgBody) {
        // 加鎖,防止消息重復(fù)投遞
        String lockKey = "lock-order-uuid-A";
        boolean lock = false;
        try {
            lock = redisLock.addLock(lockKey, 60);
            if (!lock) {
                return "success";
            }
            LOG.info("LogA-key == >>" + msgBody);
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            if (lock) {
                redisLock.removeLock(lockKey);
            }
        }
        return "success";
    }
}

3、消息監(jiān)聽器

public class RedisListener implements InitializingBean {
    /**
     * Redis 集群
     */
    @Resource
    private JedisSentinelPool jedisSentinelPool;
    private List<RedisHandler> handlers = null;
    private ExecutorService product = null;
    private ExecutorService consumer = null;
    /**
     * 初始化配置
     */
    @Override
    public void afterPropertiesSet() {
        handlers = SpringUtil.getBeans(RedisHandler.class) ;
        product = new ThreadPoolExecutor(10,15,60 * 3,
                TimeUnit.SECONDS,new SynchronousQueue<>());
        consumer = new ThreadPoolExecutor(10,15,60 * 3,
                TimeUnit.SECONDS,new SynchronousQueue<>());
        for (RedisHandler redisHandler : handlers){
            product.execute(() -> {
                redisTask(redisHandler);
            });
        }
    }
    /**
     * 隊(duì)列監(jiān)聽
     */
    public void redisTask (RedisHandler redisHandler){
        Jedis jedis = null ;
        while (true){
            try {
                jedis = jedisSentinelPool.getResource() ;
                List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName());
                if (msgBodyList != null && msgBodyList.size()>0){
                    consumer.execute(() -> {
                        redisHandler.consume(msgBodyList.get(1)) ;
                    });
                }
            } catch (Exception e){
                e.printStackTrace();
            } finally {
                if (jedis != null) jedis.close();
            }
        }
    }
}

4、消息生產(chǎn)者

@Service
public class RedisServiceImpl implements RedisService {
    @Resource
    private JedisSentinelPool jedisSentinelPool;
    @Override
    public void saveQueue(String queueKey, String msgBody) {
        Jedis jedis = null;
        try {
            jedis = jedisSentinelPool.getResource();
            jedis.lpush(queueKey,msgBody) ;
        } catch (Exception e){
          e.printStackTrace();
        } finally {
            if (jedis != null) jedis.close();
        }
    }
}

5、場(chǎng)景測(cè)試接口

@RestController
public class RedisController {
    @Resource
    private RedisService redisService ;
    /**
     * 隊(duì)列推消息
     */
    @RequestMapping("/saveQueue")
    public String saveQueue (){
        MsgBody msgBody = new MsgBody() ;
        msgBody.setName("LogAModel");
        msgBody.setDesc("描述");
        msgBody.setCreateTime(new Date());
        redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody));
        return "success" ;
    }
}

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“SpringBoot2如何整合Redis哨兵集群 實(shí)現(xiàn)消息隊(duì)列場(chǎng)景”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI