溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Redis監(jiān)聽過期的key實現(xiàn)流程是什么

發(fā)布時間:2023-02-28 10:38:39 來源:億速云 閱讀:95 作者:iii 欄目:開發(fā)技術(shù)

本篇內(nèi)容介紹了“Redis監(jiān)聽過期的key實現(xiàn)流程是什么”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!

    一、簡介

    我們來個最簡單的集群架構(gòu),如下圖:

    Redis監(jiān)聽過期的key實現(xiàn)流程是什么

    我們上面圖中看到是服務(wù)A和服務(wù)B就是同一個服務(wù)的不同實例。

    二、maven依賴

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.6.0</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.alian</groupId>
        <artifactId>expiration</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>expiration</name>
        <description>redis-key-expiration-listener</description>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <project.package.directory>target</project.package.directory>
            <java.version>1.8</java.version>
            <!--com.fasterxml.jackson 版本-->
            <jackson.version>2.9.10</jackson.version>
            <!--阿里巴巴fastjson 版本-->
            <fastjson.version>1.2.68</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
            <!--redis依賴-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <version>${parent.version}</version>
            </dependency>
            <!--用于序列化-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>${jackson.version}</version>
            </dependency>
            <!--java 8時間序列化-->
            <dependency>
                <groupId>com.fasterxml.jackson.datatype</groupId>
                <artifactId>jackson-datatype-jsr310</artifactId>
                <version>${jackson.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.68</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.14</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.13.2</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    三、編碼實現(xiàn)

    3.1、application.properties

    # 端口
    server.port=8090
    # 上下文路徑
    server.servlet.context-path=/expiration

    # Redis數(shù)據(jù)庫索引(默認為0)
    spring.redis.database=0
    # Redis服務(wù)器地址
    spring.redis.host=192.168.0.193
    #spring.redis.host=127.0.0.1
    # Redis服務(wù)器連接端口
    spring.redis.port=6379
    # Redis服務(wù)器連接密碼(默認為空)
    spring.redis.password=
    # 連接池最大連接數(shù)(使用負值表示沒有限制)
    spring.redis.jedis.pool.max-active=20
    # 連接池中的最小空閑連接
    spring.redis.jedis.pool.min-idle=10
    # 連接池中的最大空閑連接
    spring.redis.jedis.pool.max-idle=10
    # 連接池最大阻塞等待時間(使用負值表示沒有限制)
    spring.redis.jedis.pool.max-wait=20000
    # 讀時間(毫秒)
    spring.redis.timeout=10000
    # 連接超時時間(毫秒)
    spring.redis.connect-timeout=10000

    3.2、Redis配置類

    RedisConfig

    package com.alian.expiration.config;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.time.format.DateTimeFormatter;
    @Configuration
    public class RedisConfig {
        /**
         * redis配置
         *
         * @param redisConnectionFactory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            // 實例化redisTemplate
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            //設(shè)置連接工廠
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // key采用String的序列化
            redisTemplate.setKeySerializer(keySerializer());
            // value采用jackson序列化
            redisTemplate.setValueSerializer(valueSerializer());
            // Hash key采用String的序列化
            redisTemplate.setHashKeySerializer(keySerializer());
            // Hash value采用jackson序列化
            redisTemplate.setHashValueSerializer(valueSerializer());
            // 支持事務(wù)
            // redisTemplate.setEnableTransactionSupport(true);
            //執(zhí)行函數(shù),初始化RedisTemplate
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
        /**
         * key類型采用String序列化
         *
         * @return
         */
        private RedisSerializer<String> keySerializer() {
            return new StringRedisSerializer();
        }
        /**
         * value采用JSON序列化
         *
         * @return
         */
        private RedisSerializer<Object> valueSerializer() {
            //設(shè)置jackson序列化
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            //設(shè)置序列化對象
            jackson2JsonRedisSerializer.setObjectMapper(getMapper());
            return jackson2JsonRedisSerializer;
        }
        /**
         * 使用com.fasterxml.jackson.databind.ObjectMapper
         * 對數(shù)據(jù)進行處理包括java8里的時間
         *
         * @return
         */
        private ObjectMapper getMapper() {
            ObjectMapper mapper = new ObjectMapper();
            //設(shè)置可見性
            mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            //默認鍵入對象
            mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            //設(shè)置Java 8 時間序列化
            JavaTimeModule timeModule = new JavaTimeModule();
            timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
            timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
            timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
            timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
            //禁用把時間轉(zhuǎn)為時間戳
            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
            mapper.registerModule(timeModule);
            return mapper;
        }
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }
    }

    和我們之前整合redis差不多,只不過在最后增加了一個redis消息監(jiān)聽監(jiān)聽容器RedisMessageListenerContainer

    3.3、監(jiān)聽器

    RedisKeyExpirationListener

    package com.alian.expiration.listener;
    import com.alian.expiration.service.RedisExpirationService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;
    @Slf4j
    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
        @Autowired
        private RedisExpirationService redisExpirationService;
    	// 把我們上面一步配置的bean注入進去
        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }
        /**
         * 針對redis數(shù)據(jù)失效事件,進行數(shù)據(jù)處理
         *
         * @param message
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // 用戶做自己的業(yè)務(wù)處理即可,注意message.toString()可以獲取失效的key
            String expiredKey = message.toString();
            log.info("onMessage --> redis 過期的key是:{}", expiredKey);
            try {
                // 對過期key進行處理
                redisExpirationService.processingExpiredKey(expiredKey);
                log.info("過期key處理完成:{}", expiredKey);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("處理redis 過期的key異常:{}", expiredKey, e);
            }
        }
    }

    實現(xiàn)的步驟如下:

    • 繼承KeyExpirationEventMessageListener

    • 把redis消息監(jiān)聽監(jiān)聽容器RedisMessageListenerContainer 注入到密鑰空間事件消息偵 聽器中

    • 重寫onMessage方法

    • 通過Message 的 toString() 方法就可以獲取到過期的key

    • 對key中關(guān)鍵信息進行業(yè)務(wù)處理,比如 id

    3.4、服務(wù)類

    RedisExpirationService

    package com.alian.expiration.service;
    import com.alian.expiration.util.SignUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    import java.util.concurrent.TimeUnit;
    @Slf4j
    @Service
    public class RedisExpirationService {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        public void processingExpiredKey(String expiredKey) {
            // 如果是優(yōu)惠券的key(一定要規(guī)范命名)
            if (expiredKey.startsWith("com.mall.coupon.id")) {
                // 臨時key,此key可以在業(yè)務(wù)處理完,然后延遲一定時間刪除,或者不處理
                String tempKey = SignUtils.md5(expiredKey, "UTF-8");
                // 臨時key不存在才設(shè)置值,key超時時間為10秒(此處相當于分布式鎖的應(yīng)用)
                Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS);
                if (Boolean.TRUE.equals(exist)) {
                    log.info("Business Handing...");
                    // 比如截取里面的id,然后關(guān)聯(lián)數(shù)據(jù)庫進行處理
                } else {
                    log.info("Other service is handing...");
                }
            } else {
                log.info("Expired keys without processing");
            }
        }
    }

    基本流程如下:

    • 判斷是否是需要處理的key,一般這種key通過命名規(guī)范加以處理

    • 以當前key生成一個新的key作為分布式key

    • 如果redis中不存在這個新的key,則為新的key設(shè)置一個值,達到分布式服務(wù)處理(核心)

    • 設(shè)置成功的,進行業(yè)務(wù)處理;設(shè)置失敗了,說明其他服務(wù)正在處理這個key

    • 根據(jù) key 的關(guān)鍵信息(比如截取id),進行業(yè)務(wù)處理

    3.5、工具類

    SignUtils

    package com.alian.expiration.util;
    import java.security.MessageDigest;
    public class SignUtils {
        public static final String md5(String s, String charset) {
            char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
            try {
                byte[] btInput = s.getBytes(charset);
                MessageDigest mdInst = MessageDigest.getInstance("MD5");
                mdInst.update(btInput);
                byte[] md = mdInst.digest();
                int j = md.length;
                char[] str = new char[j * 2];
                int k = 0;
                for (byte byte0 : md) {
                    str[k++] = hexDigits[byte0 >>> 4 & 15];
                    str[k++] = hexDigits[byte0 & 15];
                }
                return new String(str);
            } catch (Exception var11) {
                return "";
            }
        }
    }

    四、測試

    4.1、測試類

    簡單模擬下發(fā)送一個優(yōu)惠券數(shù)據(jù)到redis,然后設(shè)置超時時間

    package com.alian.expiration;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    @Slf4j
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class RedisKeyExpirationTest {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        @Test
        public void keyExpiration() {
            // 優(yōu)惠券信息
            String id = "2023021685264735";
            Map<String, String> map = new HashMap<>();
            map.put("id", id);
            map.put("amount", "1000");
            map.put("type", "1001");
            map.put("describe", "滿減紅包");
            // 緩存到redis
            redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map);
            // 設(shè)置過期時間
            redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS);
        }
    }

    4.2、單實例

    單實例就是服務(wù)只部署了一份,我們啟動一份,端口是8090,然后通過上面的測試類,發(fā)送一個消息,結(jié)果如下:

    10:23:39 701 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
    10:23:39 988 INFO [container-2]:Business Handing...
    10:23:39 989 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
    10:23:50 005 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
    10:23:50 005 INFO [container-3]:Expired keys without processing
    10:23:50 005 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12

    4.3、多實例

    多實例就是服務(wù)部署了多份,比如我們啟動兩份,端口分別為8090和8091,然后通過上面的測試類,發(fā)送一個消息,8090端口的服務(wù)結(jié)果如下(Business Handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Business Handing...
    11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12

    8091端口的服務(wù)結(jié)果如下(Other service is handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 過期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Other service is handing...
    11:39:06 707 INFO [container-2]:過期key處理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 過期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:過期key處理完成:450FCC35415BADC16805962CA5BC7E12

    結(jié)果分析:

    • 多實例的情況下,每個實例都會收到過期key通知

    • 通過redis分布式鎖,實現(xiàn)只有一個實例會進行業(yè)務(wù)處理,防止重復(fù)

    • 使用分布式鎖會有一個新的key過期,并且收到該key的通知,你可以業(yè)務(wù)執(zhí)行完延遲一定時間(避免重復(fù)執(zhí)行),再刪除,也可以不處理(因為本就不是要處理業(yè)務(wù)的key)

    “Redis監(jiān)聽過期的key實現(xiàn)流程是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

    向AI問一下細節(jié)

    免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI