溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程

發(fā)布時(shí)間:2021-06-30 17:24:33 來源:億速云 閱讀:142 作者:Leah 欄目:大數(shù)據(jù)

這篇文章將為大家詳細(xì)講解有關(guān)Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

Buffer(緩沖區(qū))

緩沖區(qū)本質(zhì)上是一個(gè)可以寫入數(shù)據(jù)的內(nèi)存塊(類似數(shù)組),然后可以再次讀取。此內(nèi)存塊包含在NIO Buffer對(duì)象中,該對(duì)象提供了一組方法,可以更輕松的使用內(nèi)存塊。
相對(duì)于直接操作數(shù)組,Buffer API提供了更加容易的操作和管理,其進(jìn)行數(shù)據(jù)的操作分為寫入和讀取,主要步驟如下:

  1. 將數(shù)據(jù)寫入緩沖區(qū)

  2. 調(diào)用buffer.flip(),轉(zhuǎn)換為讀取模式

  3. 緩沖區(qū)讀取數(shù)據(jù)

  4. 調(diào)用buffer.clear()或buffer.compact()清除緩沖區(qū)

Buffer中有三個(gè)重要屬性:

capacity(容量):作為一個(gè)內(nèi)存塊,Buffer具有一定的固定大小,也稱為容量
position(位置):寫入模式時(shí)代表寫數(shù)據(jù)的位置,讀取模式時(shí)代表讀取數(shù)據(jù)的位置
limit(限制):寫入模式等于Buffer的容量,讀取模式時(shí)等于寫入的數(shù)據(jù)量

Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程

Buffer使用代碼示例:

public class BufferDemo {
 public static void main(String[] args) {
   // 構(gòu)建一個(gè)byte字節(jié)緩沖區(qū),容量是4
   ByteBuffer byteBuffer = ByteBuffer.allocate(4);

   // 默認(rèn)寫入模式,查看三個(gè)重要的指標(biāo)
   System.out.println(
       String.format(
           "初始化:capacity容量:%s, position位置:%s, limit限制:%s",
           byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));

   // 寫入數(shù)據(jù)
   byteBuffer.put((byte) 1);
   byteBuffer.put((byte) 2);
   byteBuffer.put((byte) 3);

   // 再次查看三個(gè)重要的指標(biāo)
   System.out.println(
       String.format(
           "寫入3字節(jié)后后:capacity容量:%s, position位置:%s, limit限制:%s",
           byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));

   // 轉(zhuǎn)換為讀取模式(不調(diào)用flip方法,也是可以讀取數(shù)據(jù)的,但是position記錄讀取的位置不對(duì))
   System.out.println("開始讀取");
   byteBuffer.flip();
   byte a = byteBuffer.get();
   System.out.println(a);
   byte b = byteBuffer.get();
   System.out.println(b);
   System.out.println(
       String.format(
           "讀取2字節(jié)數(shù)據(jù)后,capacity容量:%s, position位置:%s, limit限制:%s",
           byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));

   // 繼續(xù)寫入3字節(jié),此時(shí)讀模式下,limit=3,position=2.繼續(xù)寫入只能覆蓋寫入一條數(shù)據(jù)
   // clear()方法清除整個(gè)緩沖區(qū)。compact()方法僅清除已閱讀的數(shù)據(jù)。轉(zhuǎn)為寫入模式
   byteBuffer.compact();
   // 清除了已經(jīng)讀取的2字節(jié),剩余1字節(jié),還可以寫入3字節(jié)數(shù)據(jù)
   // 多寫的話會(huì)報(bào)java.nio.BufferOverflowException異常
   byteBuffer.put((byte) 3);
   byteBuffer.put((byte) 4);
   byteBuffer.put((byte) 5);
   System.out.println(
       String.format(
           "最終的情況,capacity容量:%s, position位置:%s, limit限制:%s",
           byteBuffer.capacity(), byteBuffer.position(), byteBuffer.limit()));
 }
}
ByteBuffer堆外內(nèi)存

ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct,堆外)和非直接內(nèi)存(heap,堆)兩種實(shí)現(xiàn)。堆外內(nèi)存實(shí)現(xiàn)將內(nèi)存對(duì)象分配在Java虛擬機(jī)的堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理,而不是虛擬機(jī),這樣做的結(jié)果就是能夠在一定程度上減少垃圾回收對(duì)應(yīng)用程序造成的影響,提供運(yùn)行的速度。

堆外內(nèi)存的獲取方式:                   

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(noBytes)

堆外內(nèi)存的好處:

  • 進(jìn)行網(wǎng)絡(luò)IO或者文件IO時(shí)比heap buffer少一次拷貝。

    (file/socket — OS memory — jvm heap)在寫file和socket的過程中,GC會(huì)移動(dòng)對(duì)象,JVM的實(shí)現(xiàn)中會(huì)把數(shù)據(jù)復(fù)制到堆外,再進(jìn)行寫入。

  • GC范圍之外,降低GC壓力,但實(shí)現(xiàn)了自動(dòng)管理,DirectByteBuffer中有一個(gè)Cleaner對(duì)象(PhantomReference),Cleaner被GC執(zhí)行前會(huì)執(zhí)行clean方法,觸發(fā)DirectByteBuffer中定義的Deallocator

堆外內(nèi)存的使用建議:

  • 性能確實(shí)可觀的時(shí)候才去使用,分配給大型,長(zhǎng)壽命的對(duì)象(網(wǎng)絡(luò)傳輸,文件讀寫等場(chǎng)景)

  • 通過虛擬機(jī)參數(shù)MaxDirectMemorySize限制大小,防止耗盡整個(gè)機(jī)器的內(nèi)存

Channel(通道)

Channel用于源節(jié)點(diǎn)與目標(biāo)節(jié)點(diǎn)之間的連接,Channel類似于傳統(tǒng)的IO Stream,Channel本身不能直接訪問數(shù)據(jù),Channel只能與Buffer進(jìn)行交互。

Channel的API涵蓋了TCP/UDP網(wǎng)絡(luò)和文件IO,常用的類有FileChannel,DatagramChannel,SocketChannel,ServerSocketChannel

標(biāo)準(zhǔn)IO Stream通常是單向的(InputStream/OutputStream),而Channel是一個(gè)雙向的通道,可以在一個(gè)通道內(nèi)進(jìn)行讀取和寫入,可以非阻塞的讀取和寫入通道,而且通道始終讀取和寫入緩沖區(qū)(即Channel必須配合Buffer進(jìn)行使用)。

Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程

SocketChannel

SocketChannel用于建立TCP網(wǎng)絡(luò)連接,類似java.net.Socket。有兩種創(chuàng)建SocketChannel的形式,一個(gè)是客戶端主動(dòng)發(fā)起和服務(wù)器的連接,還有一個(gè)就是服務(wù)端獲取的新連接。SocketChannel中有兩個(gè)重要的方法,一個(gè)是write()寫方法,write()寫方法有可能在尚未寫入內(nèi)容的時(shí)候就返回了,需要在循環(huán)中調(diào)用write()方法。還有一個(gè)就是read()讀方法,read()方法可能直接返回根本不讀取任何數(shù)據(jù),可以根據(jù)返回的int值判斷讀取了多少字節(jié)。

核心代碼代碼示例片段:

// 客戶端主動(dòng)發(fā)起連接SocketChannel
socketChannel = SocketChannel.open();

// 設(shè)置為非阻塞模式socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));

// 發(fā)生請(qǐng)求數(shù)據(jù) - 向通道寫入數(shù)據(jù)
socketChannel.write(byteBuffer);

// 讀取服務(wù)端返回 - 讀取緩沖區(qū)數(shù)據(jù)
int readBytes = socketChannel.read(requestBuffer);

socketChannel.close(); // 關(guān)閉連接

ServerSocketChannel

ServerSocketChannel可以監(jiān)聽新建的TCP連接通道,類似ServerSocket。ServerSocketChannel的核心方法accept()方法,如果通道處于非阻塞模式,那么如果沒有掛起的連接,該方法將立即返回null,實(shí)際使用中必須檢查返回的SocketChannel是否為null。

核心代碼示例片段:

// 創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

// 設(shè)置為非阻塞模式
serverSocketChannel.configureBlocking(false);

// 綁定端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));

while (true) {

 // 獲取新tcp連接通道
 SocketChannel socketChannel = serverSocketChannel.accept();

 if (socketChannel != null) {
   // tcp請(qǐng)求 讀取/響應(yīng)

 }

}

Selector選擇器

Selector也是Java NIO核心組件,可以檢查一個(gè)或多個(gè)NIO通道,并確定哪些通道已經(jīng)準(zhǔn)備好進(jìn)行讀取或?qū)懭?。?shí)現(xiàn)單個(gè)線程可以管理多個(gè)通道,從而管理多個(gè)網(wǎng)絡(luò)連接。

一個(gè)線程使用Selector可以監(jiān)聽多個(gè)Channel的不同事件,其中主要有四種事件,分別對(duì)應(yīng)SelectionKey中的四個(gè)常量,分別為:

  • 連接事件 SelectionKey.OP_CONNECT

  • 準(zhǔn)備就緒事件 SelectionKey.OP_ACCEPT

  • 讀取事件 SelectionKey.OP_READ

  • 寫入事件 SelectionKey.OP_WRITE

Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程

Selector實(shí)現(xiàn)一個(gè)線程處理多個(gè)通道的核心在于事件驅(qū)動(dòng)機(jī)制,非阻塞的網(wǎng)絡(luò)通道下,開發(fā)者通過Selector注冊(cè)對(duì)于通道感興趣的事件類型,線程通過監(jiān)聽事件來觸發(fā)相應(yīng)的代碼執(zhí)行。(更底層其實(shí)是操作系統(tǒng)的多路復(fù)用機(jī)制)

核心代碼示例片段:

// 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
Selector selector = Selector.open();

// 將serverSocketChannel注冊(cè)到selector
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
// 對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
while (true) {

 // 用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會(huì)有返回
 selector.select();

 // 獲取事件
 Set<SelectionKey> keys = selector.selectedKeys();

 // 遍歷查詢結(jié)果
 Iterator<SelectionKey> iterator = keys.iterator();

 while (iterator.hasNext()) {

   // 被封裝的查詢結(jié)果
   SelectionKey key = iterator.next();

   // 判斷不同的事件類型,執(zhí)行對(duì)應(yīng)的邏輯處理
   if (key.isAcceptable()) {
     // 處理連接的邏輯

   }

   if (key.isReadable()) {
     //處理讀數(shù)據(jù)的邏輯

   }

   iterator.remove();
 }
}

NIO網(wǎng)絡(luò)編程完整代碼

服務(wù)端代碼示例:

// 結(jié)合Selector實(shí)現(xiàn)的非阻塞服務(wù)端(放棄對(duì)channel的輪詢,借助消息通知機(jī)制)
public class NIOServer {
 public static void main(String[] args) throws IOException {
   // 創(chuàng)建網(wǎng)絡(luò)服務(wù)端ServerSocketChannel
   ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

   // 設(shè)置為非阻塞模式
   serverSocketChannel.configureBlocking(false);

   // 構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
   Selector selector = Selector.open();

   // 將serverSocketChannel注冊(cè)到selector
   SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
   // 對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
   selectionKey.interestOps(SelectionKey.OP_ACCEPT);

   // 綁定端口
   serverSocketChannel.socket().bind(new InetSocketAddress(8080));

   System.out.println("啟動(dòng)成功");
   while (true) {
     // 不再輪詢通道,改用下面輪詢事件的方式.select方法有阻塞效果,直到有事件通知才會(huì)有返回      selector.select();
     // 獲取事件
     Set<SelectionKey> keys = selector.selectedKeys();

     // 遍歷查詢結(jié)果
     Iterator<SelectionKey> iterator = keys.iterator();

     while (iterator.hasNext()) {

       // 被封裝的查詢結(jié)果
       SelectionKey key = iterator.next();

       iterator.remove();

       // 關(guān)注 Read 和 Accept兩個(gè)事件
       if (key.isAcceptable()) {

         ServerSocketChannel server = (ServerSocketChannel) key.attachment();

         // 將拿到的客戶端連接通道,注冊(cè)到selector上面
         SocketChannel clientSocketChannel = server.accept();

         clientSocketChannel.configureBlocking(false);
         clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
         System.out.println("收到新連接 : " + clientSocketChannel.getRemoteAddress());
       }
       if (key.isReadable()) {
         SocketChannel socketChannel = (SocketChannel) key.attachment();
         try {
           ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
           while (socketChannel.isOpen() && socketChannel.read(byteBuffer) != -1) {
             // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
             if (byteBuffer.position() > 0) break;
           }
           if (byteBuffer.position() == 0) continue;
           byteBuffer.flip();
           byte[] content = new byte[byteBuffer.limit()];
           byteBuffer.get(content);
           System.out.println(new String(content));
           System.out.println("收到數(shù)據(jù),來自:" + socketChannel.getRemoteAddress());
           // 響應(yīng)結(jié)果 200
           String response = "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
           ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
           while (buffer.hasRemaining()) {
             socketChannel.write(buffer);
           }
         } catch (Exception e) {
           e.printStackTrace();

           key.cancel(); // 取消事件訂閱

         }

       }
       selector.selectNow();
     }
   }
 }
}

客戶端代碼示例:

public class NIOClient {
 public static void main(String[] args) throws IOException {

   // 客戶端主動(dòng)發(fā)起連接
   SocketChannel socketChannel = SocketChannel.open();

   // 設(shè)置為非阻塞模式
   socketChannel.configureBlocking(false);

   socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
   while (!socketChannel.finishConnect()) {
     // 沒連接上,則一直等待
     Thread.yield();

   }
   Scanner scanner = new Scanner(System.in);
   System.out.println("請(qǐng)輸入:");

   // 發(fā)送內(nèi)容
   String msg = scanner.nextLine();
   ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
   while (byteBuffer.hasRemaining()) {
     socketChannel.write(byteBuffer);
   }

   // 讀取響應(yīng)
   System.out.println("收到服務(wù)端響應(yīng):");
   ByteBuffer buffer = ByteBuffer.allocate(1024);
   while (socketChannel.isOpen() && socketChannel.read(buffer) != -1) {
     // 長(zhǎng)連接情況下,需要手動(dòng)判斷數(shù)據(jù)有沒有讀取結(jié)束 (此處做一個(gè)簡(jiǎn)單的判斷: 超過0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
     if (buffer.position() > 0) break;
   }
   buffer.flip();
   byte[] content = new byte[buffer.limit()];
   buffer.get(content);
   System.out.println(new String(content));
   scanner.close();
   socketChannel.close();
 }
}

NIO與BIO的比較

Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程

關(guān)于Java中怎么實(shí)現(xiàn)NIO非阻塞網(wǎng)絡(luò)編程就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI