您好,登錄后才能下訂單哦!
這篇文章主要介紹“用純Java實現(xiàn)一個即時通訊系統(tǒng)”,在日常操作中,相信很多人在用純Java實現(xiàn)一個即時通訊系統(tǒng)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”用純Java實現(xiàn)一個即時通訊系統(tǒng)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
和各位讀者大致介紹下具體場景,線上的小程序中開放一些語音麥克風(fēng)的房間,讓用戶進(jìn)入房間之后可以互相通過語音聊天的方式進(jìn)行互動。
這里分享一下相關(guān)的技術(shù)設(shè)計方案。這款系統(tǒng)的核心點設(shè)計在于如何能讓一個用戶發(fā)出的語音通知到其他用戶上邊。語音數(shù)據(jù)在客戶端同事的處理下最終變成了io數(shù)據(jù)流請求到了后端,后端只需要將這些數(shù)據(jù)流傳達(dá)給各個不同的終端即可達(dá)到廣播通知的效果。
最初期上線的時候,為了趕速度,快速試錯,所以簡單地采用了單機版架構(gòu)去設(shè)計。結(jié)合技術(shù)棧為 SpringBoot,WebSocket,MySQL技術(shù)。
線上一間語音房間的同時在線人數(shù)并不會特別多,大概在15-50人的區(qū)間段內(nèi),系統(tǒng)核心代碼是通過SpringBoot內(nèi)部的WebSocket技術(shù)去進(jìn)行數(shù)據(jù)的主動推送。
設(shè)計思路
整體的設(shè)計圖比較簡單,基本就是一臺服務(wù)器存儲WebSocket連接,如下圖所示:
用戶進(jìn)行WebSocket初始化連接的時候需要一個連接分配和存儲的過程:
早期的存儲是存放在了服務(wù)器本地的一個Map集合中。
當(dāng)WebSocket進(jìn)行連接的時候就會往內(nèi)存中寫入一條數(shù)據(jù)信息,當(dāng)鏈接斷開的時候,就將內(nèi)存中的數(shù)據(jù)移除。然后進(jìn)行語音廣播的時候需要結(jié)合WebSocket內(nèi)部的廣播發(fā)送功能進(jìn)行通知
看似設(shè)計比較簡單,但是在后期業(yè)務(wù)變得龐大的時候出現(xiàn)了瓶頸。因為隨著參加語音活動用戶的增加,越來越多的WebSocketSession對象需要被存儲到內(nèi)存當(dāng)中,這種有狀態(tài)性的存儲對于單機擴容不靈活。
設(shè)計缺陷
1.假設(shè)原先的服務(wù)器擴容到了A,B兩臺機器,A用戶在A機器上邊建立了WebSocketSession,B用戶在B機器上邊建立的WebSocketSession連接。此時如果A想要和B進(jìn)行對話發(fā)送,需要先查找到具體WebSocketSession存放在哪臺機器上邊。
2.當(dāng)用戶出現(xiàn)了網(wǎng)絡(luò)異常,臨時斷開連接進(jìn)行重連的時候,也可能會出現(xiàn)1所說的問題。
設(shè)計思路
一旦出現(xiàn)需要發(fā)送語音通知的時候,發(fā)送一條廣播的mq消息,每個機器都接收到消息之后,觸發(fā)自己的廣播操作即可。
RocketMq的接入系統(tǒng)設(shè)計里面mq采用的是廣播模式,這和我們通常使用的集群模式有一定的區(qū)別。
消息隊列RocketMQ版是基于發(fā)布或訂閱模型的消息系統(tǒng)。消費者,即消息的訂閱方訂閱關(guān)注的Topic,以獲取并消費消息。由于消費者應(yīng)用一般是分布式系統(tǒng),以集群方式部署,因此消息隊列RocketMQ版約定以下概念:
集群:使用相同Group ID的消費者屬于同一個集群。同一個集群下的消費者消費邏輯必須完全一致(包括Tag的使用)。
集群消費:當(dāng)使用集群消費模式時,消息隊列RocketMQ版認(rèn)為任意一條消息只需要被集群內(nèi)的任意一個消費者處理即可。
廣播消費:當(dāng)使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給集群內(nèi)所有注冊過的消費者,保證消息至少被每個消費者消費一次。
集群消費模式適用場景 適用于消費端集群化部署,每條消息只需要被處理一次的場景。此外,由于消費進(jìn)度在服務(wù)端維護,可靠性更高。具體消費示例如下圖所示。
集群消費模式下,每一條消息都只會被分發(fā)到一臺機器上處理。如果需要被集群下的每一臺機器都處理,請使用廣播模式。
集群消費模式下,不保證每一次失敗重投的消息路由到同一臺機器上。
廣播消費模式適用場景 適用于消費端集群化部署,每條消息需要被集群下的每個消費者處理的場景。具體消費示例如下圖所示。
廣播消費模式下不支持順序消息。
廣播消費模式下不支持重置消費位點。
每條消息都需要被相同訂閱邏輯的多臺機器處理。
消費進(jìn)度在客戶端維護,出現(xiàn)重復(fù)消費的概率稍大于集群模式。
廣播模式下,消息隊列RocketMQ版保證每條消息至少被每臺客戶端消費一次,但是并不會重投消費失敗的消息,因此業(yè)務(wù)方需要關(guān)注消費失敗的情況。
廣播模式下,客戶端每一次重啟都會從最新消息消費??蛻舳嗽诒煌V蛊陂g發(fā)送至服務(wù)端的消息將會被自動跳過,請謹(jǐn)慎選擇。
廣播模式下,每條消息都會被大量的客戶端重復(fù)處理,因此推薦盡可能使用集群模式。
廣播模式下服務(wù)端不維護消費進(jìn)度,所以消息隊列RocketMQ版控制臺不支持消息堆積查詢、消息堆積報警和訂閱關(guān)系查詢功能。
這里面的應(yīng)用場景需要對集群內(nèi)部對每個消費者都對服務(wù)器內(nèi)存中的socket連接進(jìn)行session是否存在對判斷,因此需要采用mq的廣播模式。
Consumer模塊的配置:
package org.idea.web.socket.config; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @Author linhao * @Date created in 10:30 上午 2021/5/10 */ @ConfigurationProperties(prefix = "rocketmq.consumer") public class MqConsumerConfig { private boolean isOn; private String groupName; private String nameSrvAddr; private String topics; private Integer consumeThreadMin; private Integer consumeThreadMax; private Integer consumeMessageBatchMaxSize; /** getter 和 setter部分省略 **/ }
Producer模塊的配置展示:
package org.idea.web.socket.config; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @Author linhao * @Date created in 10:26 上午 2021/5/10 */ @ConfigurationProperties(prefix = "rocketmq.producer") public class MqProducerConfig { private boolean isOn; private String groupName; private String nameSrvAddr; private Integer maxMessageSize; private Integer sendMsgTimeout; private Integer retryTimesWhenSendFailed; /** getter 和 setter部分省略 **/ }
RocketMq內(nèi)部的消費端Bean配置
package org.idea.web.socket.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.idea.web.socket.config.MqConsumerConfig; import org.idea.web.socket.config.MqProducerConfig; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * @Author linhao * @Date created in 10:34 上午 2021/5/10 */ @Configuration @Slf4j @EnableConfigurationProperties({MqConsumerConfig.class}) public class MqConsumerAutoConfig { @Resource private MqConsumerConfig mqConsumerConfig; @Resource //這個接口需要手動實現(xiàn)順序消費的邏輯 每次獲取到消息隊列的第一條數(shù)據(jù) private MessageListenerHandler messageListenerConcurrently; @Bean @ConditionalOnMissingBean public DefaultMQPushConsumer defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr()); consumer.setConsumerGroup(mqConsumerConfig.getGroupName()); consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin()); consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax()); consumer.registerMessageListener(messageListenerConcurrently); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //消費模型是什么? consumer.setMessageModel(MessageModel.BROADCASTING); //默認(rèn)一次拉取一條消費 consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize()); //*表示訂閱所有的tag try { consumer.subscribe(mqConsumerConfig.getTopics(), "*"); consumer.start(); log.info("【 MqConsumerAutoConfig 】mq consumer is started!"); } catch (Exception e) { log.error("mq start fail,e is ", e); } return consumer; } }
RocketMq的服務(wù)生產(chǎn)者Bean配置
package org.idea.web.socket.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.idea.web.socket.config.MqProducerConfig; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * @Author linhao * @Date created in 11:05 上午 2021/5/10 */ @Configuration @Slf4j @EnableConfigurationProperties({MqProducerConfig.class}) public class MqProducerAutoConfig { @Resource private MqProducerConfig mqProducerConfig; @Bean @ConditionalOnMissingBean //意味著DefaultMQProducer的配置可以被覆蓋 public DefaultMQProducer defaultMQProducer() { DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName()); producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr()); //沒有則自動創(chuàng)建topic的key // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY"); producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize()); producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout()); producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed()); try { producer.start(); log.info("【 MqProducerAutoConfig 】mq producer is started!"); } catch (Exception e) { log.error("[MqProducerAutoConfig] start fail, e is ", e); } return producer; } }
然后是對RocketMq內(nèi)部發(fā)送消息事件的一層函數(shù)封裝
package org.idea.web.socket.mq; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.idea.web.socket.config.MqProducerConfig; import org.idea.web.socket.dto.BroadcastMqDTO; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.UnsupportedEncodingException; /** * 消息廣播發(fā)送端 * * @Author linhao * @Date created in 10:43 下午 2021/5/9 */ @Component @Slf4j public class BroadcastMqProducer { @Resource private DefaultMQProducer defaultMQProducer; @Resource private MqProducerConfig mqProducerConfig; private static String TOPIC = "ws-topic"; private static String TAGS = "ws-tag"; public static Integer ALL_USER_RECEIVE_TYPE = 1; public static Integer ONE_USER_RECEIVE_TYPE = 2; /** * 點對點之間的消息發(fā)送 * * @param destSessionKey * @param msg * @return */ public SendResult sendWebSocketToUser(String destSessionKey,String msg) { if (StringUtils.isEmpty(msg)) { log.error("[sendWebSocketToUser] msg can not be null!"); return null; } Message message = null; SendResult sendResult = null; try { BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO(); broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE); broadcastMqDTO.setMessage(msg); broadcastMqDTO.setSessionKey(destSessionKey); message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET)); sendResult = defaultMQProducer.send(message); } catch (Exception e) { log.error("[sendWebSocketBroadcastMsg] e is ", e); } return sendResult; } /** * 廣播消息發(fā)送 * * @param msg * @return */ public SendResult sendWebSocketBroadcastMsg(String msg) { if (StringUtils.isEmpty(msg)) { log.error("[sendWebSocketBroadcastMsg] msg can not be null!"); return null; } Message message = null; SendResult sendResult = null; try { BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO(); broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE); broadcastMqDTO.setMessage(msg); message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET)); sendResult = defaultMQProducer.send(message); } catch (Exception e) { log.error("[sendWebSocketBroadcastMsg] e is ", e); } return sendResult; } }
對消息的訂閱模塊實現(xiàn)代碼如下:
package org.idea.web.socket.mq; import com.alibaba.fastjson.JSON; import com.oracle.tools.packager.Log; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.idea.web.socket.dto.BroadcastMqDTO; import org.idea.web.socket.manager.SocketManager; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.web.socket.WebSocketSession; import javax.annotation.Resource; import java.util.List; import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE; import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE; /** * @Author linhao * @Date created in 10:59 上午 2021/5/10 */ @Component @Slf4j public class MessageListenerHandler implements MessageListenerConcurrently { @Resource private SocketManager socketManager; @Resource private SimpMessagingTemplate template; @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if (CollectionUtils.isEmpty(list)) { Log.info("receive empty msg"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); byte[] bytes = messageExt.getBody(); String json = new String(bytes); BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class); log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO); if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) { log.info("[consumeMessage] 廣播發(fā)送消息:觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO); template.convertAndSend("/topic/sendTopic", broadcastMqDTO); } else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) { String sessionKey = broadcastMqDTO.getSessionKey(); WebSocketSession webSocketSession = socketManager.get(sessionKey); if (webSocketSession != null) { template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage()); log.info("[consumeMessage] 點對點發(fā)送消息;觸發(fā)----》消息內(nèi)容為:" + broadcastMqDTO); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
整體設(shè)計結(jié)構(gòu)如下圖:
于是按照這個結(jié)構(gòu)進(jìn)行了一版本的緊急開發(fā)迭代,原先的單臺服務(wù)器擴展為了服務(wù)集群。
業(yè)務(wù)拓展后續(xù)產(chǎn)品經(jīng)理提出一個需求,要求支持在同一間房內(nèi)的兩個用戶之間發(fā)送悄悄話功能。這就需要我們進(jìn)行一個點對點之間傳輸通訊的功能了。因此需要在mq通知到每臺機器的時候加一個本地Session遍歷的邏輯,如果當(dāng)前機器存有用戶token對應(yīng)的session變量,那么就單獨針對那個Session進(jìn)行WebSocket的發(fā)送通知。
設(shè)計弊端一旦某臺機器出現(xiàn)了異常崩潰,那么就意味著這臺機器上的所有語音連接可能會出現(xiàn)中斷情況。目前這一塊的問題也在考慮解決,計劃是將WebSocketSession存入到分布式緩存的redis中保證數(shù)據(jù)可靠存儲,但是在后續(xù)嘗試的時候發(fā)現(xiàn)WebSocketSession對象沒有實現(xiàn)序列化接口,在存儲到Redis的時候會出現(xiàn)異常。目前這個問題還在尋找解決思路中,不知道各位讀者朋友們有什么好的思路。
遇到的問題點用戶請求直接訪問到了我們的內(nèi)部服務(wù)器,如果在請求的中間加入一臺nginx做負(fù)載均衡則需要在nginx中配置一些額外信息。
項目的源代碼比較多,這里我把核心部分的代碼整理了一份,感興趣的朋友可以到我的gitee上邊去下載:
https://gitee.com/IdeaHome_admin/socket-framework
到此,關(guān)于“用純Java實現(xiàn)一個即時通訊系統(tǒng)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。