溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

怎么利用Jedis實(shí)現(xiàn)Redis的訂閱與發(fā)布

發(fā)布時(shí)間:2020-11-24 16:04:43 來(lái)源:億速云 閱讀:1082 作者:Leah 欄目:編程語(yǔ)言

怎么利用Jedis實(shí)現(xiàn)Redis的訂閱與發(fā)布?很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。

什么是Redis發(fā)布訂閱

Redis發(fā)布訂閱是一種消息通信模式,發(fā)送者通過(guò)通道A發(fā)送消息message,訂閱過(guò)通道A的客戶端就可以接收到消息message。嗯度娘上面的解釋要比我所說(shuō)的好多了,而我所理解的就是:所謂的訂閱發(fā)布模式,其實(shí)和我們看電視,聽廣播差不多,在我們沒(méi)有調(diào)臺(tái)(換頻道)的時(shí)候,那個(gè)頻道也是在傳遞消息的(發(fā)布)。我們換到那個(gè)頻道上(訂閱)就能接收到消息了。是的,雖然可能有些不恰當(dāng)~

Redis訂閱與發(fā)布命令

首先聲明,有關(guān)Redis服務(wù)器的搭建工作,請(qǐng)自行查閱相關(guān)資料進(jìn)行環(huán)境搶建

聽說(shuō)Redis中發(fā)布與訂閱只有簡(jiǎn)單的6個(gè)命令。即:

PSUBSCRIBE pattern [pattern ...]

訂閱一個(gè)或者多個(gè)符合pattern格式的頻道

PUBLISH channel message

發(fā)布消息到chanel中

PUBSUB subcommand [argument [argument ...]]

查看訂閱與發(fā)布系統(tǒng)狀態(tài)

PUNSUBSCRIBE [pattern [pattern ...]]

退訂所有符合格式的頻道

SUBSCRIBE channel [channel ...]

訂閱一個(gè)或者多個(gè)頻道

UNSUBSCRIBE [channel [channel ...]]

取消訂閱頻道

例1 - SUBSCRIBE

連接redis后鍵入命令

SUBSCRIBE study

這樣便訂閱了一個(gè)名為study的頻道。

接下來(lái)study頻道要發(fā)消息啦?!?/p>

例2 - PUBLISH

另開啟一個(gè)客戶端,我使用的是品字形布局的最上面那個(gè)做為發(fā)布者,鍵入

PUBLISH study "message1-go go go"

可以看到,當(dāng)客戶端1在study頻道發(fā)布消息時(shí),客戶端2(已訂閱study頻道)可以接收到c1發(fā)布的消息,而客戶端3由于沒(méi)有訂閱study頻道,所以接收不到c1發(fā)送的消息。

例3 - PSUBSCRIBE

現(xiàn)在,跟著博主左手,右手一個(gè)慢動(dòng)作。在c3中鍵入

PSUBSCRIBE study*

OK,現(xiàn)在在c1中鍵入

PUBLISH study "message2"

上結(jié)果圖:

c3采用的通配符的形式,也將study頻道給訂閱成功了。

接下來(lái),在c1中繼續(xù)鍵入命令:

PUBLISH study:java "I hate java forever"

可以看到,使用psubscribe不僅將study頻道訂閱了,而且將以study為首的頻道也訂閱了。

例4 - PUBSUB

在c1中鍵入pubsub channel,可以獲得:

127.0.0.1:6379> PUBSUB channels

1) "study"

意為當(dāng)前正在活躍的頻道。

Jedis實(shí)現(xiàn)訂閱發(fā)布者模式

好了,上面通過(guò)命令行熟悉了一下Redis中有關(guān)訂閱發(fā)布者模式的相關(guān)命令。下面我們要將redis的訂閱與發(fā)布者嵌入到項(xiàng)目中。

首先,我們使用jedis先訂閱一個(gè)名為:study的頻道

然后我們先從命令行處進(jìn)行消息發(fā)布:

之后 ,我們使用jedis在項(xiàng)目中進(jìn)行消息發(fā)布:

我們可以進(jìn)行正常的通信 ~噢耶~

核心代碼:

PublishMessage.java 用于開啟一個(gè)發(fā)布消息的線程

private Logger logger = LoggerFactory.getLogger(PublishMessage.class);

@Resource
private JedisCluster jedisCluster;

/**
 * 發(fā)布消息
 *
 * @param channel 頻道
 * @param message 信息
 */
public void sendMessage(final String channel, final String message) {
  Thread thread = new Thread(() -> {
    Long publish = jedisCluster.publish(channel, message);
    logger.info("服務(wù)器在: {} 頻道發(fā)布消息{} - {}", channel, message, publish);
  });
  logger.info("發(fā)布線程啟動(dòng):");
  thread.setName("publishThread");
  thread.start();
}

ChatSubscribe.java用于處理訂閱相關(guān)事件,繼承自JedisPubSub

private Logger logger = LoggerFactory.getLogger(ChatSubscribe.class);

// 取得訂閱的消息后的處理
@Override
public void onMessage(String channel, String message) {
  logger.info("訂閱成功,接收到的消息為:頻道-{},消息-{}", channel, message);
  RedisString.message = message;
}

// 取得按表達(dá)式的方式訂閱的消息后的處理
@Override
public void onPMessage(String pattern, String channel, String message) {
  System.out.println("-----取得按表達(dá)式的方式訂閱的消息后的處理-----");
  System.out.println(pattern + "=" + channel + "=" + message);
}

// 初始化按表達(dá)式的方式訂閱時(shí)候的處理
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
  System.out.println("-----初始化按表達(dá)式的方式訂閱時(shí)候的處理-----");
  System.out.println(pattern + "=" + subscribedChannels);
}

// 取消按表達(dá)式的方式訂閱時(shí)候的處理
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
  System.out.println("-----取消按表達(dá)式的方式訂閱時(shí)候的處理-----");
  System.out.println(pattern + "=" + subscribedChannels);
}

@Override
public void onPong(String pattern) {
  super.onPong(pattern);
}

// 初始化訂閱時(shí)候的處理
@Override
public void onSubscribe(String channel, int subscribedChannels) {
  logger.info("初始化訂閱信息:頻道-{},訂閱頻道-{}", channel, subscribedChannels);
}

// 取消訂閱時(shí)候的處理
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
  logger.info("已取消訂閱頻道{}", channel);
}

SubScribeMessage.java 訂閱頻道,取消頻道等動(dòng)作類

private Logger logger = LoggerFactory.getLogger(SubScribeMessage.class);

private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

@Resource
private JedisCluster jedisCluster;
/**
   * 訂閱頻道
   *
   * @param channel     頻道
   * @param roomSubListerner
   */
  public void subscribeChannel(final String channel, final ChatSubscribe roomSubListerner) {

    cachedThreadPool.execute(new Runnable() {
      @Override
      public void run() {
        jedisCluster.subscribe(roomSubListerner, channel);
      }
    });
  }

jedisCluster是否封裝工具類,取自各位看官,核心代碼已給出,請(qǐng)各位看官根據(jù)自身業(yè)務(wù)與邏輯,自行更改與優(yōu)化代碼。

本次示例程序采用tomcat 9.0 + spring + springmvc

使用了諸如:@RestController,@GetMapping等相關(guān)注解,便于開發(fā),有興趣可自行查閱spring相關(guān)資料。

看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI