溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java中網(wǎng)絡(luò)IO編程的示例分析

發(fā)布時(shí)間:2021-07-21 14:02:51 來源:億速云 閱讀:115 作者:小新 欄目:編程語(yǔ)言

這篇文章給大家分享的是有關(guān)Java中網(wǎng)絡(luò)IO編程的示例分析的內(nèi)容。小編覺得挺實(shí)用的,因此分享給大家做個(gè)參考,一起跟隨小編過來看看吧。

下面代碼中會(huì)使用這樣一個(gè)例子:客戶端發(fā)送一段算式的字符串到服務(wù)器,服務(wù)器計(jì)算后返回結(jié)果到客戶端。

代碼的所有說明,都直接作為注釋,嵌入到代碼中,看代碼時(shí)就能更容易理解,代碼中會(huì)用到一個(gè)計(jì)算結(jié)果的工具類,見文章代碼部分。

相關(guān)的基礎(chǔ)知識(shí)文章推薦:
Linux 網(wǎng)絡(luò) I/O 模型簡(jiǎn)介(圖文)
Java 并發(fā)(多線程)   

1、BIO編程

1.1、傳統(tǒng)的BIO編程

網(wǎng)絡(luò)編程的基本模型是C/S模型,即兩個(gè)進(jìn)程間的通信。

服務(wù)端提供IP和監(jiān)聽端口,客戶端通過連接操作想服務(wù)端監(jiān)聽的地址發(fā)起連接請(qǐng)求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進(jìn)行通信。

傳統(tǒng)的同步阻塞模型開發(fā)中,ServerSocket負(fù)責(zé)綁定IP地址,啟動(dòng)監(jiān)聽端口;Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。 

簡(jiǎn)單的描述一下BIO的服務(wù)端通信模型:采用BIO通信模型的服務(wù)端,通常由一個(gè)獨(dú)立的Acceptor線程負(fù)責(zé)監(jiān)聽客戶端的連接,它接收到客戶端連接請(qǐng)求之后為每個(gè)客戶端創(chuàng)建一個(gè)新的線程進(jìn)行鏈路處理沒處理完成后,通過輸出流返回應(yīng)答給客戶端,線程銷毀。即典型的一請(qǐng)求一應(yīng)答通宵模型。

傳統(tǒng)BIO通信模型圖:

Java中網(wǎng)絡(luò)IO編程的示例分析

該模型最大的問題就是缺乏彈性伸縮能力,當(dāng)客戶端并發(fā)訪問量增加后,服務(wù)端的線程個(gè)數(shù)和客戶端并發(fā)訪問數(shù)呈1:1的正比關(guān)系,Java中的線程也是比較寶貴的系統(tǒng)資源,線程數(shù)量快速膨脹后,系統(tǒng)的性能將急劇下降,隨著訪問量的繼續(xù)增大,系統(tǒng)最終就死-掉-了。

同步阻塞式I/O創(chuàng)建的Server源碼:

package com.anxpp.io.calculator.bio; 
import java.io.IOException; 
import java.net.ServerSocket; 
import java.net.Socket; 
/** 
 * BIO服務(wù)端源碼 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public final class ServerNormal { 
 //默認(rèn)的端口號(hào) 
 private static int DEFAULT_PORT = 12345; 
 //單例的ServerSocket 
 private static ServerSocket server; 
 //根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值 
 public static void start() throws IOException{ 
  //使用默認(rèn)值 
  start(DEFAULT_PORT); 
 } 
 //這個(gè)方法不會(huì)被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就行了 
 public synchronized static void start(int port) throws IOException{ 
  if(server != null) return; 
  try{ 
   //通過構(gòu)造函數(shù)創(chuàng)建ServerSocket 
   //如果端口合法且空閑,服務(wù)端就監(jiān)聽成功 
   server = new ServerSocket(port); 
   System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port); 
   //通過無(wú)線循環(huán)監(jiān)聽客戶端連接 
   //如果沒有客戶端接入,將阻塞在accept操作上。 
   while(true){ 
    Socket socket = server.accept(); 
    //當(dāng)有新的客戶端接入時(shí),會(huì)執(zhí)行下面的代碼 
    //然后創(chuàng)建一個(gè)新的線程處理這條Socket鏈路 
    new Thread(new ServerHandler(socket)).start(); 
   } 
  }finally{ 
   //一些必要的清理工作 
   if(server != null){ 
    System.out.println("服務(wù)器已關(guān)閉。"); 
    server.close(); 
    server = null; 
   } 
  } 
 } 
}

客戶端消息處理線程ServerHandler源碼:

package com.anxpp.io.calculator.bio; 
import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.io.PrintWriter; 
import java.net.Socket; 
 
import com.anxpp.io.utils.Calculator; 
/** 
 * 客戶端線程 
 * @author yangtao__anxpp.com 
 * 用于處理一個(gè)客戶端的Socket鏈路 
 */ 
public class ServerHandler implements Runnable{ 
 private Socket socket; 
 public ServerHandler(Socket socket) { 
  this.socket = socket; 
 } 
 @Override 
 public void run() { 
  BufferedReader in = null; 
  PrintWriter out = null; 
  try{ 
   in = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
   out = new PrintWriter(socket.getOutputStream(),true); 
   String expression; 
   String result; 
   while(true){ 
    //通過BufferedReader讀取一行 
    //如果已經(jīng)讀到輸入流尾部,返回null,退出循環(huán) 
    //如果得到非空值,就嘗試計(jì)算結(jié)果并返回 
    if((expression = in.readLine())==null) break; 
    System.out.println("服務(wù)器收到消息:" + expression); 
    try{ 
     result = Calculator.cal(expression).toString(); 
    }catch(Exception e){ 
     result = "計(jì)算錯(cuò)誤:" + e.getMessage(); 
    } 
    out.println(result); 
   } 
  }catch(Exception e){ 
   e.printStackTrace(); 
  }finally{ 
   //一些必要的清理工作 
   if(in != null){ 
    try { 
     in.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    in = null; 
   } 
   if(out != null){ 
    out.close(); 
    out = null; 
   } 
   if(socket != null){ 
    try { 
     socket.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    socket = null; 
   } 
  } 
 } 
}

同步阻塞式I/O創(chuàng)建的Client源碼:

package com.anxpp.io.calculator.bio; 
import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.io.PrintWriter; 
import java.net.Socket; 
/** 
 * 阻塞式I/O創(chuàng)建的客戶端 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class Client { 
 //默認(rèn)的端口號(hào) 
 private static int DEFAULT_SERVER_PORT = 12345; 
 private static String DEFAULT_SERVER_IP = "127.0.0.1"; 
 public static void send(String expression){ 
  send(DEFAULT_SERVER_PORT,expression); 
 } 
 public static void send(int port,String expression){ 
  System.out.println("算術(shù)表達(dá)式為:" + expression); 
  Socket socket = null; 
  BufferedReader in = null; 
  PrintWriter out = null; 
  try{ 
   socket = new Socket(DEFAULT_SERVER_IP,port); 
   in = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
   out = new PrintWriter(socket.getOutputStream(),true); 
   out.println(expression); 
   System.out.println("___結(jié)果為:" + in.readLine()); 
  }catch(Exception e){ 
   e.printStackTrace(); 
  }finally{ 
   //一下必要的清理工作 
   if(in != null){ 
    try { 
     in.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    in = null; 
   } 
   if(out != null){ 
    out.close(); 
    out = null; 
   } 
   if(socket != null){ 
    try { 
     socket.close(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
    socket = null; 
   } 
  } 
 } 
}

測(cè)試代碼,為了方便在控制臺(tái)看輸出結(jié)果,放到同一個(gè)程序(jvm)中運(yùn)行:

package com.anxpp.io.calculator.bio; 
import java.io.IOException; 
import java.util.Random; 
/** 
 * 測(cè)試方法 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class Test { 
 //測(cè)試主方法 
 public static void main(String[] args) throws InterruptedException { 
  //運(yùn)行服務(wù)器 
  new Thread(new Runnable() { 
   @Override 
   public void run() { 
    try { 
     ServerBetter.start(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
   } 
  }).start(); 
  //避免客戶端先于服務(wù)器啟動(dòng)前執(zhí)行代碼 
  Thread.sleep(100); 
  //運(yùn)行客戶端 
  char operators[] = {'+','-','*','/'}; 
  Random random = new Random(System.currentTimeMillis()); 
  new Thread(new Runnable() { 
   @SuppressWarnings("static-access") 
   @Override 
   public void run() { 
    while(true){ 
     //隨機(jī)產(chǎn)生算術(shù)表達(dá)式 
     String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1); 
     Client.send(expression); 
     try { 
      Thread.currentThread().sleep(random.nextInt(1000)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
   } 
  }).start(); 
 } 
}

其中一次的運(yùn)行結(jié)果:

服務(wù)器已啟動(dòng),端口號(hào):12345
算術(shù)表達(dá)式為:4-2
服務(wù)器收到消息:4-2
___結(jié)果為:2
算術(shù)表達(dá)式為:5-10
服務(wù)器收到消息:5-10
___結(jié)果為:-5
算術(shù)表達(dá)式為:0-9
服務(wù)器收到消息:0-9
___結(jié)果為:-9
算術(shù)表達(dá)式為:0+6
服務(wù)器收到消息:0+6
___結(jié)果為:6
算術(shù)表達(dá)式為:1/6
服務(wù)器收到消息:1/6
___結(jié)果為:0.16666666666666666
...

從以上代碼,很容易看出,BIO主要的問題在于每當(dāng)有一個(gè)新的客戶端請(qǐng)求接入時(shí),服務(wù)端必須創(chuàng)建一個(gè)新的線程來處理這條鏈路,在需要滿足高性能、高并發(fā)的場(chǎng)景是沒法應(yīng)用的(大量創(chuàng)建新的線程會(huì)嚴(yán)重影響服務(wù)器性能,甚至罷工)。

1.2、偽異步I/O編程

為了改進(jìn)這種一連接一線程的模型,我們可以使用線程池來管理這些線程(需要了解更多請(qǐng)參考前面提供的文章),實(shí)現(xiàn)1個(gè)或多個(gè)線程處理N個(gè)客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。

偽異步I/O模型圖:

Java中網(wǎng)絡(luò)IO編程的示例分析

實(shí)現(xiàn)很簡(jiǎn)單,我們只需要將新建線程的地方,交給線程池管理即可,只需要改動(dòng)剛剛的Server代碼即可:

package com.anxpp.io.calculator.bio; 
import java.io.IOException; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
/** 
 * BIO服務(wù)端源碼__偽異步I/O 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public final class ServerBetter { 
 //默認(rèn)的端口號(hào) 
 private static int DEFAULT_PORT = 12345; 
 //單例的ServerSocket 
 private static ServerSocket server; 
 //線程池 懶漢式的單例 
 private static ExecutorService executorService = Executors.newFixedThreadPool(60); 
 //根據(jù)傳入?yún)?shù)設(shè)置監(jiān)聽端口,如果沒有參數(shù)調(diào)用以下方法并使用默認(rèn)值 
 public static void start() throws IOException{ 
  //使用默認(rèn)值 
  start(DEFAULT_PORT); 
 } 
 //這個(gè)方法不會(huì)被大量并發(fā)訪問,不太需要考慮效率,直接進(jìn)行方法同步就行了 
 public synchronized static void start(int port) throws IOException{ 
  if(server != null) return; 
  try{ 
   //通過構(gòu)造函數(shù)創(chuàng)建ServerSocket 
   //如果端口合法且空閑,服務(wù)端就監(jiān)聽成功 
   server = new ServerSocket(port); 
   System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port); 
   //通過無(wú)線循環(huán)監(jiān)聽客戶端連接 
   //如果沒有客戶端接入,將阻塞在accept操作上。 
   while(true){ 
    Socket socket = server.accept(); 
    //當(dāng)有新的客戶端接入時(shí),會(huì)執(zhí)行下面的代碼 
    //然后創(chuàng)建一個(gè)新的線程處理這條Socket鏈路 
    executorService.execute(new ServerHandler(socket)); 
   } 
  }finally{ 
   //一些必要的清理工作 
   if(server != null){ 
    System.out.println("服務(wù)器已關(guān)閉。"); 
    server.close(); 
    server = null; 
   } 
  } 
 } 
}

測(cè)試運(yùn)行結(jié)果是一樣的。

我們知道,如果使用CachedThreadPool線程池(不限制線程數(shù)量,如果不清楚請(qǐng)參考文首提供的文章),其實(shí)除了能自動(dòng)幫我們管理線程(復(fù)用),看起來也就像是1:1的客戶端:線程數(shù)模型,而使用FixedThreadPool我們就有效的控制了線程的最大數(shù)量,保證了系統(tǒng)有限的資源的控制,實(shí)現(xiàn)了N:M的偽異步I/O模型。

但是,正因?yàn)橄拗屏司€程數(shù)量,如果發(fā)生大量并發(fā)請(qǐng)求,超過最大數(shù)量的線程就只能等待,直到線程池中的有空閑的線程可以被復(fù)用。而對(duì)Socket的輸入流就行讀取時(shí),會(huì)一直阻塞,直到發(fā)生:

  1. 有數(shù)據(jù)可讀

  2. 可用數(shù)據(jù)以及讀取完畢

  3. 發(fā)生空指針或I/O異常

所以在讀取數(shù)據(jù)較慢時(shí)(比如數(shù)據(jù)量大、網(wǎng)絡(luò)傳輸慢等),大量并發(fā)的情況下,其他接入的消息,只能一直等待,這就是最大的弊端。

而后面即將介紹的NIO,就能解決這個(gè)難題。

2、NIO 編程

JDK 1.4中的java.nio.*包中引入新的Java I/O庫(kù),其目的是提高速度。實(shí)際上,“舊”的I/O包已經(jīng)使用NIO重新實(shí)現(xiàn)過,即使我們不顯式的使用NIO編程,也能從中受益。速度的提高在文件I/O和網(wǎng)絡(luò)I/O中都可能會(huì)發(fā)生,但本文只討論后者。

2.1、簡(jiǎn)介

NIO我們一般認(rèn)為是New I/O(也是官方的叫法),因?yàn)樗窍鄬?duì)于老的I/O類庫(kù)新增的(其實(shí)在JDK 1.4中就已經(jīng)被引入了,但這個(gè)名詞還會(huì)繼續(xù)用很久,即使它們?cè)诂F(xiàn)在看來已經(jīng)是“舊”的了,所以也提示我們?cè)诿麜r(shí),需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因?yàn)檫@樣叫,更能體現(xiàn)它的特點(diǎn)。而下文中的NIO,不是指整個(gè)新的I/O庫(kù),而是非阻塞I/O。

NIO提供了與傳統(tǒng)BIO模型中的Socket和ServerSocket相對(duì)應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn)。

新增的著兩種通道都支持阻塞和非阻塞兩種模式。

阻塞模式使用就像傳統(tǒng)中的支持一樣,比較簡(jiǎn)單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

對(duì)于低負(fù)載、低并發(fā)的應(yīng)用程序,可以使用同步阻塞I/O來提升開發(fā)速率和更好的維護(hù)性;對(duì)于高負(fù)載、高并發(fā)的(網(wǎng)絡(luò))應(yīng)用,應(yīng)使用NIO的非阻塞模式來開發(fā)。

下面會(huì)先對(duì)基礎(chǔ)知識(shí)進(jìn)行介紹。

2.2、緩沖區(qū) Buffer

Buffer是一個(gè)對(duì)象,包含一些要寫入或者讀出的數(shù)據(jù)。

在NIO庫(kù)中,所有數(shù)據(jù)都是用緩沖區(qū)處理的。在讀取數(shù)據(jù)時(shí),它是直接讀到緩沖區(qū)中的;在寫入數(shù)據(jù)時(shí),也是寫入到緩沖區(qū)中。任何時(shí)候訪問NIO中的數(shù)據(jù),都是通過緩沖區(qū)進(jìn)行操作。

緩沖區(qū)實(shí)際上是一個(gè)數(shù)組,并提供了對(duì)數(shù)據(jù)結(jié)構(gòu)化訪問以及維護(hù)讀寫位置等信息。

具體的緩存區(qū)有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實(shí)現(xiàn)了相同的接口:Buffer。

2.3、通道 Channel

我們對(duì)數(shù)據(jù)的讀取和寫入要通過Channel,它就像水管一樣,是一個(gè)通道。通道不同于流的地方就是通道是雙向的,可以用于讀、寫和同時(shí)讀寫操作。

底層的操作系統(tǒng)的通道一般都是全雙工的,所以全雙工的Channel比流能更好的映射底層操作系統(tǒng)的API。

Channel主要分兩大類:

  1. SelectableChannel:用戶網(wǎng)絡(luò)讀寫

  2. FileChannel:用于文件操作

后面代碼會(huì)涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

 2.4、多路復(fù)用器 Selector

Selector是Java  NIO 編程的基礎(chǔ)。

Selector提供選擇已經(jīng)就緒的任務(wù)的能力:Selector會(huì)不斷輪詢注冊(cè)在其上的Channel,如果某個(gè)Channel上面發(fā)生讀或者寫事件,這個(gè)Channel就處于就緒狀態(tài),會(huì)被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進(jìn)行后續(xù)的I/O操作。

一個(gè)Selector可以同時(shí)輪詢多個(gè)Channel,因?yàn)镴DK使用了epoll()代替?zhèn)鹘y(tǒng)的select實(shí)現(xiàn),所以沒有最大連接句柄1024/2048的限制。所以,只需要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬(wàn)的客戶端。

2.5、NIO服務(wù)端

代碼比傳統(tǒng)的Socket編程看起來要復(fù)雜不少。

直接貼代碼吧,以注釋的形式給出代碼說明。

NIO創(chuàng)建的Server源碼:

package com.anxpp.io.calculator.nio; 
public class Server { 
 private static int DEFAULT_PORT = 12345; 
 private static ServerHandle serverHandle; 
 public static void start(){ 
  start(DEFAULT_PORT); 
 } 
 public static synchronized void start(int port){ 
  if(serverHandle!=null) 
   serverHandle.stop(); 
  serverHandle = new ServerHandle(port); 
  new Thread(serverHandle,"Server").start(); 
 } 
 public static void main(String[] args){ 
  start(); 
 } 
}

ServerHandle:

package com.anxpp.io.calculator.nio; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.ServerSocketChannel; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
import java.util.Set; 
 
import com.anxpp.io.utils.Calculator; 
/** 
 * NIO服務(wù)端 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class ServerHandle implements Runnable{ 
 private Selector selector; 
 private ServerSocketChannel serverChannel; 
 private volatile boolean started; 
 /** 
  * 構(gòu)造方法 
  * @param port 指定要監(jiān)聽的端口號(hào) 
  */ 
 public ServerHandle(int port) { 
  try{ 
   //創(chuàng)建選擇器 
   selector = Selector.open(); 
   //打開監(jiān)聽通道 
   serverChannel = ServerSocketChannel.open(); 
   //如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式 
   serverChannel.configureBlocking(false);//開啟非阻塞模式 
   //綁定端口 backlog設(shè)為1024 
   serverChannel.socket().bind(new InetSocketAddress(port),1024); 
   //監(jiān)聽客戶端連接請(qǐng)求 
   serverChannel.register(selector, SelectionKey.OP_ACCEPT); 
   //標(biāo)記服務(wù)器已開啟 
   started = true; 
   System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port); 
  }catch(IOException e){ 
   e.printStackTrace(); 
   System.exit(1); 
  } 
 } 
 public void stop(){ 
  started = false; 
 } 
 @Override 
 public void run() { 
  //循環(huán)遍歷selector 
  while(started){ 
   try{ 
    //無(wú)論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次 
    selector.select(1000); 
    //阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù). 
//    selector.select(); 
    Set<SelectionKey> keys = selector.selectedKeys(); 
    Iterator<SelectionKey> it = keys.iterator(); 
    SelectionKey key = null; 
    while(it.hasNext()){ 
     key = it.next(); 
     it.remove(); 
     try{ 
      handleInput(key); 
     }catch(Exception e){ 
      if(key != null){ 
       key.cancel(); 
       if(key.channel() != null){ 
        key.channel().close(); 
       } 
      } 
     } 
    } 
   }catch(Throwable t){ 
    t.printStackTrace(); 
   } 
  } 
  //selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源 
  if(selector != null) 
   try{ 
    selector.close(); 
   }catch (Exception e) { 
    e.printStackTrace(); 
   } 
 } 
 private void handleInput(SelectionKey key) throws IOException{ 
  if(key.isValid()){ 
   //處理新接入的請(qǐng)求消息 
   if(key.isAcceptable()){ 
    ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 
    //通過ServerSocketChannel的accept創(chuàng)建SocketChannel實(shí)例 
    //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立 
    SocketChannel sc = ssc.accept(); 
    //設(shè)置為非阻塞的 
    sc.configureBlocking(false); 
    //注冊(cè)為讀 
    sc.register(selector, SelectionKey.OP_READ); 
   } 
   //讀消息 
   if(key.isReadable()){ 
    SocketChannel sc = (SocketChannel) key.channel(); 
    //創(chuàng)建ByteBuffer,并開辟一個(gè)1M的緩沖區(qū) 
    ByteBuffer buffer = ByteBuffer.allocate(1024); 
    //讀取請(qǐng)求碼流,返回讀取到的字節(jié)數(shù) 
    int readBytes = sc.read(buffer); 
    //讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼 
    if(readBytes>0){ 
     //將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對(duì)緩沖區(qū)的讀取操作 
     buffer.flip(); 
     //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組 
     byte[] bytes = new byte[buffer.remaining()]; 
     //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中 
     buffer.get(bytes); 
     String expression = new String(bytes,"UTF-8"); 
     System.out.println("服務(wù)器收到消息:" + expression); 
     //處理數(shù)據(jù) 
     String result = null; 
     try{ 
      result = Calculator.cal(expression).toString(); 
     }catch(Exception e){ 
      result = "計(jì)算錯(cuò)誤:" + e.getMessage(); 
     } 
     //發(fā)送應(yīng)答消息 
     doWrite(sc,result); 
    } 
    //沒有讀取到字節(jié) 忽略 
//    else if(readBytes==0); 
    //鏈路已經(jīng)關(guān)閉,釋放資源 
    else if(readBytes<0){ 
     key.cancel(); 
     sc.close(); 
    } 
   } 
  } 
 } 
 //異步發(fā)送應(yīng)答消息 
 private void doWrite(SocketChannel channel,String response) throws IOException{ 
  //將消息編碼為字節(jié)數(shù)組 
  byte[] bytes = response.getBytes(); 
  //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer 
  ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 
  //將字節(jié)數(shù)組復(fù)制到緩沖區(qū) 
  writeBuffer.put(bytes); 
  //flip操作 
  writeBuffer.flip(); 
  //發(fā)送緩沖區(qū)的字節(jié)數(shù)組 
  channel.write(writeBuffer); 
  //****此處不含處理“寫半包”的代碼 
 } 
}

    可以看到,創(chuàng)建NIO服務(wù)端的主要步驟如下:

  1. 打開ServerSocketChannel,監(jiān)聽客戶端連接

  2. 綁定監(jiān)聽端口,設(shè)置連接為非阻塞模式

  3. 創(chuàng)建Reactor線程,創(chuàng)建多路復(fù)用器并啟動(dòng)線程

  4. 將ServerSocketChannel注冊(cè)到Reactor線程中的Selector上,監(jiān)聽ACCEPT事件

  5. Selector輪詢準(zhǔn)備就緒的key

  6. Selector監(jiān)聽到新的客戶端接入,處理新的接入請(qǐng)求,完成TCP三次握手,簡(jiǎn)歷物理鏈路

  7. 設(shè)置客戶端鏈路為非阻塞模式

  8. 將新接入的客戶端連接注冊(cè)到Reactor線程的Selector上,監(jiān)聽讀操作,讀取客戶端發(fā)送的網(wǎng)絡(luò)消息

  9. 異步讀取客戶端消息到緩沖區(qū)

  10. 對(duì)Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

  11. 將應(yīng)答消息編碼為Buffer,調(diào)用SocketChannel的write將消息異步發(fā)送給客戶端

因?yàn)閼?yīng)答消息的發(fā)送,SocketChannel也是異步非阻塞的,所以不能保證一次能吧需要發(fā)送的數(shù)據(jù)發(fā)送完,此時(shí)就會(huì)出現(xiàn)寫半包的問題。我們需要注冊(cè)寫操作,不斷輪詢Selector將沒有發(fā)送完的消息發(fā)送完畢,然后通過Buffer的hasRemain()方法判斷消息是否發(fā)送完成。

2.6、NIO客戶端

還是直接上代碼吧,過程也不需要太多解釋了,跟服務(wù)端代碼有點(diǎn)類似。

Client:

package com.anxpp.io.calculator.nio; 
public class Client { 
 private static String DEFAULT_HOST = "127.0.0.1"; 
 private static int DEFAULT_PORT = 12345; 
 private static ClientHandle clientHandle; 
 public static void start(){ 
  start(DEFAULT_HOST,DEFAULT_PORT); 
 } 
 public static synchronized void start(String ip,int port){ 
  if(clientHandle!=null) 
   clientHandle.stop(); 
  clientHandle = new ClientHandle(ip,port); 
  new Thread(clientHandle,"Server").start(); 
 } 
 //向服務(wù)器發(fā)送消息 
 public static boolean sendMsg(String msg) throws Exception{ 
  if(msg.equals("q")) return false; 
  clientHandle.sendMsg(msg); 
  return true; 
 } 
 public static void main(String[] args){ 
  start(); 
 } 
}

ClientHandle:

package com.anxpp.io.calculator.nio; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.SelectionKey; 
import java.nio.channels.Selector; 
import java.nio.channels.SocketChannel; 
import java.util.Iterator; 
import java.util.Set; 
/** 
 * NIO客戶端 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class ClientHandle implements Runnable{ 
 private String host; 
 private int port; 
 private Selector selector; 
 private SocketChannel socketChannel; 
 private volatile boolean started; 
 
 public ClientHandle(String ip,int port) { 
  this.host = ip; 
  this.port = port; 
  try{ 
   //創(chuàng)建選擇器 
   selector = Selector.open(); 
   //打開監(jiān)聽通道 
   socketChannel = SocketChannel.open(); 
   //如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式 
   socketChannel.configureBlocking(false);//開啟非阻塞模式 
   started = true; 
  }catch(IOException e){ 
   e.printStackTrace(); 
   System.exit(1); 
  } 
 } 
 public void stop(){ 
  started = false; 
 } 
 @Override 
 public void run() { 
  try{ 
   doConnect(); 
  }catch(IOException e){ 
   e.printStackTrace(); 
   System.exit(1); 
  } 
  //循環(huán)遍歷selector 
  while(started){ 
   try{ 
    //無(wú)論是否有讀寫事件發(fā)生,selector每隔1s被喚醒一次 
    selector.select(1000); 
    //阻塞,只有當(dāng)至少一個(gè)注冊(cè)的事件發(fā)生的時(shí)候才會(huì)繼續(xù). 
//    selector.select(); 
    Set<SelectionKey> keys = selector.selectedKeys(); 
    Iterator<SelectionKey> it = keys.iterator(); 
    SelectionKey key = null; 
    while(it.hasNext()){ 
     key = it.next(); 
     it.remove(); 
     try{ 
      handleInput(key); 
     }catch(Exception e){ 
      if(key != null){ 
       key.cancel(); 
       if(key.channel() != null){ 
        key.channel().close(); 
       } 
      } 
     } 
    } 
   }catch(Exception e){ 
    e.printStackTrace(); 
    System.exit(1); 
   } 
  } 
  //selector關(guān)閉后會(huì)自動(dòng)釋放里面管理的資源 
  if(selector != null) 
   try{ 
    selector.close(); 
   }catch (Exception e) { 
    e.printStackTrace(); 
   } 
 } 
 private void handleInput(SelectionKey key) throws IOException{ 
  if(key.isValid()){ 
   SocketChannel sc = (SocketChannel) key.channel(); 
   if(key.isConnectable()){ 
    if(sc.finishConnect()); 
    else System.exit(1); 
   } 
   //讀消息 
   if(key.isReadable()){ 
    //創(chuàng)建ByteBuffer,并開辟一個(gè)1M的緩沖區(qū) 
    ByteBuffer buffer = ByteBuffer.allocate(1024); 
    //讀取請(qǐng)求碼流,返回讀取到的字節(jié)數(shù) 
    int readBytes = sc.read(buffer); 
    //讀取到字節(jié),對(duì)字節(jié)進(jìn)行編解碼 
    if(readBytes>0){ 
     //將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對(duì)緩沖區(qū)的讀取操作 
     buffer.flip(); 
     //根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組 
     byte[] bytes = new byte[buffer.remaining()]; 
     //將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中 
     buffer.get(bytes); 
     String result = new String(bytes,"UTF-8"); 
     System.out.println("客戶端收到消息:" + result); 
    } 
    //沒有讀取到字節(jié) 忽略 
//    else if(readBytes==0); 
    //鏈路已經(jīng)關(guān)閉,釋放資源 
    else if(readBytes<0){ 
     key.cancel(); 
     sc.close(); 
    } 
   } 
  } 
 } 
 //異步發(fā)送消息 
 private void doWrite(SocketChannel channel,String request) throws IOException{ 
  //將消息編碼為字節(jié)數(shù)組 
  byte[] bytes = request.getBytes(); 
  //根據(jù)數(shù)組容量創(chuàng)建ByteBuffer 
  ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 
  //將字節(jié)數(shù)組復(fù)制到緩沖區(qū) 
  writeBuffer.put(bytes); 
  //flip操作 
  writeBuffer.flip(); 
  //發(fā)送緩沖區(qū)的字節(jié)數(shù)組 
  channel.write(writeBuffer); 
  //****此處不含處理“寫半包”的代碼 
 } 
 private void doConnect() throws IOException{ 
  if(socketChannel.connect(new InetSocketAddress(host,port))); 
  else socketChannel.register(selector, SelectionKey.OP_CONNECT); 
 } 
 public void sendMsg(String msg) throws Exception{ 
  socketChannel.register(selector, SelectionKey.OP_READ); 
  doWrite(socketChannel, msg); 
 } 
}

2.7、演示結(jié)果

首先運(yùn)行服務(wù)器,順便也運(yùn)行一個(gè)客戶端:

package com.anxpp.io.calculator.nio; 
import java.util.Scanner; 
/** 
 * 測(cè)試方法 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class Test { 
 //測(cè)試主方法 
 @SuppressWarnings("resource") 
 public static void main(String[] args) throws Exception{ 
  //運(yùn)行服務(wù)器 
  Server.start(); 
  //避免客戶端先于服務(wù)器啟動(dòng)前執(zhí)行代碼 
  Thread.sleep(100); 
  //運(yùn)行客戶端 
  Client.start(); 
  while(Client.sendMsg(new Scanner(System.in).nextLine())); 
 } 
}

我們也可以單獨(dú)運(yùn)行客戶端,效果都是一樣的。

一次測(cè)試的結(jié)果:

服務(wù)器已啟動(dòng),端口號(hào):12345
1+2+3+4+5+6
服務(wù)器收到消息:1+2+3+4+5+6
客戶端收到消息:21
1*2/3-4+5*6/7-8
服務(wù)器收到消息:1*2/3-4+5*6/7-8
客戶端收到消息:-7.0476190476190474

運(yùn)行多個(gè)客戶端,都是沒有問題的。

3、AIO編程

NIO 2.0引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)。

異步的套接字通道時(shí)真正的異步非阻塞I/O,對(duì)應(yīng)于UNIX網(wǎng)絡(luò)編程中的事件驅(qū)動(dòng)I/O(AIO)。他不需要過多的Selector對(duì)注冊(cè)的通道進(jìn)行輪詢即可實(shí)現(xiàn)異步讀寫,從而簡(jiǎn)化了NIO的編程模型。

直接上代碼吧。

3.1、Server端代碼

Server:

package com.anxpp.io.calculator.aio.server; 
/** 
 * AIO服務(wù)端 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class Server { 
 private static int DEFAULT_PORT = 12345; 
 private static AsyncServerHandler serverHandle; 
 public volatile static long clientCount = 0; 
 public static void start(){ 
  start(DEFAULT_PORT); 
 } 
 public static synchronized void start(int port){ 
  if(serverHandle!=null) 
   return; 
  serverHandle = new AsyncServerHandler(port); 
  new Thread(serverHandle,"Server").start(); 
 } 
 public static void main(String[] args){ 
  Server.start(); 
 } 
}

AsyncServerHandler:

package com.anxpp.io.calculator.aio.server; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.channels.AsynchronousServerSocketChannel; 
import java.util.concurrent.CountDownLatch; 
public class AsyncServerHandler implements Runnable { 
 public CountDownLatch latch; 
 public AsynchronousServerSocketChannel channel; 
 public AsyncServerHandler(int port) { 
  try { 
   //創(chuàng)建服務(wù)端通道 
   channel = AsynchronousServerSocketChannel.open(); 
   //綁定端口 
   channel.bind(new InetSocketAddress(port)); 
   System.out.println("服務(wù)器已啟動(dòng),端口號(hào):" + port); 
  } catch (IOException e) { 
   e.printStackTrace(); 
  } 
 } 
 @Override 
 public void run() { 
  //CountDownLatch初始化 
  //它的作用:在完成一組正在執(zhí)行的操作之前,允許當(dāng)前的現(xiàn)場(chǎng)一直阻塞 
  //此處,讓現(xiàn)場(chǎng)在此阻塞,防止服務(wù)端執(zhí)行完成后退出 
  //也可以使用while(true)+sleep 
  //生成環(huán)境就不需要擔(dān)心這個(gè)問題,以為服務(wù)端是不會(huì)退出的 
  latch = new CountDownLatch(1); 
  //用于接收客戶端的連接 
  channel.accept(this,new AcceptHandler()); 
  try { 
   latch.await(); 
  } catch (InterruptedException e) { 
   e.printStackTrace(); 
  } 
 } 
}

AcceptHandler:

package com.anxpp.io.calculator.aio.server; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
//作為handler接收客戶端連接 
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { 
 @Override 
 public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { 
  //繼續(xù)接受其他客戶端的請(qǐng)求 
  Server.clientCount++; 
  System.out.println("連接的客戶端數(shù):" + Server.clientCount); 
  serverHandler.channel.accept(serverHandler, this); 
  //創(chuàng)建新的Buffer 
  ByteBuffer buffer = ByteBuffer.allocate(1024); 
  //異步讀 第三個(gè)參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler 
  channel.read(buffer, buffer, new ReadHandler(channel)); 
 } 
 @Override 
 public void failed(Throwable exc, AsyncServerHandler serverHandler) { 
  exc.printStackTrace(); 
  serverHandler.latch.countDown(); 
 } 
}

ReadHandler:

package com.anxpp.io.calculator.aio.server; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import com.anxpp.io.utils.Calculator; 
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { 
 //用于讀取半包消息和發(fā)送應(yīng)答 
 private AsynchronousSocketChannel channel; 
 public ReadHandler(AsynchronousSocketChannel channel) { 
   this.channel = channel; 
 } 
 //讀取到消息后的處理 
 @Override 
 public void completed(Integer result, ByteBuffer attachment) { 
  //flip操作 
  attachment.flip(); 
  //根據(jù) 
  byte[] message = new byte[attachment.remaining()]; 
  attachment.get(message); 
  try { 
   String expression = new String(message, "UTF-8"); 
   System.out.println("服務(wù)器收到消息: " + expression); 
   String calrResult = null; 
   try{ 
    calrResult = Calculator.cal(expression).toString(); 
   }catch(Exception e){ 
    calrResult = "計(jì)算錯(cuò)誤:" + e.getMessage(); 
   } 
   //向客戶端發(fā)送消息 
   doWrite(calrResult); 
  } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
  } 
 } 
 //發(fā)送消息 
 private void doWrite(String result) { 
  byte[] bytes = result.getBytes(); 
  ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 
  writeBuffer.put(bytes); 
  writeBuffer.flip(); 
  //異步寫數(shù)據(jù) 參數(shù)與前面的read一樣 
  channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { 
   @Override 
   public void completed(Integer result, ByteBuffer buffer) { 
    //如果沒有發(fā)送完,就繼續(xù)發(fā)送直到完成 
    if (buffer.hasRemaining()) 
     channel.write(buffer, buffer, this); 
    else{ 
     //創(chuàng)建新的Buffer 
     ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
     //異步讀 第三個(gè)參數(shù)為接收消息回調(diào)的業(yè)務(wù)Handler 
     channel.read(readBuffer, readBuffer, new ReadHandler(channel)); 
    } 
   } 
   @Override 
   public void failed(Throwable exc, ByteBuffer attachment) { 
    try { 
     channel.close(); 
    } catch (IOException e) { 
    } 
   } 
  }); 
 } 
 @Override 
 public void failed(Throwable exc, ByteBuffer attachment) { 
  try { 
   this.channel.close(); 
  } catch (IOException e) { 
   e.printStackTrace(); 
  } 
 } 
}

OK,這樣就已經(jīng)完成了,其實(shí)說起來也簡(jiǎn)單,雖然代碼感覺很多,但是API比NIO的使用起來真的簡(jiǎn)單多了,主要就是監(jiān)聽、讀、寫等各種CompletionHandler。此處本應(yīng)有一個(gè)WriteHandler的,確實(shí),我們?cè)赗eadHandler中,以一個(gè)匿名內(nèi)部類實(shí)現(xiàn)了它。

下面看客戶端代碼。

3.2、Client端代碼

Client:

package com.anxpp.io.calculator.aio.client; 
import java.util.Scanner; 
public class Client { 
 private static String DEFAULT_HOST = "127.0.0.1"; 
 private static int DEFAULT_PORT = 12345; 
 private static AsyncClientHandler clientHandle; 
 public static void start(){ 
  start(DEFAULT_HOST,DEFAULT_PORT); 
 } 
 public static synchronized void start(String ip,int port){ 
  if(clientHandle!=null) 
   return; 
  clientHandle = new AsyncClientHandler(ip,port); 
  new Thread(clientHandle,"Client").start(); 
 } 
 //向服務(wù)器發(fā)送消息 
 public static boolean sendMsg(String msg) throws Exception{ 
  if(msg.equals("q")) return false; 
  clientHandle.sendMsg(msg); 
  return true; 
 } 
 @SuppressWarnings("resource") 
 public static void main(String[] args) throws Exception{ 
  Client.start(); 
  System.out.println("請(qǐng)輸入請(qǐng)求消息:"); 
  Scanner scanner = new Scanner(System.in); 
  while(Client.sendMsg(scanner.nextLine())); 
 } 
}

AsyncClientHandler:

package com.anxpp.io.calculator.aio.client; 
import java.io.IOException; 
import java.net.InetSocketAddress; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.util.concurrent.CountDownLatch; 
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { 
 private AsynchronousSocketChannel clientChannel; 
 private String host; 
 private int port; 
 private CountDownLatch latch; 
 public AsyncClientHandler(String host, int port) { 
  this.host = host; 
  this.port = port; 
  try { 
   //創(chuàng)建異步的客戶端通道 
   clientChannel = AsynchronousSocketChannel.open(); 
  } catch (IOException e) { 
   e.printStackTrace(); 
  } 
 } 
 @Override 
 public void run() { 
  //創(chuàng)建CountDownLatch等待 
  latch = new CountDownLatch(1); 
  //發(fā)起異步連接操作,回調(diào)參數(shù)就是這個(gè)類本身,如果連接成功會(huì)回調(diào)completed方法 
  clientChannel.connect(new InetSocketAddress(host, port), this, this); 
  try { 
   latch.await(); 
  } catch (InterruptedException e1) { 
   e1.printStackTrace(); 
  } 
  try { 
   clientChannel.close(); 
  } catch (IOException e) { 
   e.printStackTrace(); 
  } 
 } 
 //連接服務(wù)器成功 
 //意味著TCP三次握手完成 
 @Override 
 public void completed(Void result, AsyncClientHandler attachment) { 
  System.out.println("客戶端成功連接到服務(wù)器..."); 
 } 
 //連接服務(wù)器失敗 
 @Override 
 public void failed(Throwable exc, AsyncClientHandler attachment) { 
  System.err.println("連接服務(wù)器失敗..."); 
  exc.printStackTrace(); 
  try { 
   clientChannel.close(); 
   latch.countDown(); 
  } catch (IOException e) { 
   e.printStackTrace(); 
  } 
 } 
 //向服務(wù)器發(fā)送消息 
 public void sendMsg(String msg){ 
  byte[] req = msg.getBytes(); 
  ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); 
  writeBuffer.put(req); 
  writeBuffer.flip(); 
  //異步寫 
  clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); 
 } 
}

WriteHandler:
 

package com.anxpp.io.calculator.aio.client; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.util.concurrent.CountDownLatch; 
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { 
 private AsynchronousSocketChannel clientChannel; 
 private CountDownLatch latch; 
 public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { 
  this.clientChannel = clientChannel; 
  this.latch = latch; 
 } 
 @Override 
 public void completed(Integer result, ByteBuffer buffer) { 
  //完成全部數(shù)據(jù)的寫入 
  if (buffer.hasRemaining()) { 
   clientChannel.write(buffer, buffer, this); 
  } 
  else { 
   //讀取數(shù)據(jù) 
   ByteBuffer readBuffer = ByteBuffer.allocate(1024); 
   clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch)); 
  } 
 } 
 @Override 
 public void failed(Throwable exc, ByteBuffer attachment) { 
  System.err.println("數(shù)據(jù)發(fā)送失敗..."); 
  try { 
   clientChannel.close(); 
   latch.countDown(); 
  } catch (IOException e) { 
  } 
 } 
}

ReadHandler:

package com.anxpp.io.calculator.aio.client; 
import java.io.IOException; 
import java.io.UnsupportedEncodingException; 
import java.nio.ByteBuffer; 
import java.nio.channels.AsynchronousSocketChannel; 
import java.nio.channels.CompletionHandler; 
import java.util.concurrent.CountDownLatch; 
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { 
 private AsynchronousSocketChannel clientChannel; 
 private CountDownLatch latch; 
 public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { 
  this.clientChannel = clientChannel; 
  this.latch = latch; 
 } 
 @Override 
 public void completed(Integer result,ByteBuffer buffer) { 
  buffer.flip(); 
  byte[] bytes = new byte[buffer.remaining()]; 
  buffer.get(bytes); 
  String body; 
  try { 
   body = new String(bytes,"UTF-8"); 
   System.out.println("客戶端收到結(jié)果:"+ body); 
  } catch (UnsupportedEncodingException e) { 
   e.printStackTrace(); 
  } 
 } 
 @Override 
 public void failed(Throwable exc,ByteBuffer attachment) { 
  System.err.println("數(shù)據(jù)讀取失敗..."); 
  try { 
   clientChannel.close(); 
   latch.countDown(); 
  } catch (IOException e) { 
  } 
 } 
}

這個(gè)API使用起來真的是很順手。

3.3、測(cè)試

Test:

package com.anxpp.io.calculator.aio; 
import java.util.Scanner; 
import com.anxpp.io.calculator.aio.client.Client; 
import com.anxpp.io.calculator.aio.server.Server; 
/** 
 * 測(cè)試方法 
 * @author yangtao__anxpp.com 
 * @version 1.0 
 */ 
public class Test { 
 //測(cè)試主方法 
 @SuppressWarnings("resource") 
 public static void main(String[] args) throws Exception{ 
  //運(yùn)行服務(wù)器 
  Server.start(); 
  //避免客戶端先于服務(wù)器啟動(dòng)前執(zhí)行代碼 
  Thread.sleep(100); 
  //運(yùn)行客戶端 
  Client.start(); 
  System.out.println("請(qǐng)輸入請(qǐng)求消息:"); 
  Scanner scanner = new Scanner(System.in); 
  while(Client.sendMsg(scanner.nextLine())); 
 } 
}

我們可以在控制臺(tái)輸入我們需要計(jì)算的算數(shù)字符串,服務(wù)器就會(huì)返回結(jié)果,當(dāng)然,我們也可以運(yùn)行大量的客戶端,都是沒有問題的,以為此處設(shè)計(jì)為單例客戶端,所以也就沒有演示大量客戶端并發(fā)。

讀者可以自己修改Client類,然后開辟大量線程,并使用構(gòu)造方法創(chuàng)建很多的客戶端測(cè)試。

下面是其中一次參數(shù)的輸出:

服務(wù)器已啟動(dòng),端口號(hào):12345
請(qǐng)輸入請(qǐng)求消息:
客戶端成功連接到服務(wù)器...
連接的客戶端數(shù):1
123456+789+456
服務(wù)器收到消息: 123456+789+456
客戶端收到結(jié)果:124701
9526*56
服務(wù)器收到消息: 9526*56
客戶端收到結(jié)果:533456
...

AIO是真正的異步非阻塞的,所以,在面對(duì)超級(jí)大量的客戶端,更能得心應(yīng)手。

下面就比較一下,幾種I/O編程的優(yōu)缺點(diǎn)。

4、各種I/O的對(duì)比

先以一張表來直觀的對(duì)比一下:

Java中網(wǎng)絡(luò)IO編程的示例分析

具體選擇什么樣的模型或者NIO框架,完全基于業(yè)務(wù)的實(shí)際應(yīng)用場(chǎng)景和性能需求,如果客戶端很少,服務(wù)器負(fù)荷不重,就沒有必要選擇開發(fā)起來相對(duì)不那么簡(jiǎn)單的NIO做服務(wù)端;相反,就應(yīng)考慮使用NIO或者相關(guān)的框架了。

5、附錄

上文中服務(wù)端使用到的用于計(jì)算的工具類:

package com.anxpp.utils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
public final class Calculator { 
 private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
 public static Object cal(String expression) throws ScriptException{
  return jse.eval(expression);
 }
}

感謝各位的閱讀!關(guān)于“Java中網(wǎng)絡(luò)IO編程的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,讓大家可以學(xué)到更多知識(shí),如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到吧!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI