您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)Java中I/O模型的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
相關(guān)概念
同步和異步
描述的是用戶線程與內(nèi)核的交互方式:
同步是指用戶線程發(fā)起 I/O 請求后需要等待或者輪詢內(nèi)核 I/O 操作完成后才能繼續(xù)執(zhí)行;
異步是指用戶線程發(fā)起 I/O 請求后仍繼續(xù)執(zhí)行,當(dāng)內(nèi)核 I/O 操作完成后會通知用戶線程,或者調(diào)用用戶線程注冊的回調(diào)函數(shù)。
阻塞和非阻塞
描述的是用戶線程調(diào)用內(nèi)核 I/O 操作的方式:
阻塞是指 I/O 操作需要徹底完成后才返回到用戶空間;
非阻塞是指 I/O 操作被調(diào)用后立即返回給用戶一個狀態(tài)值,無需等到 I/O 操作徹底完成。
一個 I/O 操作其實分成了兩個步驟:發(fā)起 I/O 請求和實際的 I/O 操作。 阻塞 I/O 和非阻塞 I/O 的區(qū)別在于第一步,發(fā)起 I/O 請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞 I/O ,如果不阻塞,那么就是非阻塞 I/O 。 同步 I/O 和異步 I/O 的區(qū)別就在于第二個步驟是否阻塞,如果實際的 I/O 讀寫阻塞請求進(jìn)程,那么就是同步 I/O 。
Unix I/O 模型
Unix 下共有五種 I/O 模型:
阻塞 I/O
非阻塞 I/O
I/O 復(fù)用(select 和 poll)
信號驅(qū)動 I/O(SIGIO)
異步 I/O(POSIX 的 aio_系列函數(shù))
阻塞 I/O
請求無法立即完成則保持阻塞。
階段1:等待數(shù)據(jù)就緒。網(wǎng)絡(luò) I/O 的情況就是等待遠(yuǎn)端數(shù)據(jù)陸續(xù)抵達(dá);磁盤I/O的情況就是等待磁盤數(shù)據(jù)從磁盤上讀取到內(nèi)核態(tài)內(nèi)存中。
階段2:數(shù)據(jù)從內(nèi)核拷貝到進(jìn)程。出于系統(tǒng)安全,用戶態(tài)的程序沒有權(quán)限直接讀取內(nèi)核態(tài)內(nèi)存,因此內(nèi)核負(fù)責(zé)把內(nèi)核態(tài)內(nèi)存中的數(shù)據(jù)拷貝一份到用戶態(tài)內(nèi)存中。
非阻塞 I/O
socket 設(shè)置為 NONBLOCK(非阻塞)就是告訴內(nèi)核,當(dāng)所請求的 I/O 操作無法完成時,不要將進(jìn)程睡眠,而是返回一個錯誤碼(EWOULDBLOCK) ,這樣請求就不會阻塞
I/O 操作函數(shù)將不斷的測試數(shù)據(jù)是否已經(jīng)準(zhǔn)備好,如果沒有準(zhǔn)備好,繼續(xù)測試,直到數(shù)據(jù)準(zhǔn)備好為止。整個 I/O 請求的過程中,雖然用戶線程每次發(fā)起 I/O 請求后可以立即返回,但是為了等到數(shù)據(jù),仍需要不斷地輪詢、重復(fù)請求,消耗了大量的 CPU 的資源
數(shù)據(jù)準(zhǔn)備好了,從內(nèi)核拷貝到用戶空間。
一般很少直接使用這種模型,而是在其他 I/O 模型中使用非阻塞 I/O 這一特性。這種方式對單個 I/O 請求意義不大,但給 I/O 多路復(fù)用鋪平了道路.
I/O 復(fù)用(異步阻塞 I/O)
I/O 多路復(fù)用會用到 select 或者 poll 函數(shù),這兩個函數(shù)也會使進(jìn)程阻塞,但是和阻塞 I/O 所不同的的,這兩個函數(shù)可以同時阻塞多個 I/O 操作。而且可以同時對多個讀操作,多個寫操作的 I/O 函數(shù)進(jìn)行檢測,直到有數(shù)據(jù)可讀或可寫時,才真正調(diào)用 I/O 操作函數(shù)。
從流程上來看,使用 select 函數(shù)進(jìn)行 I/O 請求和同步阻塞模型沒有太大的區(qū)別,甚至還多了添加監(jiān)視 socket,以及調(diào)用 select 函數(shù)的額外操作,效率更差。但是,使用 select 以后最大的優(yōu)勢是用戶可以在一個線程內(nèi)同時處理多個 socket 的 I/O 請求。用戶可以注冊多個 socket,然后不斷地調(diào)用 select 讀取被激活的 socket,即可達(dá)到在同一個線程內(nèi)同時處理多個 I/O 請求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達(dá)到這個目的。
I/O 多路復(fù)用模型使用了 Reactor 設(shè)計模式實現(xiàn)了這一機制。
調(diào)用 select / poll 該方法由一個用戶態(tài)線程負(fù)責(zé)輪詢多個 socket,直到某個階段1的數(shù)據(jù)就緒,再通知實際的用戶線程執(zhí)行階段2的拷貝。 通過一個專職的用戶態(tài)線程執(zhí)行非阻塞I/O輪詢,模擬實現(xiàn)了階段一的異步化
信號驅(qū)動 I/O(SIGIO)
首先我們允許 socket 進(jìn)行信號驅(qū)動 I/O,并安裝一個信號處理函數(shù),進(jìn)程繼續(xù)運行并不阻塞。當(dāng)數(shù)據(jù)準(zhǔn)備好時,進(jìn)程會收到一個 SIGIO 信號,可以在信號處理函數(shù)中調(diào)用 I/O 操作函數(shù)處理數(shù)據(jù)。
異步 I/O
調(diào)用 aio_read 函數(shù),告訴內(nèi)核描述字,緩沖區(qū)指針,緩沖區(qū)大小,文件偏移以及通知的方式,然后立即返回。當(dāng)內(nèi)核將數(shù)據(jù)拷貝到緩沖區(qū)后,再通知應(yīng)用程序。
異步 I/O 模型使用了 Proactor 設(shè)計模式實現(xiàn)了這一機制。
告知內(nèi)核,當(dāng)整個過程(包括階段1和階段2)全部完成時,通知應(yīng)用程序來讀數(shù)據(jù).
幾種 I/O 模型的比較
前四種模型的區(qū)別是階段1不相同,階段2基本相同,都是將數(shù)據(jù)從內(nèi)核拷貝到調(diào)用者的緩沖區(qū)。而異步 I/O 的兩個階段都不同于前四個模型。
同步 I/O 操作引起請求進(jìn)程阻塞,直到 I/O 操作完成。異步 I/O 操作不引起請求進(jìn)程阻塞。
常見 Java I/O 模型
在了解了 UNIX 的 I/O 模型之后,其實 Java 的 I/O 模型也是類似。
“阻塞I/O”模式
在上一節(jié) Socket 章節(jié)中的 EchoServer 就是一個簡單的阻塞 I/O 例子,服務(wù)器啟動后,等待客戶端連接。在客戶端連接服務(wù)器后,服務(wù)器就阻塞讀寫取數(shù)據(jù)流。
EchoServer 代碼:
public class EchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } try ( ServerSocket serverSocket = new ServerSocket(port); Socket clientSocket = serverSocket.accept(); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); ) { String inputLine; while ((inputLine = in.readLine()) != null) { out.println(inputLine); } } catch (IOException e) { System.out.println("Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
改進(jìn)為“阻塞I/O+多線程”模式
使用多線程來支持多個客戶端來訪問服務(wù)器。
主線程 MultiThreadEchoServer.java
public class MultiThreadEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) { clientSocket = serverSocket.accept(); // MultiThread new Thread(new EchoServerHandler(clientSocket)).start(); } } catch (IOException e) { System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
處理器類 EchoServerHandler.java
public class EchoServerHandler implements Runnable { private Socket clientSocket; public EchoServerHandler(Socket clientSocket) { this.clientSocket = clientSocket; } @Override public void run() { try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));) { String inputLine; while ((inputLine = in.readLine()) != null) { out.println(inputLine); } } catch (IOException e) { System.out.println(e.getMessage()); } } }
存在問題:每次接收到新的連接都要新建一個線程,處理完成后銷毀線程,代價大。當(dāng)有大量地短連接出現(xiàn)時,性能比較低。
改進(jìn)為“阻塞I/O+線程池”模式
針對上面多線程的模型中,出現(xiàn)的線程重復(fù)創(chuàng)建、銷毀帶來的開銷,可以采用線程池來優(yōu)化。每次接收到新連接后從池中取一個空閑線程進(jìn)行處理,處理完成后再放回池中,重用線程避免了頻率地創(chuàng)建和銷毀線程帶來的開銷。
主線程 ThreadPoolEchoServer.java
public class ThreadPoolEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } ExecutorService threadPool = Executors.newFixedThreadPool(5); Socket clientSocket = null; try (ServerSocket serverSocket = new ServerSocket(port);) { while (true) { clientSocket = serverSocket.accept(); // Thread Pool threadPool.submit(new Thread(new EchoServerHandler(clientSocket))); } } catch (IOException e) { System.out.println( "Exception caught when trying to listen on port " + port + " or listening for a connection"); System.out.println(e.getMessage()); } } }
存在問題:在大量短連接的場景中性能會有提升,因為不用每次都創(chuàng)建和銷毀線程,而是重用連接池中的線程。但在大量長連接的場景中,因為線程被連接長期占用,不需要頻繁地創(chuàng)建和銷毀線程,因而沒有什么優(yōu)勢。
雖然這種方法可以適用于小到中度規(guī)模的客戶端的并發(fā)數(shù),如果連接數(shù)超過 100,000或更多,那么性能將很不理想。
改進(jìn)為“非阻塞I/O”模式
“阻塞I/O+線程池”網(wǎng)絡(luò)模型雖然比”阻塞I/O+多線程”網(wǎng)絡(luò)模型在性能方面有提升,但這兩種模型都存在一個共同的問題:讀和寫操作都是同步阻塞的,面對大并發(fā)(持續(xù)大量連接同時請求)的場景,需要消耗大量的線程來維持連接。CPU 在大量的線程之間頻繁切換,性能損耗很大。一旦單機的連接超過1萬,甚至達(dá)到幾萬的時候,服務(wù)器的性能會急劇下降。
而 NIO 的 Selector 卻很好地解決了這個問題,用主線程(一個線程或者是 CPU 個數(shù)的線程)保持住所有的連接,管理和讀取客戶端連接的數(shù)據(jù),將讀取的數(shù)據(jù)交給后面的線程池處理,線程池處理完業(yè)務(wù)邏輯后,將結(jié)果交給主線程發(fā)送響應(yīng)給客戶端,少量的線程就可以處理大量連接的請求。
Java NIO 由以下幾個核心部分組成:
Channel
Buffer
Selector
要使用 Selector,得向 Selector 注冊 Channel,然后調(diào)用它的 select()方法。這個方法會一直阻塞到某個注冊的通道有事件就緒。一旦這個方法返回,線程就可以處理這些事件,事件的例子有如新連接進(jìn)來,數(shù)據(jù)接收等。
主線程 NonBlokingEchoServer.java
public class NonBlokingEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } System.out.println("Listening for connections on port " + port); ServerSocketChannel serverChannel; Selector selector; try { serverChannel = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(port); serverChannel.bind(address); serverChannel.configureBlocking(false); selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException ex) { ex.printStackTrace(); return; } while (true) { try { selector.select(); } catch (IOException ex) { ex.printStackTrace(); break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ); ByteBuffer buffer = ByteBuffer.allocate(100); clientKey.attach(buffer); } if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); client.read(output); } if (key.isWritable()) { SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); output.flip(); client.write(output); output.compact(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { } } } } } }
改進(jìn)為“異步I/O”模式
Java SE 7 版本之后,引入了異步 I/O (NIO.2) 的支持,為構(gòu)建高性能的網(wǎng)絡(luò)應(yīng)用提供了一個利器。
主線程 AsyncEchoServer.java
public class AsyncEchoServer { public static int DEFAULT_PORT = 7; public static void main(String[] args) throws IOException { int port; try { port = Integer.parseInt(args[0]); } catch (RuntimeException ex) { port = DEFAULT_PORT; } ExecutorService taskExecutor = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); // create asynchronous server socket channel bound to the default group try (AsynchronousServerSocketChannel asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open()) { if (asynchronousServerSocketChannel.isOpen()) { // set some options asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024); asynchronousServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); // bind the server socket channel to local address asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); // display a waiting message while ... waiting clients System.out.println("Waiting for connections ..."); while (true) { Future<AsynchronousSocketChannel> asynchronousSocketChannelFuture = asynchronousServerSocketChannel .accept(); try { final AsynchronousSocketChannel asynchronousSocketChannel = asynchronousSocketChannelFuture .get(); Callable<String> worker = new Callable<String>() { @Override public String call() throws Exception { String host = asynchronousSocketChannel.getRemoteAddress().toString(); System.out.println("Incoming connection from: " + host); final ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // transmitting data while (asynchronousSocketChannel.read(buffer).get() != -1) { buffer.flip(); asynchronousSocketChannel.write(buffer).get(); if (buffer.hasRemaining()) { buffer.compact(); } else { buffer.clear(); } } asynchronousSocketChannel.close(); System.out.println(host + " was successfully served!"); return host; } }; taskExecutor.submit(worker); } catch (InterruptedException | ExecutionException ex) { System.err.println(ex); System.err.println("\n Server is shutting down ..."); // this will make the executor accept no new threads // and finish all existing threads in the queue taskExecutor.shutdown(); // wait until all threads are finished while (!taskExecutor.isTerminated()) { } break; } } } else { System.out.println("The asynchronous server-socket channel cannot be opened!"); } } catch (IOException ex) { System.err.println(ex); } } }
關(guān)于“Java中I/O模型的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,使各位可以學(xué)到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。