溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于Java NIO的即時聊天服務器模型怎么實現(xiàn)

發(fā)布時間:2022-01-06 18:12:51 來源:億速云 閱讀:99 作者:iii 欄目:編程語言

這篇文章主要講解了“基于Java NIO的即時聊天服務器模型怎么實現(xiàn)”,文中的講解內(nèi)容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“基于Java NIO的即時聊天服務器模型怎么實現(xiàn)”吧!

廢話不多說,關于NIO的SelectionKey、Selector、Channel網(wǎng)上的介紹例子都很多,直接上代碼:

JsonParser

Json的解析類,隨便封裝了下,使用的最近比較火的fastjson

public class JsonParser {          private static JSONObject mJson;          public synchronized static String get(String json,String key) {         mJson = JSON.parseObject(json);         return mJson.getString(key);     } }

Main

入口,不解釋

public class Main {      public static void main(String... args) {         new SeekServer().start();     } }

Log

public class Log {      public static void i(Object obj) {         System.out.println(obj);     }     public static void e(Object e) {         System.err.println(e);     } }

SeekServer:

服務器端的入口,請求的封裝和接收都在此類,端口暫時寫死在了代碼里,mSelector.select(TIME_OUT) > 0 目的是為了當服務器空閑的時候(沒有任何讀寫甚至請求斷開事件),循環(huán)時有個間隔時間,不然基本上相當于while(true){//nothing}了,你懂的。

public class SeekServer extends Thread{     private final int ACCPET_PORT = 55555;     private final int TIME_OUT = 1000;     private Selector mSelector = null;     private ServerSocketChannel mSocketChannel = null;     private ServerSocket mServerSocket = null;     private InetSocketAddress mAddress = null;          public SeekServer() {         long sign = System.currentTimeMillis();         try {             mSocketChannel = ServerSocketChannel.open();             if(mSocketChannel == null) {                 System.out.println("can't open server socket channel");             }             mServerSocket = mSocketChannel.socket();             mAddress = new InetSocketAddress(ACCPET_PORT);             mServerSocket.bind(mAddress);             Log.i("server bind port is " + ACCPET_PORT);             mSelector = Selector.open();             mSocketChannel.configureBlocking(false);             SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);             key.attach(new Acceptor());                          //檢測Session狀態(tài)             Looper.getInstance().loop();                          //開始處理Session             SessionProcessor.start();                          Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!");         } catch (ClosedChannelException e) {             Log.e(e.getMessage());         } catch (IOException e) {             Log.e(e.getMessage());         }      }          public void run() {         Log.i("server is listening...");         while(!Thread.interrupted()) {             try {                 if(mSelector.select(TIME_OUT) > 0) {                     Set<SelectionKey> keys = mSelector.selectedKeys();                     Iterator<SelectionKey> iterator = keys.iterator();                     SelectionKey key = null;                     while(iterator.hasNext()) {                         key = iterator.next();                         Handler at = (Handler) key.attachment();                         if(at != null) {                             at.exec();                         }                         iterator.remove();                     }                 }             } catch (IOException e) {                 Log.e(e.getMessage());             }         }     }      class Acceptor extends Handler{          public void exec(){             try {                 SocketChannel sc = mSocketChannel.accept();                 new Session(sc, mSelector);             } catch (ClosedChannelException e) {                 Log.e(e);             } catch (IOException e) {                 Log.e(e);             }         }     } }

Handler:

只有一個抽象方法exec,Session將會繼承它。

public abstract class Handler {          public abstract void exec(); }

Session:

封裝了用戶的請求和SelectionKey和SocketChannel,每次接收到新的請求時都重置它的最后活動時間,通過狀態(tài)mState=READING or SENDING 去執(zhí)行消息的接收與發(fā)送,當客戶端異常斷開時則從SessionManager清除該會話。

public class Session extends Handler{      private SocketChannel mChannel;     private SelectionKey  mKey;     private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);       private Charset charset = Charset.forName("UTF-8");     private CharsetDecoder mDecoder = charset.newDecoder();     private CharsetEncoder mEncoder = charset.newEncoder();     private long lastPant;//最后活動時間     private final int TIME_OUT = 1000 * 60 * 5; //Session超時時間     private String key;          private String sendData = "";     private String receiveData = null;          public static final int READING = 0,SENDING = 1;     int mState = READING;          public Session(SocketChannel socket, Selector selector) throws IOException {         this.mChannel = socket;         mChannel = socket;         mChannel.configureBlocking(false);         mKey = mChannel.register(selector, 0);         mKey.attach(this);         mKey.interestOps(SelectionKey.OP_READ);         selector.wakeup();         lastPant = Calendar.getInstance().getTimeInMillis();     }          public String getReceiveData() {         return receiveData;     }          public void clear() {         receiveData = null;     }      public void setSendData(String sendData) {         mState = SENDING;         mKey.interestOps(SelectionKey.OP_WRITE);         this.sendData = sendData + "\n";     }      public boolean isKeekAlive() {         return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();     }          public void setAlive() {         lastPant = Calendar.getInstance().getTimeInMillis();     }          /**      * 注銷當前Session      */     public void distroy() {         try {             mChannel.close();             mKey.cancel();         } catch (IOException e) {}     }          @Override     public synchronized void exec() {         try {             if(mState == READING) {                 read();             }else if(mState == SENDING) {                 write();             }         } catch (IOException e) {             SessionManager.remove(key);             try {                 mChannel.close();             } catch (IOException e1) {                 Log.e(e1);             }             mKey.cancel();         }     }          public void read() throws IOException{         mRreceiveBuffer.clear();         int sign = mChannel.read(mRreceiveBuffer);         if(sign == -1) { //客戶端連接關閉             mChannel.close();             mKey.cancel();         }         if(sign > 0) {             mRreceiveBuffer.flip();             receiveData = mDecoder.decode(mRreceiveBuffer).toString();             setAlive();             setSign();             SessionManager.addSession(key, this);         }     }          private void setSign() {         //設置當前Session的Key         key = JsonParser.get(receiveData,"imei");         //檢測消息類型是否為心跳包 //        String type = jo.getString("type"); //        if(type.equals("HEART_BEAT")) { //            setAlive(); //        }     }               /**      * 寫消息      */     public void write() {         try {             mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));             sendData = null;             mState = READING;             mKey.interestOps(SelectionKey.OP_READ);         } catch (CharacterCodingException e) {             e.printStackTrace();         } catch (IOException e) {             try {                 mChannel.close();             } catch (IOException e1) {                 Log.e(e1);             }         }     } }

SessionManager:

將所有Session存放到ConcurrentHashMap,這里使用手機用戶的imei做key,ConcurrentHashMap因為是線程安全的,所以能很大程度上避免自己去實現(xiàn)同步的過程,
封裝了一些操作Session的方法例如get,remove等。

public class SessionManager {      private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();          public static void addSession(String key,Session session) {         sessions.put(key, session);     }         public static Session getSession(String key) {         return sessions.get(key);     }          public static Set<String> getSessionKeys() {         return sessions.keySet();     }          public static int getSessionCount() {         return sessions.size();     }          public static void remove(String[] keys) {         for(String key:keys) {             if(sessions.containsKey(key)) {                 sessions.get(key).distroy();                 sessions.remove(key);             }         }     }     public static void remove(String key) {         if(sessions.containsKey(key)) {             sessions.get(key).distroy();             sessions.remove(key);         }     } }

SessionProcessor

里面使用了JDK自帶的線程池,用來分發(fā)處理所有Session中當前需要處理的請求(線程池的初始化參數(shù)不是太熟,望有了解的童鞋能告訴我),內(nèi)部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點熟悉的感覺,對沒錯,JavaWeb里到處都是request和response)。

public class SessionProcessor implements Runnable{          private static Runnable processor = new SessionProcessor();     private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());     public static void start() {         new Thread(processor).start();     }          @Override     public void run() {         while(true) {             Session tmp = null;             for(String key:SessionManager.getSessionKeys()) {                 tmp = SessionManager.getSession(key);                 //處理Session未處理的請求                 if(tmp.getReceiveData() != null) {                     pool.execute(new Process(tmp));                 }             }             try {                 Thread.sleep(10);             } catch (InterruptedException e) {                 Log.e(e);             }         }     }          class Process implements Runnable {          private SocketRequest request;         private SocketResponse response;                  public Process(Session session) {             //將Session封裝成Request和Response             request = new SocketRequest(session);             response = new SocketResponse(session);         }                  @Override         public void run() {             new RequestTransform().transfer(request, response);         }     }  }

RequestTransform里的transfer方法利用反射對請求參數(shù)中的請求類別和請求動作來調(diào)用不同類的不同方法(UserHandler和MessageHandler)

public class RequestTransform {      public void transfer(SocketRequest request,SocketResponse response) {         String action = request.getValue("action");         String handlerName = request.getValue("handler");         //根據(jù)Session的請求類型,讓不同的類方法去處理         try {             Class<?> c= Class.forName("com.seek.server.handler." + handlerName);             Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};             Method method=c.getMethod(action,arg);             method.invoke(c.newInstance(), new Object[]{request,response});         } catch (Exception e) {             e.printStackTrace();         }     } }

SocketRequest和SocketResponse

public class SocketRequest {      private Session mSession;     private String  mReceive;          public SocketRequest(Session session) {         mSession = session;         mReceive = session.getReceiveData();         mSession.clear();     }          public String getValue(String key) {         return JsonParser.get(mReceive, key);     }          public String getQueryString() {         return mReceive;     } }
public class SocketResponse {        private Session mSession;      public SocketResponse(Session session) {          mSession = session;      }            public void write(String msg) {          mSession.setSendData(msg);      }  }

最后則是兩個處理請求的Handler

public class UserHandler {      public void login(SocketRequest request,SocketResponse response) {         System.out.println(request.getQueryString());         //TODO: 處理用戶登錄         response.write("你肯定收到消息了");     } }
public class MessageHandler {     public void send(SocketRequest request,SocketResponse response) {         System.out.println(request.getQueryString());         //消息發(fā)送         String key = request.getValue("imei");         Session session = SessionManager.getSession(key);         new SocketResponse(session).write(request.getValue("sms"));     } }

還有個監(jiān)測是否超時的類Looper,定期去刪除Session

public class Looper extends Thread{     private static Looper looper = new Looper();     private static boolean isStart = false;     private final int INTERVAL = 1000 * 60 * 5;     private Looper(){}     public static Looper getInstance() {         return looper;     }          public void loop() {         if(!isStart) {             isStart = true;             this.start();         }     }          public void run() {         Task task = new Task();         while(true) {             //Session過期檢測             task.checkState();             //心跳包檢測             //task.sendAck();             try {                 Thread.sleep(INTERVAL);             } catch (InterruptedException e) {                 Log.e(e);             }         }     } }
public class Task {     public void checkState() {         Set<String> keys = SessionManager.getSessionKeys();         if(keys.size() == 0) {             return;         }         List<String> removes = new ArrayList<String>();         Iterator<String> iterator = keys.iterator();         String key = null;         while(iterator.hasNext()) {             key = iterator.next();             if(!SessionManager.getSession(key).isKeekAlive()) {                 removes.add(key);             }        }         if(removes.size() > 0) {             Log.i("sessions is time out,remove " + removes.size() + "session");         }         SessionManager.remove(removes.toArray(new String[removes.size()]));     }          public void sendAck() {         Set<String> keys = SessionManager.getSessionKeys();         if(keys.size() == 0) {             return;         }         Iterator<String> iterator = keys.iterator();         while(iterator.hasNext()) {             iterator.next();             //TODO 發(fā)送心跳包         }     } }

注意,在Task和SessionProcessor類里都有對SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來獲取Key和Value,因為一直在JavaWeb上折騰,所以會的童鞋看到Request和Response會挺親切,這個例子沒有經(jīng)過任何安全和性能測試,如果需要放到生產(chǎn)環(huán)境上得話請先自行做測試- -!

客戶端請求時的數(shù)據(jù)內(nèi)容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},這些約定就自己來定了。

感謝各位的閱讀,以上就是“基于Java NIO的即時聊天服務器模型怎么實現(xiàn)”的內(nèi)容了,經(jīng)過本文的學習后,相信大家對基于Java NIO的即時聊天服務器模型怎么實現(xiàn)這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI