溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis和隊(duì)列的案例分析

發(fā)布時(shí)間:2020-08-27 09:57:53 來源:億速云 閱讀:167 作者:小新 欄目:關(guān)系型數(shù)據(jù)庫

這篇文章給大家分享的是有關(guān)Redis和隊(duì)列的案例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考。一起跟隨小編過來看看吧。

概要

Redis不僅可作為緩存服務(wù)器,還可用作消息隊(duì)列。它的列表類型天生支持用作消息隊(duì)列。如下圖所示:

Redis和隊(duì)列的案例分析

由于Redis的列表是使用雙向鏈表實(shí)現(xiàn)的,保存了頭尾節(jié)點(diǎn),所以在列表頭尾兩邊插取元素都是非??斓摹?/p>

普通隊(duì)列實(shí)現(xiàn)

所以可以直接使用Redis的List實(shí)現(xiàn)消息隊(duì)列,只需簡單的兩個(gè)指令lpush和rpop或者rpush和lpop。簡單示例如下:

存放消息端(消息生產(chǎn)者):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息生產(chǎn)者
 * @author yamikaze */public class Producer extends Thread { 
    public static final String MESSAGE_KEY = "message:queue";    private Jedis jedis;    private String producerName;    private volatile int count; 
    public Producer(String name) {        this.producerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 當(dāng)前未被處理消息條數(shù)為:" + size);
        count++;
    } 
    public int getCount() {        return count;
    }
 
    @Override    public void run() {        try {            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    } 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start(); 
        for(; ;) {
            System.out.println("main : 已存儲(chǔ)消息條數(shù):" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

消息處理端(消息消費(fèi)者):

package org.yamikaze.redis.messsage.queue; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息消費(fèi)者
 * @author yamikaze */public class Customer extends Thread{ 
    private String customerName;    private volatile int count;    private Jedis jedis; 
    public Customer(String name) {        this.customerName = name;
        init();
    } 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);        if(message != null) {
            count++;
            handle(message);
        }
    } 
    public void handle(String message) {
        System.out.println(customerName + " 正在處理消息,消息內(nèi)容是: " + message + " 這是第" + count + "條");
    }
 
    @Override    public void run() {        while (true) {
            processMessage();
        }
    } 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

貌似還不錯(cuò),但上述例子中消息消費(fèi)者有一個(gè)問題存在,即需要不停的調(diào)用rpop方法查看List中是否有待處理消息。每調(diào)用一次都會(huì)發(fā)起一次連接,這會(huì)造成不必要的浪費(fèi)。也許你會(huì)使用Thread.sleep()等方法讓消費(fèi)者線程隔一段時(shí)間再消費(fèi),但這樣做有兩個(gè)問題:

1)、如果生產(chǎn)者速度大于消費(fèi)者消費(fèi)速度,消息隊(duì)列長度會(huì)一直增大,時(shí)間久了會(huì)占用大量內(nèi)存空間。

2)、如果睡眠時(shí)間過長,這樣不能處理一些時(shí)效性的消息,睡眠時(shí)間過短,也會(huì)在連接上造成比較大的開銷。

所以可以使用brpop指令,這個(gè)指令只有在有元素時(shí)才返回,沒有則會(huì)阻塞直到超時(shí)返回null,于是消費(fèi)端可以將processMessage可以改為這樣:

public void processMessage() {    /**
     * brpop支持多個(gè)列表(隊(duì)列)
     * brpop指令是支持隊(duì)列優(yōu)先級(jí)的,比如這個(gè)例子中MESSAGE_KEY的優(yōu)先級(jí)大于testKey(順序決定)。
     * 如果兩個(gè)列表中都有元素,會(huì)優(yōu)先返回優(yōu)先級(jí)高的列表中的元素,所以這兒優(yōu)先返回MESSAGE_KEY
     * 0表示不限制等待,會(huì)一直阻塞在這兒     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");    if(messages.size() != 0) {        //由于該指令可以監(jiān)聽多個(gè)Key,所以返回的是一個(gè)列表        //列表由2項(xiàng)組成,1) 列表名,2)數(shù)據(jù)
        String keyName = messages.get(0);        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}

然后可以運(yùn)行Customer,清空控制臺(tái),可以看到程序沒有任何輸出,阻塞在了brpop這兒。然后在打開Redis的客戶端,輸入指令client list,可以查看當(dāng)前有兩個(gè)連接。

一次生產(chǎn)多次消費(fèi)的隊(duì)列

Redis除了對(duì)消息隊(duì)列提供支持外,還提供了一組命令用于支持發(fā)布/訂閱模式。利用Redis的pub/sub模式可以實(shí)現(xiàn)一次生產(chǎn)多次消費(fèi)的隊(duì)列。

1)發(fā)布
   PUBLISH指令可用于發(fā)布一條消息,格式 PUBLISH channel message

返回值表示訂閱了該消息的數(shù)量。
   2)訂閱
   SUBSCRIBE指令用于接收一條消息,格式 SUBSCRIBE channel

可以看到使用SUBSCRIBE指令后進(jìn)入了訂閱模式,但沒有接收到publish發(fā)送的消息,這是因?yàn)橹挥性谙l(fā)出去前訂閱才會(huì)接收到。在這個(gè)模式下其他指令,只能看到回復(fù)?;貜?fù)分為三種類型:
   1、如果為subscribe,第二個(gè)值表示訂閱的頻道,第三個(gè)值表示是第幾個(gè)訂閱的頻道?(理解成序號(hào)?)
   2、如果為message(消息),第二個(gè)值為產(chǎn)生該消息的頻道,第三個(gè)值為消息
   3、如果為unsubscribe,第二個(gè)值表示取消訂閱的頻道,第三個(gè)值表示當(dāng)前客戶端的訂閱數(shù)量。

可以使用指令UNSUBSCRIBE退訂,如果不加參數(shù),則會(huì)退訂所有由SUBSCRIBE指令訂閱的頻道。
 
   Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

再試試推送消息會(huì)得到以下結(jié)果:

可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因?yàn)镻SUBSCRIBE指令可以重復(fù)訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時(shí)PUNSUBSCRIBE指令通配符不會(huì)展開。
例如:PUNSUBSCRIBE * 不會(huì)匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

代碼示范如下:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis; 
/**
 * 消息發(fā)布方
 * @author yamikaze */public class Publisher { 
    public static final String CHANNEL_KEY = "channel:message";    private Jedis jedis; 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void publishMessage(String message) {        if(StringUtils.isBlank(message)) {            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    } 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}

簡單的發(fā)送一個(gè)消息。

消息訂閱方:

package org.yamikaze.redis.messsage.subscribe; 
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; 
import java.util.concurrent.TimeUnit; 
/**
 * 消息訂閱方客戶端
 * @author yamikaze */public class SubscribeClient { 
    private Jedis jedis;    private static final String EXIT_COMMAND = "exit"; 
    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    } 
    public void subscribe(String ...channel) {        if(channel == null || channel.length <= 0) {            return;
        }        //消息處理,接收到消息時(shí)如何處理
        JedisPubSub jps = new JedisPubSub() {            /**
             * JedisPubSub類是一個(gè)沒有抽象方法的抽象類,里面方法都是一些空實(shí)現(xiàn)
             * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage
             * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法
             * 當(dāng)然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數(shù)為byte[]             */
            @Override            public void onMessage(String channel, String message) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }
 
                }
            } 
            /**
             * 訂閱時(shí)             */
            @Override            public void onSubscribe(String channel, int subscribedChannels) {                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("訂閱了頻道:" + channel);
                }
            }
        };        //可以訂閱多個(gè)頻道 當(dāng)前線程會(huì)阻塞在這兒        jedis.subscribe(jps, channel);
    } 
    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);        //并沒有 unsubscribe方法        //相應(yīng)的也沒有punsubscribe方法    }
}

先運(yùn)行client,再運(yùn)行Publisher進(jìn)行消息發(fā)送,輸出結(jié)果:

Redis和隊(duì)列的案例分析

Redis的pub/sub也有其缺點(diǎn),那就是如果消費(fèi)者下線,生產(chǎn)者的消息會(huì)丟失。

延時(shí)隊(duì)列

背景

在業(yè)務(wù)發(fā)展過程中,會(huì)出現(xiàn)一些需要延時(shí)處理的場景,比如:

a.訂單下單之后超過30分鐘用戶未支付,需要取消訂單
b.訂單一些評(píng)論,如果48h用戶未對(duì)商家評(píng)論,系統(tǒng)會(huì)自動(dòng)產(chǎn)生一條默認(rèn)評(píng)論
c.點(diǎn)我達(dá)訂單下單后,超過一定時(shí)間訂單未派出,需要超時(shí)取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時(shí)任務(wù)輪訓(xùn)掃表。這種處理方式在數(shù)據(jù)量不大的場景下是完全沒問題,但是當(dāng)數(shù)據(jù)量大的時(shí)候高頻的輪訓(xùn)數(shù)據(jù)庫就會(huì)比較的耗資源,導(dǎo)致數(shù)據(jù)庫的慢查或者查詢超時(shí)。所以在處理這類需求時(shí)候,采用了延時(shí)隊(duì)列來完成。

幾種延時(shí)隊(duì)列

延時(shí)隊(duì)列就是一種帶有延遲功能的消息隊(duì)列。下面會(huì)介紹幾種目前已有的延時(shí)隊(duì)列:

1.Java中java.util.concurrent.DelayQueue

優(yōu)點(diǎn):JDK自身實(shí)現(xiàn),使用方便,量小適用
缺點(diǎn):隊(duì)列消息處于jvm內(nèi)存,不支持分布式運(yùn)行和消息持久化

2.Rocketmq延時(shí)隊(duì)列

優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):不支持任意時(shí)間精度,只支持特定level的延時(shí)消息

3.Rabbitmq延時(shí)隊(duì)列(TTL+DLX實(shí)現(xiàn))

優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):延時(shí)相同的消息必須扔在同一個(gè)隊(duì)列

Redis實(shí)現(xiàn)的延時(shí)消息隊(duì)列適合的項(xiàng)目特點(diǎn):

  • Spring框架管理對(duì)象
  • 有消息需求,但不想維護(hù)mq中間件
  • 有使用redis
  • 對(duì)消息持久化并沒有很苛刻的要求

Redis實(shí)現(xiàn)的延時(shí)消息隊(duì)列思路

Redis由于其自身的Zset數(shù)據(jù)結(jié)構(gòu),本質(zhì)就是Set結(jié)構(gòu)上加了個(gè)排序的功能,除了添加數(shù)據(jù)value之外,還提供另一屬性score,這一屬性在添加修改元素時(shí)候可以指定,每次指定后,Zset會(huì)自動(dòng)重新按新的值調(diào)整順序??梢岳斫鉃橛袃闪凶侄蔚臄?shù)據(jù)表,一列存value,一列存順序編號(hào)。操作中key理解為zset的名字,那么對(duì)延時(shí)隊(duì)列又有何用呢?

試想如果score代表的是想要執(zhí)行時(shí)間的時(shí)間戳,在某個(gè)時(shí)間將它插入Zset集合中,它變會(huì)按照時(shí)間戳大小進(jìn)行排序,也就是對(duì)執(zhí)行時(shí)間前后進(jìn)行排序,這樣的話,起一個(gè)死循環(huán)線程不斷地進(jìn)行取第一個(gè)key值,如果當(dāng)前時(shí)間戳大于等于該key值的socre就將它取出來進(jìn)行消費(fèi)刪除,就可以達(dá)到延時(shí)執(zhí)行的目的, 注意不需要遍歷整個(gè)Zset集合,以免造成性能浪費(fèi)。

Zset的排列效果如下圖:

Redis和隊(duì)列的案例分析

java代碼實(shí)現(xiàn)如下:

package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
 * @program: test
 * @description: redis實(shí)現(xiàn)延時(shí)隊(duì)列
 * @author: xingcheng
 * @create: 2018-08-19
 **/public class AppTest {    private static final String ADDR = "127.0.0.1";    private static final int PORT = 6379;    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);    private static CountDownLatch cdl = new CountDownLatch(10);    public static Jedis getJedis() {        return jedisPool.getResource();
    }    /**
     * 生產(chǎn)者,生成5個(gè)訂單     */
    public void productionDelayMessage() {        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();            // 3秒后執(zhí)行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生產(chǎn)訂單: " + StringUtils.join("000000000", i + 1) + " 當(dāng)前時(shí)間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后執(zhí)行");
        }
    }    //消費(fèi)者,取訂單
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);            if (order == null || order.isEmpty()) {
                System.out.println("當(dāng)前沒有等待的任務(wù)");                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();            long nowTime = instance.getTimeInMillis() / 1000;            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費(fèi)了一個(gè)任務(wù):消費(fèi)的訂單OrderId為" + element);
                }
            }
        }
    }    static class DelayMessage implements Runnable{
        @Override        public void run() {            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();        for (int i = 0; i < 10; i++) {            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

實(shí)現(xiàn)效果如下:

Redis和隊(duì)列的案例分析

感謝各位的閱讀!關(guān)于Redis和隊(duì)列的案例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI