rpush my-queue apple banana pear (intege..."/>
溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

一篇詳解Redis -- 延時隊列

發(fā)布時間:2020-07-31 16:07:25 來源:網(wǎng)絡 閱讀:868 作者:wx5d6cccb1cb158 欄目:編程語言

Redis的 list 數(shù)據(jù)結構常用來作為 異步消息隊列 使用,使用 rpush/lpush 操作 入隊 ,使用 lpop/rpop 來操作 出隊

一篇詳解Redis -- 延時隊列

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)


空隊列

  1. 如果隊列為空,客戶端會陷入 pop的死循環(huán) , 空輪詢 不僅拉高了 客戶端的CPU , Redis的QPS 也會被拉高
  2. 如果空輪詢的客戶端有幾十個, Redis的慢查詢 也會顯著增加,可以嘗試讓客戶端線程 sleep 1s
  3. 但睡眠會導致消息的 延遲增大 ,可以使用 blpop/brpop (blocking, 阻塞讀 )
  4. 阻塞讀在隊列沒有數(shù)據(jù)時,會立即進入 休眠 狀態(tài),一旦有數(shù)據(jù)到來,會立即被 喚醒 , 消息延遲幾乎為0


空閑連接

  1. 如果線程一直阻塞在那里,Redis的客戶端連接就成了 閑置連接
  2. 閑置過久, 服務器 一般會 主動斷開 連接, 減少閑置的資源占用 ,此時 blpop/brpop 會 拋出異常


鎖沖突處理

  1. 分布式鎖 加鎖失敗 的處理策略
  2. 直接拋出異常 ,通知用戶稍后重試
  3. sleep 后再重試
  4. 將請求轉移到 延時隊列 ,過一會重試
  5. 拋出異常
  6. 這種方式比較適合由 用戶直接發(fā)起 的請求
  7. sleep
  8. sleep會 阻塞 當前的消息處理線程,從而導致隊列的后續(xù)消息處理出現(xiàn) 延遲
  9. 如果 碰撞比較頻繁 ,sleep方案不合適
  10. 延時隊列
  11. 比較適合異步消息處理的場景,通過將當前沖突的請求轉移到另一個隊列 延后處理 來 避免沖突


延時隊列

  1. 可以通過Redis的 zset 來實現(xiàn)延時隊列
  2. 將消息序列化成一個字符串作為zet的 value ,將該消息的 到期處理時間 作為 score
  3. 然后 多線程輪詢 zset獲取 到期的任務 進行處理
  4. 多線程是為了保障 可用性 ,但同時要考慮 并發(fā)安全 ,確保 任務不能被多次執(zhí)行

public class RedisDelayingQueue<T> {
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 private static class TaskItem<T> {
 private String id;
 private T msg;
 }
 private Type taskType = new TypeReference<TaskItem<T>>() {
 }.getType();
 private Jedis jedis;
 private String queueKey;
 public RedisDelayingQueue(Jedis jedis, String queueKey) {
 this.jedis = jedis;
 this.queueKey = queueKey;
 }
 public void delay(T msg) {
 TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg);
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
 }
 public void loop() {
 // 可以進一步優(yōu)化,通過Lua腳本將zrangeByScore和zrem統(tǒng)一挪到Redis服務端進行原子化操作,減少搶奪失敗出現(xiàn)的資源浪費
 while (!Thread.interrupted()) {
 // 只取一條
 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
 if (values.isEmpty()) {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 break;
 }
 continue;
 }
 String s = values.iterator().next();
 if (jedis.zrem(queueKey, s) > 0) {
 // zrem是多線程多進程爭奪任務的關鍵
 TaskItem<T> task = JSON.parseObject(s, taskType);
 this.handleMsg(task.msg);
 }
 }
 }
 private void handleMsg(T msg) {
 try {
 System.out.println(msg);
 } catch (Throwable ignored) {
 // 一定要捕獲異常,避免因為個別任務處理問題導致循環(huán)異常退出
 }
 }
 public static void main(String[] args) {
 final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
 Thread producer = new Thread() {
 @Override
 public void run() {
 for (int i = 0; i < 10; i++) {
 queue.delay("zhongmingmao" + i);
 }
 }
 };
 Thread consumer = new Thread() {
 @Override
 public void run() {
 queue.loop();
 }
 };
 producer.start();
 consumer.start();
 try {
 producer.join();
 Thread.sleep(6000);
 consumer.interrupt();
 consumer.join();
 } catch (InterruptedException ignored) {
 }
 }
}
向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI