溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)

發(fā)布時間:2021-09-13 11:30:53 來源:億速云 閱讀:108 作者:chen 欄目:編程語言

本篇內容介紹了“JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

當前環(huán)境

  1. jdk == 1.8

代碼地址

git 地址: https://github.com/jasonGeng88/java-network-programming

知識點

  • nio 下 I/O 阻塞與非阻塞實現(xiàn)

  • SocketChannel 介紹

  • I/O 多路復用的原理

  • 事件選擇器與 SocketChannel 的關系

  • 事件監(jiān)聽類型

  • 字節(jié)緩沖 ByteBuffer 數(shù)據(jù)結構

nio 的阻塞實現(xiàn)

關于什么是 nio,從字面上理解為 New IO,就是為了彌補原本 I/O 上的不足,而在 JDK 1.4 中引入的一種新的 I/O 實現(xiàn)方式。簡單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實現(xiàn)方式(當然,默認實現(xiàn)方式是阻塞的。)。

下面,我們先來看下 nio 以阻塞方式是如何處理的。

建立連接

有了上一篇 socket 的經驗,我們的第一步一定也是建立 socket 連接。只不過,這里不是采用 new socket() 的方式,而是引入了一個新的概念 SocketChannel。它可以看作是 socket 的一個完善類,除了提供 Socket 的相關功能外,還提供了許多其他特性,如后面要講到的向選擇器注冊的功能。

類圖如下: JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)

建立連接代碼實現(xiàn):

// 初始化 socket,建立 socket 與 channel 的綁定關系
SocketChannel socketChannel = SocketChannel.open();
// 初始化遠程連接地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 處理設置阻塞,這也是默認的方式,可不設置
socketChannel.configureBlocking(true);
// 建立連接
socketChannel.connect(remote);

獲取 socket 連接

因為是同樣是 I/O 阻塞的實現(xiàn),所以后面的關于 socket 輸入輸出流的處理,和上一篇的基本相同。唯一差別是,這里需要通過 channel 來獲取 socket 連接。

  • 獲取 socket 連接

Socket socket = socketChannel.socket();
  • 處理輸入輸出流

PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());

完整示例

package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
public class NioBlockingHttpClient {
    private SocketChannel socketChannel;
    private String host;
    public static void main(String[] args) throws IOException {
        for (String host: HttpConstant.HOSTS) {
            NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
            client.request();
        }
    }
    public NioBlockingHttpClient(String host, int port) throws IOException {
        this.host = host;
        socketChannel = SocketChannel.open();
        socketChannel.socket().setSoTimeout(5000);
        SocketAddress remote = new InetSocketAddress(this.host, port);
        this.socketChannel.connect(remote);
    }
    public void request() throws IOException {
        PrintWriter pw = getWriter(socketChannel.socket());
        BufferedReader br = getReader(socketChannel.socket());
        pw.write(HttpUtil.compositeRequest(host));
        pw.flush();
        String msg;
        while ((msg = br.readLine()) != null){
            System.out.println(msg);
        }
    }
    private PrintWriter getWriter(Socket socket) throws IOException {
        OutputStream out = socket.getOutputStream();
        return new PrintWriter(out);
    }
    private BufferedReader getReader(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(in));
    }
}

nio 的非阻塞實現(xiàn)

原理分析

nio 的阻塞實現(xiàn),基本與使用原生的 socket 類似,沒有什么特別大的差別。

下面我們來看看它真正強大的地方。到目前為止,我們將的都是阻塞 I/O。何為阻塞 I/O,看下圖:

JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)

我們主要觀察圖中的前三種 I/O 模型,關于異步 I/O,一般需要依靠操作系統(tǒng)的支持,這里不討論。

從圖中可以發(fā)現(xiàn),阻塞過程主要發(fā)生在兩個階段上:

  • 第一階段:等待數(shù)據(jù)就緒;

  • 第二階段:將已就緒的數(shù)據(jù)從內核緩沖區(qū)拷貝到用戶空間;

這里產生了一個從內核到用戶空間的拷貝,主要是為了系統(tǒng)的性能優(yōu)化考慮。假設,從網卡讀到的數(shù)據(jù)直接返回給用戶空間,那勢必會造成頻繁的系統(tǒng)中斷,因為從網卡讀到的數(shù)據(jù)不一定是完整的,可能斷斷續(xù)續(xù)的過來。通過內核緩沖區(qū)作為緩沖,等待緩沖區(qū)有足夠的數(shù)據(jù),或者讀取完結后,進行一次的系統(tǒng)中斷,將數(shù)據(jù)返回給用戶,這樣就能避免頻繁的中斷產生。

了解了 I/O 阻塞的兩個階段,下面我們進入正題。看看一個線程是如何實現(xiàn)同時處理多個 I/O 調用的。從上圖中的非阻塞 I/O 可以看出,僅僅只有第二階段需要阻塞,第一階段的數(shù)據(jù)等待過程,我們是不需要關心的。不過該模型是頻繁地去檢查是否就緒,造成了 CPU 無效的處理,反而效果不好。如果有一種類似的好萊塢原則— “不要給我們打電話,我們會打給你” 。這樣一個線程可以同時發(fā)起多個 I/O 調用,并且不需要同步等待數(shù)據(jù)就緒。在數(shù)據(jù)就緒完成的時候,會以事件的機制,來通知我們。這樣不就實現(xiàn)了單線程同時處理多個 IO 調用的問題了嗎?即所說的“I/O 多路復用模型”。


廢話講了一大堆,下面就來實際操刀一下。

創(chuàng)建選擇器

由上面分析可以,我們得有一個選擇器,它能監(jiān)聽所有的 I/O 操作,并且以事件的方式通知我們哪些 I/O 已經就緒了。

代碼如下:

import java.nio.channels.Selector;
...
private static Selector selector;
static {
    try {
        selector = Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

創(chuàng)建非阻塞 I/O

下面,我們來創(chuàng)建一個非阻塞的 SocketChannel,代碼與阻塞實現(xiàn)類型,唯一不同是socketChannel.configureBlocking(false)

注意:只有在socketChannel.configureBlocking(false)之后的代碼,才是非阻塞的,如果socketChannel.connect()在設置非阻塞模式之前,那么連接操作依舊是阻塞調用的。

SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 設置非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);

建立選擇器與 socket 的關聯(lián)

選擇器與 socket 都創(chuàng)建好了,下一步就是將兩者進行關聯(lián),好讓選擇器和監(jiān)聽到 Socket 的變化。這里采用了以 SocketChannel 主動注冊到選擇器的方式進行關聯(lián)綁定,這也就解釋了,為什么不直接new Socket(),而是以SocketChannel的方式來創(chuàng)建 socket。

代碼如下:

socketChannel.register(selector,
                        SelectionKey.OP_CONNECT
                        | SelectionKey.OP_READ
                        | SelectionKey.OP_WRITE);

上面代碼,我們將 socketChannel 注冊到了選擇器中,并且對它的連接、可讀、可寫事件進行了監(jiān)聽。

具體的事件監(jiān)聽類型如下:

操作類型描述所屬對象
OP_READ1 << 0讀操作SocketChannel
OP_WRITE1 << 2寫操作SocketChannel
OP_CONNECT1 << 3連接socket操作SocketChannel
OP_ACCEPT1 << 4接受socket操作ServerSocketChannel

選擇器監(jiān)聽 socket 變化

現(xiàn)在,選擇器已經與我們關心的 socket 進行了關聯(lián)。下面就是感知事件的變化,然后調用相應的處理機制。

這里與 Linux 下的 selector 有點不同,nio 下的 selecotr 不會去遍歷所有關聯(lián)的 socket。我們在注冊時設置了我們關心的事件類型,每次從選擇器中獲取的,只會是那些符合事件類型,并且完成就緒操作的 socket,減少了大量無效的遍歷操作。

public void select() throws IOException {
    // 獲取就緒的 socket 個數(shù)
    while (selector.select() > 0){
        // 獲取符合的 socket 在選擇器中對應的事件句柄 key
        Set keys = selector.selectedKeys();
        // 遍歷所有的key
        Iterator it = keys.iterator();
        while (it.hasNext()){
            // 獲取對應的 key,并從已選擇的集合中移除
            SelectionKey key = (SelectionKey)it.next();
            it.remove();
            if (key.isConnectable()){
                // 進行連接操作
                connect(key);
            }
            else if (key.isWritable()){
                // 進行寫操作
                write(key);
            }
            else if (key.isReadable()){
                // 進行讀操作
                receive(key);
            }
        }
    }
}

注意:這里的selector.select()是同步阻塞的,等待有事件發(fā)生后,才會被喚醒。這就防止了 CPU 空轉的產生。當然,我們也可以給它設置超時時間,selector.select(long timeout)來結束阻塞過程。

處理連接就緒事件

下面,我們分別來看下,一個 socket 是如何來處理連接、寫入數(shù)據(jù)和讀取數(shù)據(jù)的(這些操作都是阻塞的過程,只是我們將等待就緒的過程變成了非阻塞的了)。

處理連接代碼:

// SelectionKey 代表 SocketChannel 在選擇器中注冊的事件句柄
private void connect(SelectionKey key) throws IOException {
    // 獲取事件句柄對應的 SocketChannel
    SocketChannel channel = (SocketChannel) key.channel();
   // 真正的完成 socket 連接
    channel.finishConnect();
   // 打印連接信息
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();
    int port = remote.getPort();
    System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));
}

處理寫入就緒事件

// 字符集處理類
private Charset charset = Charset.forName("utf8");
private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();
    // 獲取 HTTP 請求,同上一篇
    String request = HttpUtil.compositeRequest(host);
    // 向 SocketChannel 寫入事件 
    channel.write(charset.encode(request));
    // 修改 SocketChannel 所關心的事件
    key.interestOps(SelectionKey.OP_READ);
}

這里有兩個地方需要注意:

  • 第一個是使用 channel.write(charset.encode(request)); 進行數(shù)據(jù)寫入。有人會說,為什么不能像上面同步阻塞那樣,通過PrintWriter包裝類進行操作。因為PrintWriterwrite() 方法是阻塞的,也就是說要等數(shù)據(jù)真正從 socket 發(fā)送出去后才返回。

這與我們這里所講的阻塞是不一致的,這里的操作雖然也是阻塞的,但它發(fā)生的過程是在數(shù)據(jù)從用戶空間到內核緩沖區(qū)拷貝過程。至于系統(tǒng)將緩沖區(qū)的數(shù)據(jù)通過 socket 發(fā)送出去,這不在阻塞范圍內。也解釋了為什么要用 Charset 對寫入內容進行編碼了,因為緩沖區(qū)接收的格式是ByteBuffer。

  • 第二,選擇器用來監(jiān)聽事件變化的兩個參數(shù)是 interestOpsreadyOps。

    • interestOps:表示 SocketChannel 所關心的事件類型,也就是告訴選擇器,當有這幾種事件發(fā)生時,才來通知我。這里通過key.interestOps(SelectionKey.OP_READ);告訴選擇器,之后我只關心“讀就緒”事件,其他的不用通知我了。

    • readyOps:表示 SocketChannel 當前就緒的事件類型。以key.isReadable()為例,判斷依據(jù)就是:return (readyOps() & OP_READ) != 0;

處理讀取就緒事件

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    String receiveData = charset.decode(buffer).toString();
    // 當再沒有數(shù)據(jù)可讀時,取消在選擇器中的關聯(lián),并關閉 socket 連接
    if ("".equals(receiveData)) {
        key.cancel();
        channel.close();
        return;
    }
    System.out.println(receiveData);
}

這里的處理基本與寫入一致,唯一要注意的是,這里我們需要自行處理去緩沖區(qū)讀取數(shù)據(jù)的操作。首先會分配一個固定大小的緩沖區(qū),然后從內核緩沖區(qū)中,拷貝數(shù)據(jù)至我們剛分配固定緩沖區(qū)上。這里存在兩種情況:

  • 我們分配的緩沖區(qū)過大,那多余的部分以0補充(初始化時,其實會自動補0)。

  • 我們分配的緩沖去過小,因為選擇器會不停的遍歷。只要 SocketChannel 處理讀就緒狀態(tài),那下一次會繼續(xù)讀取。當然,分配過小,會增加遍歷次數(shù)。

最后,將一下 ByteBuffer 的結構,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 為例,講下各屬性的作用(mark 主要是用來標記之前 position 的位置,是在當前 postion 無法滿足的情況下使用的,這里不作討論)。

JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)

從圖中看出,

  • 容量(capacity):表示緩沖區(qū)可以保存的數(shù)據(jù)容量;

  • 極限(limit):表示緩沖區(qū)的當前終點,即寫入、讀取都不可超過該重點;

  • 位置(position):表示緩沖區(qū)下一個讀寫單元的位置;

完整代碼

package com.jason.network.mode.nio;
import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class NioNonBlockingHttpClient {
    private static Selector selector;
    private Charset charset = Charset.forName("utf8");
    static {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws IOException {
        NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();
        for (String host: HttpConstant.HOSTS) {
            client.request(host, HttpConstant.PORT);
        }
        client.select();
    }
    public void request(String host, int port) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.socket().setSoTimeout(5000);
        SocketAddress remote = new InetSocketAddress(host, port);
        socketChannel.configureBlocking(false);
        socketChannel.connect(remote);
        socketChannel.register(selector,
                        SelectionKey.OP_CONNECT
                        | SelectionKey.OP_READ
                        | SelectionKey.OP_WRITE);
    }
    public void select() throws IOException {
        while (selector.select(500) > 0){
            Set keys = selector.selectedKeys();
            Iterator it = keys.iterator();
            while (it.hasNext()){
                SelectionKey key = (SelectionKey)it.next();
                it.remove();
                if (key.isConnectable()){
                    connect(key);
                }
                else if (key.isWritable()){
                    write(key);
                }
                else if (key.isReadable()){
                    receive(key);
                }
            }
        }
    }
    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.finishConnect();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        int port = remote.getPort();
        System.out.println(String.format("訪問地址: %s:%s 連接成功!", host, port));
    }
    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        String request = HttpUtil.compositeRequest(host);
        System.out.println(request);
        channel.write(charset.encode(request));
        key.interestOps(SelectionKey.OP_READ);
    }
    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        String receiveData = charset.decode(buffer).toString();
        if ("".equals(receiveData)) {
            key.cancel();
            channel.close();
            return;
        }
        System.out.println(receiveData);
    }
}

示例效果

JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)

“JAVA NIO下I/O的阻塞與非阻塞實現(xiàn)”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI