您好,登錄后才能下訂單哦!
這篇文章主要介紹了SpringBoot2如何整合Redis哨兵集群 實(shí)現(xiàn)消息隊(duì)列場(chǎng)景,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Redis的分布式解決方案,在3.0版本后推出的方案,有效地解決了Redis分布式的需求,當(dāng)一個(gè)服務(wù)宕機(jī)可以快速的切換到另外一個(gè)服務(wù)。redis cluster主要是針對(duì)海量數(shù)據(jù)+高并發(fā)+高可用的場(chǎng)景。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${spring-boot.version}</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>${redis-client.version}</version> </dependency>
spring: # Redis 集群 redis: sentinel: # sentinel 配置 master: mymaster nodes: 192.168.0.127:26379 maxTotal: 60 minIdle: 10 maxWaitMillis: 10000 testWhileIdle: true testOnBorrow: true testOnReturn: false timeBetweenEvictionRunsMillis: 10000
@ConfigurationProperties(prefix = "spring.redis.sentinel") public class RedisParam { private String nodes ; private String master ; private Integer maxTotal ; private Integer minIdle ; private Integer maxWaitMillis ; private Integer timeBetweenEvictionRunsMillis ; private boolean testWhileIdle ; private boolean testOnBorrow ; private boolean testOnReturn ; // 省略GET和SET方法 }
@Configuration @EnableConfigurationProperties(RedisParam.class) public class RedisPool { @Resource private RedisParam redisParam ; @Bean("jedisSentinelPool") public JedisSentinelPool getRedisPool (){ Set<String> sentinels = new HashSet<>(); sentinels.addAll(Arrays.asList(redisParam.getNodes().split(","))); GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxTotal(redisParam.getMaxTotal()); poolConfig.setMinIdle(redisParam.getMinIdle()); poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis()); poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle()); poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow()); poolConfig.setTestOnReturn(redisParam.isTestOnReturn()); poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis()); JedisSentinelPool redisPool = new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig); return redisPool; } @Bean SpringUtil springUtil() { return new SpringUtil(); } @Bean RedisListener redisListener() { return new RedisListener(); } }
@Configuration public class RedisConfig { @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(); stringRedisTemplate.setConnectionFactory(factory); return stringRedisTemplate; } }
生產(chǎn)者消費(fèi)者模式:客戶端監(jiān)聽消息隊(duì)列,消息達(dá)到,消費(fèi)者馬上消費(fèi),如果消息隊(duì)列里面沒有消息,那么消費(fèi)者就繼續(xù)監(jiān)聽。基于Redis的LPUSH(BLPUSH)把消息入隊(duì),用 RPOP(BRPOP)獲取消息的模式。
@Component public class RedisLock { private static String keyPrefix = "RedisLock:"; @Resource private JedisSentinelPool jedisSentinelPool; public boolean addLock(String key, long expire) { Jedis jedis = null; try { jedis = jedisSentinelPool.getResource(); /* * nxxx的值只能取NX或者XX,如果取NX,則只有當(dāng)key不存在是才進(jìn)行set,如果取XX,則只有當(dāng)key已經(jīng)存在時(shí)才進(jìn)行set * expx的值只能取EX或者PX,代表數(shù)據(jù)過期時(shí)間的單位,EX代表秒,PX代表毫秒。 */ String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire); return value != null; } catch (Exception e){ e.printStackTrace(); }finally { if (jedis != null) jedis.close(); } return false; } public void removeLock(String key) { Jedis jedis = null; try { jedis = jedisSentinelPool.getResource(); jedis.del(keyPrefix + key); } finally { if (jedis != null) jedis.close(); } } }
1)封裝接口
public interface RedisHandler { /** * 隊(duì)列名稱 */ String queueName(); /** * 隊(duì)列消息內(nèi)容 */ String consume (String msgBody); }
2)接口實(shí)現(xiàn)
@Component public class LogAListen implements RedisHandler { private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class) ; @Resource private RedisLock redisLock; @Override public String queueName() { return "LogA-key"; } @Override public String consume(String msgBody) { // 加鎖,防止消息重復(fù)投遞 String lockKey = "lock-order-uuid-A"; boolean lock = false; try { lock = redisLock.addLock(lockKey, 60); if (!lock) { return "success"; } LOG.info("LogA-key == >>" + msgBody); } catch (Exception e){ e.printStackTrace(); } finally { if (lock) { redisLock.removeLock(lockKey); } } return "success"; } }
public class RedisListener implements InitializingBean { /** * Redis 集群 */ @Resource private JedisSentinelPool jedisSentinelPool; private List<RedisHandler> handlers = null; private ExecutorService product = null; private ExecutorService consumer = null; /** * 初始化配置 */ @Override public void afterPropertiesSet() { handlers = SpringUtil.getBeans(RedisHandler.class) ; product = new ThreadPoolExecutor(10,15,60 * 3, TimeUnit.SECONDS,new SynchronousQueue<>()); consumer = new ThreadPoolExecutor(10,15,60 * 3, TimeUnit.SECONDS,new SynchronousQueue<>()); for (RedisHandler redisHandler : handlers){ product.execute(() -> { redisTask(redisHandler); }); } } /** * 隊(duì)列監(jiān)聽 */ public void redisTask (RedisHandler redisHandler){ Jedis jedis = null ; while (true){ try { jedis = jedisSentinelPool.getResource() ; List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName()); if (msgBodyList != null && msgBodyList.size()>0){ consumer.execute(() -> { redisHandler.consume(msgBodyList.get(1)) ; }); } } catch (Exception e){ e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } } } }
@Service public class RedisServiceImpl implements RedisService { @Resource private JedisSentinelPool jedisSentinelPool; @Override public void saveQueue(String queueKey, String msgBody) { Jedis jedis = null; try { jedis = jedisSentinelPool.getResource(); jedis.lpush(queueKey,msgBody) ; } catch (Exception e){ e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } } }
@RestController public class RedisController { @Resource private RedisService redisService ; /** * 隊(duì)列推消息 */ @RequestMapping("/saveQueue") public String saveQueue (){ MsgBody msgBody = new MsgBody() ; msgBody.setName("LogAModel"); msgBody.setDesc("描述"); msgBody.setCreateTime(new Date()); redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody)); return "success" ; } }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“SpringBoot2如何整合Redis哨兵集群 實(shí)現(xiàn)消息隊(duì)列場(chǎng)景”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。