溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

redis?stream怎么實(shí)現(xiàn)消息隊(duì)列

發(fā)布時(shí)間:2022-08-10 14:19:41 來源:億速云 閱讀:149 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容主要講解“redis stream怎么實(shí)現(xiàn)消息隊(duì)列”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“redis stream怎么實(shí)現(xiàn)消息隊(duì)列”吧!

    Redis5.0帶來了Stream類型。從字面上看是流類型,但其實(shí)從功能上看,應(yīng)該是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。

    基于redis實(shí)現(xiàn)消息隊(duì)列的方式有很多:

    • PUB/SUB,訂閱/發(fā)布模式

    • 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)

    redis 實(shí)現(xiàn)消息對(duì)列4中方法

    發(fā)布訂閱

    發(fā)布訂閱優(yōu)點(diǎn): 典型的一對(duì)的,所有消費(fèi)者都能同時(shí)消費(fèi)到消息。主動(dòng)通知訂閱者而不是訂閱者輪詢?nèi)プx。

    發(fā)布訂閱缺點(diǎn): 不支持多個(gè)消費(fèi)者公平消費(fèi)消息,消息沒有持久化,不管訂閱者是否收到消息,消息都會(huì)丟失。

    使用場景:微服務(wù)間的消息同步,如 分布式webSocker,數(shù)據(jù)同步等。

    list 隊(duì)列

    生產(chǎn)者通過lpush生成消息,消費(fèi)者通過blpop阻塞讀取消息。

    **list隊(duì)列優(yōu)點(diǎn):**支持多個(gè)消費(fèi)者公平消費(fèi)消息,對(duì)消息進(jìn)行存儲(chǔ),可以通過lrange查詢隊(duì)列內(nèi)的消息。

    **list隊(duì)列缺點(diǎn):**blpop仍然會(huì)阻塞當(dāng)前連接,導(dǎo)致連接不可用。一旦blpop成功消息就丟棄了,期間如果服務(wù)器宕機(jī)消息會(huì)丟失,不支持一對(duì)多消費(fèi)者。

    zset 隊(duì)列

    生產(chǎn)者通過zadd 創(chuàng)建消息時(shí)指定分?jǐn)?shù),可以確定消息的順序,消費(fèi)者通過zrange獲取消息后進(jìn)行消費(fèi),消費(fèi)完后通zrem刪除消息。

    zset優(yōu)點(diǎn): 保證了消息的順序,消費(fèi)者消費(fèi)失敗后重新入隊(duì)不會(huì)打亂消費(fèi)順序。

    zset缺點(diǎn): 不支持一對(duì)多消費(fèi),多個(gè)消費(fèi)者消費(fèi)時(shí)可能出現(xiàn)讀取同一條消息的情況,得通過加鎖或其他方式解決消費(fèi)的冪等性。

    zset使用場景:由于數(shù)據(jù)是有序的,常常被用于延遲隊(duì)列,如 redisson的DelayQueue

    Stream 隊(duì)列

    Redis5.0帶來了Stream類型。從字面上看是流類型,但其實(shí)從功能上看,應(yīng)該是Redis對(duì)消息隊(duì)列(MQ,Message Queue)的完善實(shí)現(xiàn)。

    參考kafka的思想,通過多個(gè)消費(fèi)者組和消費(fèi)者支持一對(duì)多消費(fèi),公平消費(fèi),消費(fèi)者內(nèi)維護(hù)了pending列表防止消息丟失。

    提供消息ack機(jī)制。

    redis?stream怎么實(shí)現(xiàn)消息隊(duì)列

    基本命令

    xadd 生產(chǎn)消息

    往 stream 內(nèi)創(chuàng)建消息 語法為:

    XADD key ID field string [field string …]

    # * 表示自動(dòng)生成id redis會(huì)根據(jù)時(shí)間戳+序列號(hào)自動(dòng)生成id,不建議我們自己指定id
    xadd stream1 * name zs age 23

    讀取消息

    讀取stream內(nèi)的消息,這個(gè)并不是消費(fèi),只是提供了查看數(shù)據(jù)的功能,語法為:

    XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #表示從 stream1 內(nèi)取出一條消息,從第0條消息讀取(0表示最小的id)
    xread count 1 streams stream1 0
    #表示從 stream1 內(nèi) id=1649143363972-0 開始讀取一條消息,讀取的是指定id的下一條消息
    xread count 1 streams msg 1649143363972-0
    
    #表示一直阻塞讀取最新的消息($表示獲取下一個(gè)生成的消息)
    xread count 1 block 0 streams stream1 $ 
    
    xrange stream - + 10

    XRANGE key startID endID count

    #表示從stream1內(nèi)取10條消息 起始位置為 -(最小ID) 結(jié)束位置為+(最大ID)
    xrange stream1 - + 10

    xgroup 消費(fèi)者組

    redis stream 借鑒了kafka的設(shè)計(jì),采用了消費(fèi)者和消費(fèi)者組的概念。允許多個(gè)消費(fèi)者組消費(fèi)stream的消息,每個(gè)消費(fèi)者組都能收到完整的消息,例如:stream內(nèi)有10條消息,消費(fèi)者組A和消費(fèi)者組B同時(shí)消費(fèi)時(shí),都能獲取到這10條消息。

    每個(gè)消費(fèi)者組內(nèi)可以有多個(gè)消費(fèi)者消費(fèi),消息會(huì)平均分?jǐn)偨o各個(gè)消費(fèi)者,例如:stream有10條消息,消費(fèi)者A,B,C同時(shí)在同一個(gè)組內(nèi)消費(fèi),A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

    創(chuàng)建消費(fèi)者組:

    #消費(fèi)消息首先得創(chuàng)建消費(fèi)者組
    # 表示為隊(duì)列 stream1 創(chuàng)建一個(gè)消費(fèi)者組 group1 從消息id=0(第一條消息)開始讀取消息
    xgroup create stream1 group1 0
    
    #查詢stream1內(nèi)的所有消費(fèi)者組信息
    xinfo groups stream1

    xreadgroup 消費(fèi)消息

    通過xreadgroup可以在消費(fèi)者組內(nèi)創(chuàng)建消費(fèi)者消費(fèi)消息

    XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

    #創(chuàng)建消費(fèi)者讀取消息
    #在group1消費(fèi)者組內(nèi)通過consumer1消費(fèi)stream1內(nèi)的消息,消費(fèi)1條未分配的消息 (> 表示未分配過消費(fèi)者的消息)
    xreadgrup group group1 consumer1 count 1 streams stream1 >

    Pending 等待列表

    通過 xreadgroup 讀取消息時(shí)消息會(huì)分配給對(duì)應(yīng)的消費(fèi)者,每個(gè)消費(fèi)者內(nèi)都維護(hù)了一個(gè)Pending列表用于保存接收到的消息,當(dāng)消息ack后會(huì)從pending列表內(nèi)移除,也就是說pending列表內(nèi)維護(hù)的是所有未ack的消息id

    每個(gè)Pending的消息有4個(gè)屬性:

    • 消息ID

    • 所屬消費(fèi)者

    • IDLE,已讀取時(shí)長

    • delivery counter,消息被讀取次數(shù)

    XPENDING key group [start end count] [consumer]

    #查看pending列表
    # 查看group1組內(nèi)的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
    xpending stream1 group1 - + 10 consumer1
    # 查看group1組內(nèi)的所有消費(fèi)者pending類表
    xpending stream1 group1 - + 10

    消息確認(rèn)

    當(dāng)消費(fèi)者消費(fèi)了消息,需要通過 xack 命令確認(rèn)消息,xack后的消息會(huì)從pending列表移除

    XACK key gruopName ID

    xack stream1 group1 xxx

    消息轉(zhuǎn)移

    當(dāng)消費(fèi)者接收到消息卻不能正確消費(fèi)時(shí)(報(bào)錯(cuò)或其他原因),可以使用 XCLAIM 將消息轉(zhuǎn)移給其他消費(fèi)者消費(fèi),需要設(shè)置組、轉(zhuǎn)移的目標(biāo)消費(fèi)者和消息ID,同時(shí)需要提供IDLE(已被讀取時(shí)長),只有超過這個(gè)時(shí)長,才能被轉(zhuǎn)移。

    通過xclaim轉(zhuǎn)移的消息只是將消息移入另一個(gè)消費(fèi)者的pending列表,消費(fèi)者并不能通過xreadgroup讀取到消息,只能通過xpending讀取到。

    # 表示將ID為 1553585533795-1 的消息轉(zhuǎn)移到消費(fèi)者B消費(fèi),前提是消費(fèi)
    XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

    信息監(jiān)控

    redis提供了xinfo來查看stream的信息

    #查看sream信息
    xinfo stream steam1
    #查詢消費(fèi)者組信息
    xinfo groups group1 
    
    #查詢消費(fèi)者信息
    xinfo consumers consumer1

    SpringBoot 整合

    1 引入依賴

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    2 編寫消費(fèi)者

    @Slf4j
    @Component
    public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {
    
        public final String streamName      = "emailStream";
        public final String groupName       = "emailGroup";
        public final String consumerName    = "emailConsumer";
    
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        @Override
        public void onMessage(MapRecord<String, String, String> message) {
    
            //log.info("stream名稱-->{}",message.getStream());
            //log.info("消息ID-->{}",message.getId());
            log.info("消息內(nèi)容-->{}",message.getValue());
    
            Map<String, String> msgMap = message.getValue();
    
            if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
                //消費(fèi)異常導(dǎo)致未能ack時(shí),消息會(huì)進(jìn)入pending列表,我們可以啟動(dòng)定時(shí)任務(wù)來讀取pending列表處理失敗的任務(wù)
                log.info("消費(fèi)異常-->"+message);
               return;
            }
    
            StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
            //消息應(yīng)答
            streamOperations.acknowledge( streamName,groupName,message.getId() );
    
        }
    	//我們可以啟動(dòng)定時(shí)任務(wù)不斷監(jiān)聽pending列表,處理死信消息
    }

    3 配置redis

    序列化配置

    @EnableCaching
    @Configuration
    public class RedisConfig {
    
    
        /**
         * 設(shè)置redis序列化規(guī)則
         */
        @Bean
        public Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer(){
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            jackson2JsonRedisSerializer.setObjectMapper(om);
    
            return jackson2JsonRedisSerializer;
        }
    
        /**
         * RedisTemplate配置
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                           Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {
    
            // 配置redisTemplate
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            RedisSerializer<?> stringSerializer = new StringRedisSerializer();
    
            // key序列化
            redisTemplate.setKeySerializer(stringSerializer);
            // value序列化
            redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
    
            // Hash key序列化
            redisTemplate.setHashKeySerializer(stringSerializer);
            // Hash value序列化
            redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
    
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
    
    }

    消費(fèi)者組和消費(fèi)者配置

    @Slf4j
    @Configuration
    public class RedisStreamConfig {
    
        @Autowired
        private EmailConsumer emailConsumer;
    
        @Autowired
        private RedisTemplate<String,Object> redisTemplate;
    
        @Bean
        public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){
    
            StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    
            return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    //block讀取超時(shí)時(shí)間
                    .pollTimeout(Duration.ofSeconds(3))
                    //count 數(shù)量(一次只獲取一條消息)
                    .batchSize(1)
                    //序列化規(guī)則
                    .serializer( stringRedisSerializer )
                    .build();
        }
    
        /**
         * 開啟監(jiān)聽器接收消息
         */
        @Bean
        public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                     StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){
    
            StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                    streamMessageListenerContainerOptions);
    
            //如果 流不存在 創(chuàng)建 stream 流
            if( !redisTemplate.hasKey(emailConsumer.streamName)){
                redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
                log.info("初始化stream {} success",emailConsumer.streamName);
            }
    
            //創(chuàng)建消費(fèi)者組
            try {
                redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
            } catch (Exception e) {
                log.info("消費(fèi)者組 {} 已存在",emailConsumer.groupName);
            }
    
            //注冊(cè)消費(fèi)者 消費(fèi)者名稱,從哪條消息開始消費(fèi),消費(fèi)者類
            // > 表示沒消費(fèi)過的消息
            // $ 表示最新的消息
            listenerContainer.receive(
                Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
                StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
                emailConsumer
            );
    
    
            listenerContainer.start();
            return listenerContainer;
        }
    
    }

    4.生產(chǎn)者生產(chǎn)消息

    @GetMapping("/redis/ps")
    public String redisPublish(String content,Integer count){
    
        StreamOperations streamOperations = redisTemplate.opsForStream();
    
        for (int i = 0; i < count; i++) {
            AtomicInteger num = new AtomicInteger(i);
    
            Map msgMap = new HashMap();
            msgMap.put("count", i);
            msgMap.put("sID", num);
            //新增消息
            streamOperations.add("emailStream",msgMap);
        }
        return "success";
    }

    到此,相信大家對(duì)“redis stream怎么實(shí)現(xiàn)消息隊(duì)列”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

    向AI問一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI