溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

用純Java實現(xiàn)一個即時通訊系統(tǒng)

發(fā)布時間:2021-06-24 11:45:54 來源:億速云 閱讀:715 作者:chen 欄目:編程語言

這篇文章主要介紹“用純Java實現(xiàn)一個即時通訊系統(tǒng)”,在日常操作中,相信很多人在用純Java實現(xiàn)一個即時通訊系統(tǒng)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”用純Java實現(xiàn)一個即時通訊系統(tǒng)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

 項目背景

和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風(fēng)的房間,讓用戶進(jìn)入房間之后可以互相通過語音聊天的方式進(jìn)行互動。

用純Java實現(xiàn)一個即時通訊系統(tǒng)

這里分享一下相關(guān)的技術(shù)設(shè)計方案。這款系統(tǒng)的核心點設(shè)計在于如何能讓一個用戶發(fā)出的語音通知到其他用戶上邊。語音數(shù)據(jù)在客戶端同事的處理下最終變成了io數(shù)據(jù)流請求到了后端,后端只需要將這些數(shù)據(jù)流傳達(dá)給各個不同的終端即可達(dá)到廣播通知的效果。

單機版架構(gòu)

最初期上線的時候,為了趕速度,快速試錯,所以簡單地采用了單機版架構(gòu)去設(shè)計。結(jié)合技術(shù)棧為 SpringBoot,WebSocket,MySQL技術(shù)。

線上一間語音房間的同時在線人數(shù)并不會特別多,大概在15-50人的區(qū)間段內(nèi),系統(tǒng)核心代碼是通過SpringBoot內(nèi)部的WebSocket技術(shù)去進(jìn)行數(shù)據(jù)的主動推送。

設(shè)計思路

整體的設(shè)計圖比較簡單,基本就是一臺服務(wù)器存儲WebSocket連接,如下圖所示:

用純Java實現(xiàn)一個即時通訊系統(tǒng)

用戶進(jìn)行WebSocket初始化連接的時候需要一個連接分配和存儲的過程:

用純Java實現(xiàn)一個即時通訊系統(tǒng)

早期的存儲是存放在了服務(wù)器本地的一個Map集合中。

用純Java實現(xiàn)一個即時通訊系統(tǒng)

當(dāng)WebSocket進(jìn)行連接的時候就會往內(nèi)存中寫入一條數(shù)據(jù)信息,當(dāng)鏈接斷開的時候,就將內(nèi)存中的數(shù)據(jù)移除。然后進(jìn)行語音廣播的時候需要結(jié)合WebSocket內(nèi)部的廣播發(fā)送功能進(jìn)行通知

用純Java實現(xiàn)一個即時通訊系統(tǒng)

看似設(shè)計比較簡單,但是在后期業(yè)務(wù)變得龐大的時候出現(xiàn)了瓶頸。因為隨著參加語音活動用戶的增加,越來越多的WebSocketSession對象需要被存儲到內(nèi)存當(dāng)中,這種有狀態(tài)性的存儲對于單機擴容不靈活。

設(shè)計缺陷

1.假設(shè)原先的服務(wù)器擴容到了A,B兩臺機器,A用戶在A機器上邊建立了WebSocketSession,B用戶在B機器上邊建立的WebSocketSession連接。此時如果A想要和B進(jìn)行對話發(fā)送,需要先查找到具體WebSocketSession存放在哪臺機器上邊。

2.當(dāng)用戶出現(xiàn)了網(wǎng)絡(luò)異常,臨時斷開連接進(jìn)行重連的時候,也可能會出現(xiàn)1所說的問題。

集群架構(gòu)

設(shè)計思路

一旦出現(xiàn)需要發(fā)送語音通知的時候,發(fā)送一條廣播的mq消息,每個機器都接收到消息之后,觸發(fā)自己的廣播操作即可。

RocketMq的接入系統(tǒng)設(shè)計里面mq采用的是廣播模式,這和我們通常使用的集群模式有一定的區(qū)別。

消息隊列RocketMQ版是基于發(fā)布或訂閱模型的消息系統(tǒng)。消費者,即消息的訂閱方訂閱關(guān)注的Topic,以獲取并消費消息。由于消費者應(yīng)用一般是分布式系統(tǒng),以集群方式部署,因此消息隊列RocketMQ版約定以下概念:

  •  集群:使用相同Group ID的消費者屬于同一個集群。同一個集群下的消費者消費邏輯必須完全一致(包括Tag的使用)。

  •  集群消費:當(dāng)使用集群消費模式時,消息隊列RocketMQ版認(rèn)為任意一條消息只需要被集群內(nèi)的任意一個消費者處理即可。

  •  廣播消費:當(dāng)使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給集群內(nèi)所有注冊過的消費者,保證消息至少被每個消費者消費一次。

集群消費模式適用場景 適用于消費端集群化部署,每條消息只需要被處理一次的場景。此外,由于消費進(jìn)度在服務(wù)端維護,可靠性更高。具體消費示例如下圖所示。

用純Java實現(xiàn)一個即時通訊系統(tǒng)

注意事項

  •  集群消費模式下,每一條消息都只會被分發(fā)到一臺機器上處理。如果需要被集群下的每一臺機器都處理,請使用廣播模式。

  •  集群消費模式下,不保證每一次失敗重投的消息路由到同一臺機器上。

廣播消費模式適用場景 適用于消費端集群化部署,每條消息需要被集群下的每個消費者處理的場景。具體消費示例如下圖所示。

用純Java實現(xiàn)一個即時通訊系統(tǒng)

注意事項

  •  廣播消費模式下不支持順序消息。

  •  廣播消費模式下不支持重置消費位點。

  •  每條消息都需要被相同訂閱邏輯的多臺機器處理。

  •  消費進(jìn)度在客戶端維護,出現(xiàn)重復(fù)消費的概率稍大于集群模式。

  •  廣播模式下,消息隊列RocketMQ版保證每條消息至少被每臺客戶端消費一次,但是并不會重投消費失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費失敗的情況。

  •  廣播模式下,客戶端每一次重啟都會從最新消息消費??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會被自動跳過,請謹(jǐn)慎選擇。

  •  廣播模式下,每條消息都會被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。

  •  廣播模式下服務(wù)端不維護消費進(jìn)度,所以消息隊列RocketMQ版控制臺不支持消息堆積查詢、消息堆積報警和訂閱關(guān)系查詢功能。

這里面的應(yīng)用場景需要對集群內(nèi)部對每個消費者都對服務(wù)器內(nèi)存中的socket連接進(jìn)行session是否存在對判斷,因此需要采用mq的廣播模式。

關(guān)于mq部分的接入代碼

Consumer模塊的配置:

package org.idea.web.socket.config;  import org.springframework.boot.context.properties.ConfigurationProperties;  /**   * @Author linhao   * @Date created in 10:30 上午 2021/5/10   */  @ConfigurationProperties(prefix = "rocketmq.consumer")  public class MqConsumerConfig {      private boolean isOn;      private String groupName;      private String nameSrvAddr;      private String topics;      private Integer consumeThreadMin;      private Integer consumeThreadMax;      private Integer consumeMessageBatchMaxSize;        /**       getter 和 setter部分省略      **/  }

Producer模塊的配置展示:

package org.idea.web.socket.config;  import org.springframework.boot.context.properties.ConfigurationProperties;  /**   * @Author linhao   * @Date created in 10:26 上午 2021/5/10   */  @ConfigurationProperties(prefix = "rocketmq.producer")  public class MqProducerConfig {      private boolean isOn;      private String groupName;      private String nameSrvAddr;      private Integer maxMessageSize;      private Integer sendMsgTimeout;      private Integer retryTimesWhenSendFailed;        /**       getter 和 setter部分省略      **/  }

RocketMq內(nèi)部的消費端Bean配置

