您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關(guān)使用Redis怎么實(shí)現(xiàn)延遲隊(duì)列,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
采用通過定時任務(wù)采用數(shù)據(jù)庫/非關(guān)系型數(shù)據(jù)庫輪詢方案。
優(yōu)點(diǎn):
1. 實(shí)現(xiàn)簡單,對于項(xiàng)目前期這樣是最容易的解決方案。
缺點(diǎn):
1. DB 有效使用率低,需要將一部分的數(shù)據(jù)庫的QPS分配給 JOB 的無效輪詢。
2. 服務(wù)資源浪費(fèi),因?yàn)檩喸冃枰獙λ械臄?shù)據(jù)做一次 SCAN 掃描 JOB 服務(wù)的資源開銷很大。
采用延遲隊(duì)列:
優(yōu)點(diǎn):
1. 服務(wù)的資源使用率較高,能夠精確的實(shí)現(xiàn)超時任務(wù)的執(zhí)行。
2. 減少 DB 的查詢次數(shù),能夠降低數(shù)據(jù)庫的壓力
缺點(diǎn):
1. 對于延遲隊(duì)列來說本身設(shè)計(jì)比較復(fù)雜,目前沒有通用的比較好過的方案。
基于以上的分析,我決定通過 Redis 來實(shí)現(xiàn)分布式隊(duì)列。
設(shè)計(jì)思路:
1. 第一步將需要發(fā)放的消息發(fā)送到延遲隊(duì)列中。
2. 延遲隊(duì)列將數(shù)據(jù)存入 Redis 的 ZSet 有序集合中score 為當(dāng)前時間戳,member 存入需要發(fā)送的數(shù)據(jù)。
3. 添加一個 schedule 來進(jìn)行對 Redis 有序隊(duì)列的輪詢。
4. 如果到達(dá)達(dá)到消息的執(zhí)行時間,那么就進(jìn)行業(yè)務(wù)的執(zhí)行。
5. 如果沒有達(dá)到消息的執(zhí)行是將,那么消息等待下輪執(zhí)行。
由于本處篇幅有限,所以只列舉部分代碼,完整的代碼可以在本文最后訪問 GitHub 獲取。由于本人閱歷/水平有限,如有建議/或更正歡迎留言或提問。先在此謝謝大家駐足閱讀 ? ? ?。
需要注意的問題:
單個 Redis 命令的執(zhí)行是原子性的,但 Redis 沒有在事務(wù)上增加任何維持原子性的機(jī)制,所以 Redis 事務(wù)的執(zhí)行并不是原子性的。
事務(wù)可以理解為一個打包的批量執(zhí)行腳本,但批量指令并非原子化的操作,中間某條指令的失敗不會導(dǎo)致前面已做指令的回滾,也不會造成后續(xù)的指令不做。
我們可以通過 Redis 的 eval 命令來執(zhí)行 lua 腳本來保證原子性實(shí)現(xiàn)Redis的事務(wù)。
實(shí)現(xiàn)步驟如下:
1. 延遲隊(duì)列接口
/** * 延遲隊(duì)列 * * @author zhengsh * @date 2020-03-27 */ public interface RedisDelayQueue<E extends DelayMessage> { String META_TOPIC_WAIT = "delay:meta:topic:wait"; String META_TOPIC_ACTIVE = "delay:meta:topic:active"; String TOPIC_ACTIVE = "delay:active:9999"; /** * 拉取消息 */ void poll(); /** * 推送延遲消息 * * @param e */ void push(E e); }
2. 延遲隊(duì)列消息
/** * 消息體 * * @author zhengsh * @date 2020-03-27 */ @Setter @Getter public class DelayMessage { /** * 消息唯一標(biāo)識 */ private String id; /** * 消息主題 */ private String topic = "default"; /** * 具體消息 json */ private String body; /** * 延時時間, 格式為時間戳: 當(dāng)前時間戳 + 實(shí)際延遲毫秒數(shù) */ private Long delayTime = System.currentTimeMillis() + 30000L; /** * 消息發(fā)送時間 */ private LocalDateTime createTime; }
3. 延遲隊(duì)列實(shí)現(xiàn)
/** * 延遲隊(duì)列實(shí)現(xiàn) * * @author zhengsh * @date 2020-03-27 */ @Component public class RedisDelayQueueImpl<E extends DelayMessage> implements RedisDelayQueue<E> { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private StringRedisTemplate redisTemplate; @Override public void poll() { // todo } /** * 發(fā)送消息 * * @param e */ @SneakyThrows @Override public void push(E e) { try { String jsonStr = JSON.toJSONString(e); String topic = e.getTopic(); String zkey = String.format("delay:wait:%s", topic); String u = "redis.call('sadd', KEYS[1], ARGV[1])\n" + "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" + "return 1"; Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)}; Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)}; Long result = redisTemplate.execute((RedisCallback<Long>) connection -> { Object nativeConnection = connection.getNativeConnection(); if (nativeConnection instanceof RedisAsyncCommands) { RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values); } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) { RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values); } return 0L; }); logger.info("延遲隊(duì)列[1],消息推送成功進(jìn)入等待隊(duì)列({}), topic: {}", result != null && result > 0, e.getTopic()); } catch (Throwable t) { t.printStackTrace(); } } private byte[] serialize(String key) { RedisSerializer<String> stringRedisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer(); //lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析 return stringRedisSerializer.serialize(key); } }
4. 定時任務(wù)
/** * 分發(fā)任務(wù) */ @Component public class DistributeTask { private static final String LUA_SCRIPT; private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private StringRedisTemplate redisTemplate; static { StringBuilder sb = new StringBuilder(128); sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n"); sb.append("if(next(val) ~= nil) then\n"); sb.append(" redis.call('sadd', KEYS[2], ARGV[2])\n"); sb.append(" redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n"); sb.append(" for i = 1, #val, 100 do\n"); sb.append(" redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n"); sb.append(" end\n"); sb.append(" return 1\n"); sb.append("end\n"); sb.append("return 0"); LUA_SCRIPT = sb.toString(); } /** * 2秒鐘掃描一次執(zhí)行隊(duì)列 */ @Scheduled(cron = "0/5 * * * * ?") public void scheduledTaskByCorn() { try { Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT); assert members != null; for (String k : members) { if (!redisTemplate.hasKey(k)) { // 如果 KEY 不存在元數(shù)據(jù)中刪除 redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k); continue; } String lk = k.replace("delay:wait", "delay:active"); Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)}; Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)}; Long result = redisTemplate.execute((RedisCallback<Long>) connection -> { Object nativeConnection = connection.getNativeConnection(); if (nativeConnection instanceof RedisAsyncCommands) { RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values); } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) { RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection; return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values); } return 0L; }); logger.info("延遲隊(duì)列[2],消息到期進(jìn)入執(zhí)行隊(duì)列({}): {}", result != null && result > 0, TOPIC_ACTIVE); } } catch (Throwable t) { t.printStackTrace(); } } private byte[] serialize(String key) { RedisSerializer<String> stringRedisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer(); //lettuce連接包下序列化鍵值,否則無法用默認(rèn)的ByteArrayCodec解析 return stringRedisSerializer.serialize(key); } }
以上就是使用Redis怎么實(shí)現(xiàn)延遲隊(duì)列,小編相信有部分知識點(diǎn)可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。