溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

分布式下的WebSocket解決方案是什么

發(fā)布時間:2021-10-27 11:02:51 來源:億速云 閱讀:150 作者:iii 欄目:web開發(fā)

本篇內(nèi)容主要講解“分布式下的WebSocket解決方案是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“分布式下的WebSocket解決方案是什么”吧!

WebSocket單體應(yīng)用介紹

在介紹分布式集群之前,我們先來看一下王子的WebSocket代碼實現(xiàn),先來看java后端代碼如下:

import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/webSocket/{key}") public class WebSocket {     private static int onlineCount = 0;     /**      * 存儲連接的客戶端      */     private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();     private Session session;     /**      * 發(fā)送的目標(biāo)科室code      */     private String key;     @OnOpen     public void onOpen(@PathParam("key") String key, Session session) throws IOException {         this.key = key;         this.session = session;         if (!clients.containsKey(key)) {             addOnlineCount();        }        clients.put(key, this);         Log.info(key+"已連接消息服務(wù)!");     }    @OnClose     public void onClose() throws IOException {         clients.remove(key);        subOnlineCount();    }    @OnMessage     public void onMessage(String message) throws IOException {         if(message.equals("ping")){             return ;         }        JSONObject jsonTo = JSON.parseObject(message);        String mes = (String) jsonTo.get("message");         if (!jsonTo.get("to").equals("All")){             sendMessageTo(mes, jsonTo.get("to").toString());         }else{             sendMessageAll(mes);        }    }    @OnError     public void onError(Session session, Throwable error) {         error.printStackTrace();    }    private void sendMessageTo(String message, String To) throws IOException {         for (WebSocket item : clients.values()) {             if (item.key.contains(To) )                 item.session.getAsyncRemote().sendText(message);        }    }    private void sendMessageAll(String message) throws IOException {         for (WebSocket item : clients.values()) {             item.session.getAsyncRemote().sendText(message);        }    }    public static synchronized int getOnlineCount() {         return onlineCount;     }    public static synchronized void addOnlineCount() {         WebSocket.onlineCount++;    }    public static synchronized void subOnlineCount() {         WebSocket.onlineCount--;    }    public static synchronized Map<String, WebSocket> getClients() {         return clients;     }}

示例代碼中并沒有使用Spring,用的是原生的java web編寫的,簡單和大家介紹一下里面的方法。

  • onOpen:在客戶端與WebSocket服務(wù)連接時觸發(fā)方法執(zhí)行

  • onClose:在客戶端與WebSocket連接斷開的時候觸發(fā)執(zhí)行

  • onMessage:在接收到客戶端發(fā)送的消息時觸發(fā)執(zhí)行

  • onError:在發(fā)生錯誤時觸發(fā)執(zhí)行

可以看到,在onMessage方法中,我們直接根據(jù)客戶端發(fā)送的消息,進行消息的轉(zhuǎn)發(fā)功能,這樣在單體消息服務(wù)中是沒有問題的。

再來看一下js代碼

var host = document.location.host;     // 獲得當(dāng)前登錄科室    var deptCodes='${sessionScope.$UserContext.departmentID}';     deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, "");     var key = '${sessionScope.$UserContext.userID}'+deptCodes;     var lockReconnect = false;  //避免ws重復(fù)連接     var ws = null;          // 判斷當(dāng)前瀏覽器是否支持WebSocket    var wsUrl = 'ws://' + host + '/webSocket/'+ key;     createWebSocket(wsUrl);   //連接ws    function createWebSocket(url) {         try{            if('WebSocket' in window){                 ws = new WebSocket(url);            }else if('MozWebSocket' in window){                   ws = new MozWebSocket(url);            }else{                   layer.alert("您的瀏覽器不支持websocket協(xié)議,建議使用新版谷歌、火狐等瀏覽器,請勿使用IE10以下瀏覽器,360瀏覽器請使用極速模式,不要使用兼容模式!");              }            initEventHandle();        }catch(e){            reconnect(url);            console.log(e);         }         }    function initEventHandle() {         ws.onclose = function () {             reconnect(wsUrl);            console.log("llws連接關(guān)閉!"+new Date().toUTCString());         };        ws.onerror = function () {             reconnect(wsUrl);            console.log("llws連接錯誤!");         };        ws.onopen = function () {             heartCheck.reset().start();      //心跳檢測重置            console.log("llws連接成功!"+new Date().toUTCString());         };        ws.onmessage = function (event) {    //如果獲取到消息,心跳檢測重置             heartCheck.reset().start();      //拿到任何消息都說明當(dāng)前連接是正常的//接收到消息實際業(yè)務(wù)處理  ...        };    }    // 監(jiān)聽窗口關(guān)閉事件,當(dāng)窗口關(guān)閉時,主動去關(guān)閉websocket連接,防止連接還沒斷開就關(guān)閉窗口,server端會拋異常。    window.onbeforeunload = function() {         ws.close();     }      function reconnect(url) {         if(lockReconnect) return;         lockReconnect = true;         setTimeout(function () {     //沒連接上會一直重連,設(shè)置延遲避免請求過多             createWebSocket(url);            lockReconnect = false;         }, 2000);     }    //心跳檢測    var heartCheck = {        timeout: 300000,        //5分鐘發(fā)一次心跳         timeoutObj: null,        serverTimeoutObj: null,        reset: function(){             clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this;         },        start: function(){             var self = this;            this.timeoutObj = setTimeout(function(){                 //這里發(fā)送一個心跳,后端收到后,返回一個心跳消息,                //onmessage拿到返回的心跳就說明連接正常                ws.send("ping");                 console.log("ping!")                 self.serverTimeoutObj = setTimeout(function(){//如果超過一定時間還沒重置,說明后端主動斷開了                     ws.close();     //如果onclose會執(zhí)行reconnect,我們執(zhí)行ws.close()就行了.如果直接執(zhí)行reconnect 會觸發(fā)onclose導(dǎo)致重連兩次                 }, self.timeout)            }, this.timeout)        }}

js部分使用的是原生H5編寫的,如果為了更好的兼容瀏覽器,也可以使用SockJS,有興趣小伙伴們可以自行百度。

接下來我們就手動的優(yōu)化代碼,實現(xiàn)WebSocket對分布式架構(gòu)的支持。

解決方案的思考

現(xiàn)在我們已經(jīng)了解單體應(yīng)用下的代碼結(jié)構(gòu),也清楚了WebSocket在分布式環(huán)境下面臨的問題,那么是時候思考一下如何能夠解決這個問題了。

我們先來看一看發(fā)生這個問題的根本原因是什么。

簡單思考一下就能明白,單體應(yīng)用下只有一臺服務(wù)器,所有的客戶端連接的都是這一臺消息服務(wù)器,所以當(dāng)發(fā)布消息者發(fā)送消息時,所有的客戶端其實已經(jīng)全部與這臺服務(wù)器建立了連接,直接群發(fā)消息就可以了。

換成分布式系統(tǒng)后,假如我們有兩臺消息服務(wù)器,那么客戶端通過Nginx負載均衡后,就會有一部分連接到其中一臺服務(wù)器,另一部分連接到另一臺服務(wù)器,所以發(fā)布消息者發(fā)送消息時,只會發(fā)送到其中的一臺服務(wù)器上,而這臺消息服務(wù)器就可以執(zhí)行群發(fā)操作,但問題是,另一臺服務(wù)器并不知道這件事,也就無法發(fā)送消息了。

現(xiàn)在我們知道了根本原因是生產(chǎn)消息時,只有一臺消息服務(wù)器能夠感知到,所以我們只要讓另一臺消息服務(wù)器也能感知到就可以了,這樣感知到之后,它就可以群發(fā)消息給連接到它上邊的客戶端了。

那么什么方法可以實現(xiàn)這種功能呢,王子很快想到了引入消息中間件,并使用它的發(fā)布訂閱模式來通知所有消息服務(wù)器就可以了。

引入RabbitMQ解決分布式下的WebSocket問題

在消息中間件的選擇上,王子選擇了RabbitMQ,原因是它的搭建比較簡單,功能也很強大,而且我們只是用到它群發(fā)消息的功能。

RabbitMQ有一個廣播模式(fanout),我們使用的就是這種模式。

首先我們寫一個RabbitMQ的連接類:

import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil {     private static Connection connection;     /**      * 與rabbitmq建立連接      * @return      */     public static Connection getConnection() {         if (connection != null&&connection.isOpen()) {             return connection;         }        ConnectionFactory factory = new ConnectionFactory();         factory.setVirtualHost("/");         factory.setHost("192.168.220.110"); // 用的是虛擬IP地址         factory.setPort(5672);         factory.setUsername("guest");         factory.setPassword("guest");         try {             connection = factory.newConnection();         } catch (IOException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }         return connection;     } }

這個類沒什么說的,就是獲取MQ連接的一個工廠類。

然后按照我們的思路,就是每次服務(wù)器啟動的時候,都會創(chuàng)建一個MQ的消費者監(jiān)聽MQ的消息,王子這里測試使用的是Servlet的監(jiān)聽器,如下:

import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; public class InitListener implements ServletContextListener {     @Override     public void contextInitialized(ServletContextEvent servletContextEvent) {         WebSocket.init();    }    @Override     public void contextDestroyed(ServletContextEvent servletContextEvent) {     }}

記得要在Web.xml中配置監(jiān)聽器信息

<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"          version="4.0">     <listener>         <listener-class>InitListener</listener-class>     </listener> </web-app>

WebSocket中增加init方法,作為MQ消費者部分

public  static void init() {         try {            Connection connection = RabbitMQUtil.getConnection();            Channel channel = connection.createChannel();            //交換機聲明(參數(shù)為:交換機名稱;交換機類型)             channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);             //獲取一個臨時隊列             String queueName = channel.queueDeclare().getQueue();            //隊列與交換機綁定(參數(shù)為:隊列名稱;交換機名稱;routingKey忽略)             channel.queueBind(queueName,"fanoutLogs","");             //這里重寫了DefaultConsumer的handleDelivery方法,因為發(fā)送的時候?qū)ο⑦M行了getByte(),在這里要重新組裝成String             Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body);                     String message = new String(body,"UTF-8");                     System.out.println(message);//這里可以使用WebSocket通過消息內(nèi)容發(fā)送消息給對應(yīng)的客戶端                 }            };            //聲明隊列中被消費掉的消息(參數(shù)為:隊列名稱;消息是否自動確認;consumer主體)             channel.basicConsume(queueName,true,consumer);             //這里不能關(guān)閉連接,調(diào)用了消費方法后,消費者會一直連接著rabbitMQ等待消費         } catch (IOException e) {            e.printStackTrace();        }    }

同時在接收到消息時,不是直接通過WebSocket發(fā)送消息給對應(yīng)客戶端,而是發(fā)送消息給MQ,這樣如果消息服務(wù)器有多個,就都會從MQ中獲得消息,之后通過獲取的消息內(nèi)容再使用WebSocket推送給對應(yīng)的客戶端就可以了。

WebSocket的onMessage方法增加內(nèi)容如下:

try {             //嘗試獲取一個連接             Connection connection = RabbitMQUtil.getConnection();            //嘗試創(chuàng)建一個channel             Channel channel = connection.createChannel();            //聲明交換機(參數(shù)為:交換機名稱; 交換機類型,廣播模式)             channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);             //消息發(fā)布(參數(shù)為:交換機名稱; routingKey,忽略。在廣播模式中,生產(chǎn)者聲明交換機的名稱和類型即可)             channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));             System.out.println("發(fā)布消息");             channel.close();        } catch (IOException |TimeoutException e) {             e.printStackTrace();         }

增加后刪除掉原來的Websocket推送部分代碼。

到此,相信大家對“分布式下的WebSocket解決方案是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI