rpush my-queue apple banana pear (intege..."/>
您好,登錄后才能下訂單哦!
Redis的 list 數(shù)據(jù)結構常用來作為 異步消息隊列 使用,使用 rpush/lpush 操作 入隊 ,使用 lpop/rpop 來操作 出隊
> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)
空隊列
空閑連接
鎖沖突處理
延時隊列
public class RedisDelayingQueue<T> {
@Data
@AllArgsConstructor
@NoArgsConstructor
private static class TaskItem<T> {
private String id;
private T msg;
}
private Type taskType = new TypeReference<TaskItem<T>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg);
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
}
public void loop() {
// 可以進一步優(yōu)化,通過Lua腳本將zrangeByScore和zrem統(tǒng)一挪到Redis服務端進行原子化操作,減少搶奪失敗出現(xiàn)的資源浪費
while (!Thread.interrupted()) {
// 只取一條
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) {
// zrem是多線程多進程爭奪任務的關鍵
TaskItem<T> task = JSON.parseObject(s, taskType);
this.handleMsg(task.msg);
}
}
}
private void handleMsg(T msg) {
try {
System.out.println(msg);
} catch (Throwable ignored) {
// 一定要捕獲異常,避免因為個別任務處理問題導致循環(huán)異常退出
}
}
public static void main(String[] args) {
final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
Thread producer = new Thread() {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("zhongmingmao" + i);
}
}
};
Thread consumer = new Thread() {
@Override
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException ignored) {
}
}
}
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。