package org.idea.web.socket.mq;  import lombok.extern.slf4j.Slf4j;  import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  import org.apache.rocketmq.client.exception.MQClientException;  import org.apache.rocketmq.common.consumer.ConsumeFromWhere;  import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;  import org.idea.web.socket.config.MqConsumerConfig;  import org.idea.web.socket.config.MqProducerConfig;  import org.springframework.boot.autoconfigure.AutoConfigureAfter;  import org.springframework.boot.autoconfigure.AutoConfigureBefore;  import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;  import org.springframework.boot.context.properties.EnableConfigurationProperties;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  import javax.annotation.Resource;  /**   * @Author linhao   * @Date created in 10:34 上午 2021/5/10   */  @Configuration  @Slf4j  @EnableConfigurationProperties({MqConsumerConfig.class})  public class MqConsumerAutoConfig {      @Resource      private MqConsumerConfig mqConsumerConfig;      @Resource      //這個接口需要手動實現(xiàn)順序消費的邏輯 每次獲取到消息隊列的第一條數(shù)據(jù)      private MessageListenerHandler messageListenerConcurrently;      @Bean      @ConditionalOnMissingBean      public DefaultMQPushConsumer defaultMQPushConsumer() {          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();          consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());          consumer.setConsumerGroup(mqConsumerConfig.getGroupName());          consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());          consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());          consumer.registerMessageListener(messageListenerConcurrently);          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);          //消費模型是什么?          consumer.setMessageModel(MessageModel.BROADCASTING);          //默認(rèn)一次拉取一條消費          consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());          //*表示訂閱所有的tag          try {              consumer.subscribe(mqConsumerConfig.getTopics(), "*");              consumer.start();              log.info("【 MqConsumerAutoConfig 】mq consumer is started!");          } catch (Exception e) {              log.error("mq start fail,e is ", e);          }          return consumer;      }  }

RocketMq的服務(wù)生產(chǎn)者Bean配置

package org.idea.web.socket.mq;  import lombok.extern.slf4j.Slf4j;  import org.apache.rocketmq.client.producer.DefaultMQProducer;  import org.idea.web.socket.config.MqProducerConfig;  import org.springframework.boot.autoconfigure.AutoConfigureAfter;  import org.springframework.boot.autoconfigure.AutoConfigureBefore;  import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;  import org.springframework.boot.context.properties.EnableConfigurationProperties;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;  import javax.annotation.Resource;  /**   * @Author linhao   * @Date created in 11:05 上午 2021/5/10   */  @Configuration  @Slf4j  @EnableConfigurationProperties({MqProducerConfig.class})  public class MqProducerAutoConfig {      @Resource      private MqProducerConfig mqProducerConfig;      @Bean      @ConditionalOnMissingBean      //意味著DefaultMQProducer的配置可以被覆蓋      public DefaultMQProducer defaultMQProducer() {          DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());          producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());          //沒有則自動創(chuàng)建topic的key  //        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");          producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());         producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());          producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());          try {              producer.start();              log.info("【 MqProducerAutoConfig 】mq producer is started!");          } catch (Exception e) {              log.error("[MqProducerAutoConfig] start fail, e is ", e);          }          return producer;      }  }

然后是對RocketMq內(nèi)部發(fā)送消息事件的一層函數(shù)封裝

package org.idea.web.socket.mq;  import com.alibaba.fastjson.JSON;  import lombok.extern.slf4j.Slf4j;  import org.apache.commons.lang3.StringUtils;  import org.apache.rocketmq.client.producer.DefaultMQProducer;  import org.apache.rocketmq.client.producer.SendResult;  import org.apache.rocketmq.common.message.Message;  import org.apache.rocketmq.remoting.common.RemotingHelper;  import org.idea.web.socket.config.MqProducerConfig;  import org.idea.web.socket.dto.BroadcastMqDTO;  import org.springframework.stereotype.Component;  import javax.annotation.Resource;  import java.io.UnsupportedEncodingException;  /**   * 消息廣播發(fā)送端   *   * @Author linhao   * @Date created in 10:43 下午 2021/5/9   */  @Component  @Slf4j  public class BroadcastMqProducer {      @Resource      private DefaultMQProducer defaultMQProducer;      @Resource      private MqProducerConfig mqProducerConfig;      private static String TOPIC = "ws-topic";      private static String TAGS = "ws-tag";      public static Integer ALL_USER_RECEIVE_TYPE = 1;      public static Integer ONE_USER_RECEIVE_TYPE = 2;      /**       * 點對點之間的消息發(fā)送       *       * @param destSessionKey       * @param msg       * @return       */      public SendResult sendWebSocketToUser(String destSessionKey,String msg) {          if (StringUtils.isEmpty(msg)) {              log.error("[sendWebSocketToUser] msg can not be null!");              return null;          }          Message message = null;          SendResult sendResult = null;          try {              BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();              broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);              broadcastMqDTO.setMessage(msg);              broadcastMqDTO.setSessionKey(destSessionKey);              message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));              sendResult = defaultMQProducer.send(message);          } catch (Exception e) {              log.error("[sendWebSocketBroadcastMsg] e is ", e);          }          return sendResult;      }      /**       * 廣播消息發(fā)送       *       * @param msg      * @return       */      public SendResult sendWebSocketBroadcastMsg(String msg) {          if (StringUtils.isEmpty(msg)) {              log.error("[sendWebSocketBroadcastMsg] msg can not be null!");              return null;          }          Message message = null;          SendResult sendResult = null;          try {              BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();              broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);              broadcastMqDTO.setMessage(msg);              message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));              sendResult = defaultMQProducer.send(message);          } catch (Exception e) {              log.error("[sendWebSocketBroadcastMsg] e is ", e);          }          return sendResult;      }  }

對消息的訂閱模塊實現(xiàn)代碼如下:

package org.idea.web.socket.mq;  import com.alibaba.fastjson.JSON;  import com.oracle.tools.packager.Log;  import lombok.extern.slf4j.Slf4j;  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  import org.apache.rocketmq.common.message.MessageExt;  import org.idea.web.socket.dto.BroadcastMqDTO;  import org.idea.web.socket.manager.SocketManager;  import org.springframework.messaging.simp.SimpMessagingTemplate;  import org.springframework.stereotype.Component;  import org.springframework.util.CollectionUtils;  import org.springframework.web.socket.WebSocketSession;  import javax.annotation.Resource;  import java.util.List;  import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;  import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;  /**   * @Author linhao   * @Date created in 10:59 上午 2021/5/10   */  @Component  @Slf4j  public class MessageListenerHandler implements MessageListenerConcurrently {      @Resource      private SocketManager socketManager;      @Resource      private SimpMessagingTemplate template;      @Override      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {          if (CollectionUtils.isEmpty(list)) {              Log.info("receive empty msg");              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;          }          MessageExt messageExt = list.get(0);          byte[] bytes = messageExt.getBody();          String json = new String(bytes);          BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);          log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);          if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {              log.info("[consumeMessage] 廣播發(fā)送消息:觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO);              template.convertAndSend("/topic/sendTopic", broadcastMqDTO);          } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {              String sessionKey = broadcastMqDTO.getSessionKey();              WebSocketSession webSocketSession = socketManager.get(sessionKey);              if (webSocketSession != null) {                  template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());                  log.info("[consumeMessage] 點對點發(fā)送消息;觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO);              }          }          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      }  }

整體設(shè)計結(jié)構(gòu)如下圖:

用純Java實現(xiàn)一個即時通訊系統(tǒng)

于是按照這個結(jié)構(gòu)進(jìn)行了一版本的緊急開發(fā)迭代,原先的單臺服務(wù)器擴展為了服務(wù)集群。

業(yè)務(wù)拓展后續(xù)產(chǎn)品經(jīng)理提出一個需求,要求支持在同一間房內(nèi)的兩個用戶之間發(fā)送悄悄話功能。這就需要我們進(jìn)行一個點對點之間傳輸通訊的功能了。因此需要在mq通知到每臺機器的時候加一個本地Session遍歷的邏輯,如果當(dāng)前機器存有用戶token對應(yīng)的session變量,那么就單獨針對那個Session進(jìn)行WebSocket的發(fā)送通知。

用純Java實現(xiàn)一個即時通訊系統(tǒng)

設(shè)計弊端一旦某臺機器出現(xiàn)了異常崩潰,那么就意味著這臺機器上的所有語音連接可能會出現(xiàn)中斷情況。目前這一塊的問題也在考慮解決,計劃是將WebSocketSession存入到分布式緩存的redis中保證數(shù)據(jù)可靠存儲,但是在后續(xù)嘗試的時候發(fā)現(xiàn)WebSocketSession對象沒有實現(xiàn)序列化接口,在存儲到Redis的時候會出現(xiàn)異常。目前這個問題還在尋找解決思路中,不知道各位讀者朋友們有什么好的思路。

遇到的問題點用戶請求直接訪問到了我們的內(nèi)部服務(wù)器,如果在請求的中間加入一臺nginx做負(fù)載均衡則需要在nginx中配置一些額外信息。

項目的源代碼比較多,這里我把核心部分的代碼整理了一份,感興趣的朋友可以到我的gitee上邊去下載:

https://gitee.com/IdeaHome_admin/socket-framework 

到此,關(guān)于“用純Java實現(xiàn)一個即時通訊系統(tǒng)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI