溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

SpringBoot?Redis發(fā)布訂閱模式的方法是什么

發(fā)布時(shí)間:2021-12-20 16:30:47 來(lái)源:億速云 閱讀:182 作者:iii 欄目:開(kāi)發(fā)技術(shù)

本篇內(nèi)容主要講解“SpringBoot Redis發(fā)布訂閱模式的方法是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“SpringBoot Redis發(fā)布訂閱模式的方法是什么”吧!

注意:redis的發(fā)布訂閱模式不可以將消息進(jìn)行持久化,訂閱者發(fā)生網(wǎng)絡(luò)斷開(kāi)、宕機(jī)等可能導(dǎo)致錯(cuò)過(guò)消息。

Redis命令行下使用發(fā)布訂閱

publish 發(fā)布

發(fā)布者通過(guò)以下命令可以往指定channel發(fā)布message

redis> publish channel message

subscribe 訂閱

訂閱者通過(guò)以下命令可以訂閱一個(gè)或多個(gè)頻道,如果頻道不存在則會(huì)創(chuàng)建

redis> subscribe channel [channel ...]

對(duì)于redis的發(fā)布訂閱的命令就這么簡(jiǎn)單。那么接下來(lái)我們?cè)趕pringboot中如何使用發(fā)布訂閱的功能呢?

SpringBoot中使用Redis的發(fā)布訂閱功能

添加依賴配置redis信息和連接池什么的就不說(shuō)了,如果添加的有commons-pool2依賴的話,會(huì)自動(dòng)幫我們配置redis連接池的

發(fā)布者

相對(duì)于訂閱者來(lái)說(shuō),發(fā)布者的實(shí)現(xiàn)方式很簡(jiǎn)單,以下方式就可以往channel中發(fā)送message了。

@Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void publish(){
    // 使用高級(jí)的redisTemplate
    redisTemplate.convertAndSend("channel","message");
    
    // 使用低級(jí)的connection 實(shí)際上redisTemplate的底層就是使用的下面的方式
    redisTemplate.execute(new RedisCallback<Object>() {
          @Override
          public Object doInRedis(RedisConnection connection) throws DataAccessException {
              connection.publish("channel".getBytes(StandardCharsets.UTF_8), "message".getBytes(StandardCharsets.UTF_8));
              return null;
         }
     }, true);
     // true這個(gè)參數(shù)意思是 是否將redis連接暴露給回調(diào)代碼,大多數(shù)情況下設(shè)置true就可以了,往后深入的話可以看到
     RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 如果為false的話會(huì)創(chuàng)建redis連接的代理
}

訂閱者

訂閱者因?yàn)樯婕暗竭B接、線程等 所以內(nèi)容相對(duì)會(huì)多一點(diǎn)

@Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void subscribe() {
        redisTemplate.execute(new RedisCallback<Object>() {
            @Override
            public Object doInRedis(RedisConnection connection) throws DataAccessException {
                // 我定義了一個(gè)全局的 ConcurrentHashMap 用來(lái)存放連接 因?yàn)楹竺娴娜∠嗛喌木€程要和訂閱的線程用同一個(gè)連接
                map.put("connection",connection);

                // subscribe 按頻道訂閱 該方法會(huì)阻塞該線程 只有取消訂閱才會(huì)釋放該線程
                connection.subscribe(new MessageListener() {
                    @Override
                    public void onMessage(Message message, byte[] pattern) {
                        log.info("接收到消息");
                        System.out.println(new String(message.getBody()));
                    }
                }, "channelOne".getBytes(StandardCharsets.UTF_8), "channelTwo".getBytes(StandardCharsets.UTF_8));

                // 按模式訂閱 pSubscribe 只有取消訂閱才會(huì)釋放該線程
//                connection.pSubscribe(new MessageListener() {
//                    @Override
//                    public void onMessage(Message message, byte[] pattern) {
//                        System.out.println(new String(message.getBody()));
//                    }
//                }, "patternOne".getBytes(StandardCharsets.UTF_8), "patternOne".getBytes(StandardCharsets.UTF_8));
                return null;
            }
        }, true);
    }

如何取消訂閱呢?從剛才的map里取到連接

RedisConnection the = map.get("connection");
    Subscription subscription = the.getSubscription();
    subscription.unsubscribe();

消息監(jiān)聽(tīng)容器

上面的那種訂閱為低級(jí)訂閱,由于連接在調(diào)用subscribe的時(shí)候會(huì)導(dǎo)致當(dāng)前線程阻塞,這種方式需要對(duì)每個(gè)監(jiān)聽(tīng)器連接和線程管理,所以spring提供了RedisMessageListenerContainer類來(lái)幫我們完成這些工作。

RedisMessageListenerContainer顧名思義可以知道它是一個(gè)消息監(jiān)聽(tīng)容器
詳情請(qǐng)參考官方文檔

如何實(shí)現(xiàn)

@Configuration
public class DefaultMessageListenerContainerConfig {

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        // 官方推薦我們使用自定義的線程池或者使用TaskExecutor
        container.setTaskExecutor(executor());
        container.addMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message, byte[] pattern) {
                System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody()));
            }
        }, new ChannelTopic("message"));
        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }
}

這個(gè)時(shí)候我們?cè)趓edis命令行內(nèi)使用 publish channel message 的時(shí)候,我們的spring程序就可以訂閱到消息了。

再說(shuō)下 MessageListenerAdapter
我們可以通過(guò) MessageListenerAdapter 消息接收者包裝進(jìn)去,消息接收者不會(huì)和redis有任何耦合。
官方文檔給了spring傳統(tǒng)的xml的方式配置的,下面我給出基于configuration配置的代碼

public interface MessageDelegate {
    void handleMessage(String message);
}

public class DefaultMessageDelegate implements MessageDelegate {
    @Override
    public void handleMessage(String message) {
        System.out.println(message);
    }
}

@Configuration
public class MessageListenerContainerConfig {

    @Autowired
    private DefaultMessageDelegate defaultMessageDelegate;

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                   MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(factory);
        container.setTaskExecutor(executor());

        Map<MessageListenerAdapter, Collection<? extends Topic>> map = new HashMap<>();
        List<ChannelTopic> channelTopics = new ArrayList<>();
        ChannelTopic channelTopic = new ChannelTopic("message");
        channelTopics.add(channelTopic);
        map.put(messageListenerAdapter, channelTopics);
        container.setMessageListeners(map);

        return container;
    }

    @Bean
    public TaskExecutor executor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.initialize();
        return executor;
    }

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        // handleMessage 參數(shù)消息來(lái)的時(shí)候要調(diào)用的方法 默認(rèn)是 handleMessage
        return new MessageListenerAdapter(defaultMessageDelegate, "handleMessage");
    }
}

如果我們要在程序運(yùn)行時(shí)添加訂閱或者取消訂閱的時(shí)候該怎么辦呢?
我們需要提前準(zhǔn)備好消息偵聽(tīng)器,添加的時(shí)候把偵聽(tīng)器注入到消息容器
取消的時(shí)候就調(diào)用消息容器的remove方法把偵聽(tīng)器刪除掉即可。

到此,相信大家對(duì)“SpringBoot Redis發(fā)布訂閱模式的方法是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI