您好,登錄后才能下訂單哦!
這篇文章主要介紹“閱讀一個分布式框架必備的NIO知識有哪些”,在日常操作中,相信很多人在閱讀一個分布式框架必備的NIO知識有哪些問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”閱讀一個分布式框架必備的NIO知識有哪些”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
閱讀一個分布式開源項目的時候,最重要的就是了解這個項目的通信框架。
因為一個分布式的開源框架,通常是集群部署的,不同的節(jié)點和節(jié)點之間需要相互通信來完成復(fù)雜的功能,而閱讀到這些源碼的時候,如果不了解它通信機(jī)制的話,就會迷失在代碼里,像走進(jìn)了一片原始森林。
比如 HDFS ,使用的通信框架是自己封裝的 Hadoop Rpc;Spark 底層通信就是用的 Netty;而最近閱讀的 Kafka 源碼,底層使用的是原生的 Java NIO。
所以本次,我們來聊一聊 Java NIO 的那些主要的知識點。
談到 NIO,就會有三個核心的概念:通道、緩沖、選擇器。
直接開門見山,或許聽起來會有點迷茫,我們需要從頭開始說。
1、通道
以前在并發(fā)要求不是很高的情況下,是 CPU 來全權(quán)處理輸入輸出的(中斷),如下圖:
用戶程序向服務(wù)端發(fā)起讀寫請求,cpu 直接處理這些請求。這樣有一個弊端,當(dāng) IO 請求非常多的時候,會大量占用 CPU,使得整個系統(tǒng)的處理能力會下降。
隨著計算機(jī)的發(fā)展,出現(xiàn)了一種新的方式,使用 DMA 來全權(quán)處理 IO 請求,如下圖:
DMA 是 Direct Memory Access,直接內(nèi)存訪問控制。
為什么要增加這個設(shè)備呢?是因為 CPU 中斷方式不能滿足數(shù)據(jù)傳輸速度的要求,因為在中斷方式下,每次中斷需要保存斷點和現(xiàn)場,中斷返回時,要恢復(fù)斷點和現(xiàn)場。
所有這些原因,使得中斷方式難以滿足高速外設(shè)對傳輸速度的要求。
所以,就有了 DMA 這樣的設(shè)備,在 DMA 方式的數(shù)據(jù)傳輸過程中,當(dāng) I/O 設(shè)備需要進(jìn)行數(shù)據(jù)傳送時,通過 DMA 控制器向 CPU 提出 DMA 傳送請求,CPU 響應(yīng)之后將讓出系統(tǒng)總線,由 DMA 控制器接管總線進(jìn)行數(shù)據(jù)傳輸,而此時 CPU 除了做一些初始化操作之外,可以去做自己的事情。
但是有了 DMA,仍然滿足不了業(yè)務(wù)快速發(fā)展的需要,因為當(dāng) I/O 請求過多時,會出現(xiàn)總線沖突的問題。
所以后面就出現(xiàn)了通道(Channel),它和 DMA 不同的地方是,通道有自己的指令系統(tǒng)和程序,是一個協(xié)處理器;而 DMA 只能實現(xiàn)固定的數(shù)據(jù)傳送控制。
而 Java NIO 中的 Channel ,就是對上圖中通道的實現(xiàn)。
理解了通道的概念,緩沖區(qū)也很好理解了。
通道表示打開到 I/O 設(shè)備的(例如:文件、套接字)的連接,但是通道本身并不存儲數(shù)據(jù)。真正作為數(shù)據(jù)傳輸載體的是緩沖區(qū)。
當(dāng)應(yīng)用程序要寫數(shù)據(jù)時,需要先把數(shù)據(jù)寫到緩沖區(qū)里,然后由通道負(fù)責(zé)把緩沖區(qū)的數(shù)據(jù)發(fā)送到目的地(文件、磁盤、網(wǎng)絡(luò)),然后再從緩沖區(qū)把數(shù)據(jù)取出來。
若需要使用 NIO 系統(tǒng),需要獲取用于連接 I/O 設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū),然后操作緩沖區(qū),對數(shù)據(jù)進(jìn)行處理。
選擇器也叫做多路復(fù)用器,是一種非阻塞式的 I/O 。既然談到了非阻塞式,必然要先談?wù)勛枞?。阻塞式如下圖所示:
客戶端向服務(wù)端發(fā)出一個讀寫請求時,服務(wù)端的線程會一直看內(nèi)核地址空間是否有數(shù)據(jù)了。
客戶端沒有數(shù)據(jù)發(fā)送過來時,服務(wù)端的線程會一直等待,在此期間是什么事情都做不了的。
直到客戶端有數(shù)據(jù)發(fā)送過來,會把數(shù)據(jù)從內(nèi)核地址空間拷貝到用戶地址空間,然后才讀取到了數(shù)據(jù)的。
這就導(dǎo)致如果有大量的請求過來,后面的請求要等待前面的請求執(zhí)行完畢,會造成大量的排隊,無法充分利用 cpu 資源,性能就會急劇下降。
再看看選擇器是如何工作的。
現(xiàn)在客戶端服務(wù)端之間通信是用通道+緩沖區(qū)的,那么所有的通道都會注冊到選擇器上來。選擇器會監(jiān)控這些通道的 I/O 狀態(tài),比如連接、讀、寫的情況。
當(dāng)某一個通道上的某個事件完全就緒時,選擇器才會把這個任務(wù)分配到服務(wù)端的一個或者多個線程上。
當(dāng)客戶端沒有事件準(zhǔn)備好時,服務(wù)端的線程是不會阻塞的,它可以做自己的事情,直到客戶端事件就緒,才會去處理。
這種非阻塞式相比較阻塞式,可以進(jìn)一步的利用 cpu 資源。
1、緩沖區(qū)的 API
要徹底理解緩沖區(qū),必須知道緩沖區(qū)的四個屬性,mark,position,limit,capacity,只需要跑一遍代碼就知道了。
(1)分配一定大小的緩沖區(qū)
//1.分配一個指定大小的緩沖區(qū) ByteBuffer buffer = ByteBuffer.allocate(10); System.out.println("---------alocate"); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
運行結(jié)果:
---------alocate----------- position:0 limit:10 capacity:10
這里我們分配了 10 個字節(jié)的緩沖區(qū),也就是在 ByteBuffer 的 final byte[] hb; 屬性上開辟了 10 個字節(jié)的空間。
所以容量 capacity 為 10 , limit 可讀寫數(shù)據(jù)的最大位置 也是 10 ,position 為可以操作數(shù)據(jù)的位置為 0 。
(2)往緩沖區(qū)寫數(shù)據(jù)
// 2.寫入數(shù)據(jù)到緩沖區(qū) String str = "abcde"; System.out.println("------------put------------"); buffer.put(str.getBytes(StandardCharsets.UTF_8)); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
運行結(jié)果:
------------put------------ position:5 limit:10 capacity:10
這里我們往緩沖區(qū)寫了 5 個字節(jié)的數(shù)據(jù),那么 capacity 和 limit 都還是10,但是 position 為 5 了,因為前面已經(jīng)寫入了 5 個了
(3)切換成讀數(shù)據(jù)的模式
// 3.切換成讀數(shù)據(jù)的模式 buffer.flip(); System.out.println("------------flip------------"); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
那我們現(xiàn)在想從緩沖區(qū)讀取一些數(shù)據(jù)出來,就需要切換成 flip 模式,flip 會改變一些屬性的值
運行結(jié)果:
------------flip------------ position:0 limit:5 capacity:10
flip 會改變 position 的值為 0 ,并且 limit 為5,表示我要從頭開始讀,并且只能讀到 5 的位置
(4)讀取一些數(shù)據(jù)
// 4. 讀取數(shù)據(jù) System.out.println("------------get------------"); byte[] dest = new byte[buffer.limit()]; buffer.get(dest); System.out.println(new String(dest,0,dest.length)); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
運行結(jié)果:
------------get------------ abcde position:5 limit:5 capacity:10
讀取了數(shù)據(jù)之后,position 就變成 5 了,表示我已經(jīng)讀取到 5 了。
(5)重復(fù)讀
//5.rewind() buffer.rewind(); System.out.println("------------rewind------------"); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
運行結(jié)果:
------------rewind------------ position:0 limit:5 capacity:10
rewind 表示重復(fù)讀取 buffer 里面的數(shù)據(jù)
(6)清除數(shù)據(jù)
//6.clear() buffer.clear(); System.out.println("------------clear------------"); System.out.println("position:" + buffer.position()); System.out.println("limit:" + buffer.limit()); System.out.println("capacity:" + buffer.capacity());
運行結(jié)果:
------------clear------------ position:0 limit:10 capacity:10
clear() 之后,position 回到了 0 ,limit 回到了 10,又可以重頭開始寫數(shù)據(jù)了,能寫 10 個字節(jié)。
但是要注意的是,緩沖里面的數(shù)據(jù)并沒有清空掉,數(shù)據(jù)還在里面,處于被“遺忘”狀態(tài)。這幾個指針回到了最初的狀態(tài)。
(7)標(biāo)記
這是第四個屬性:mark。
mark 可以記錄 position 的位置。可以通過 reset() 方法回到 mark 的位置。
@Test public void test2() { // 分配 10 個字節(jié) String str = "abcde"; ByteBuffer buffer = ByteBuffer.allocate(10); buffer.put(str.getBytes(StandardCharsets.UTF_8)); // 切換到讀模式,讀取 2 個字節(jié) buffer.flip(); byte[] dest = new byte[buffer.limit()]; buffer.get(dest, 0, 2); System.out.println(new String(dest, 0, 2)); System.out.println(buffer.position()); // mark 一下記錄當(dāng)前位置 buffer.mark(); // 又讀取兩個字節(jié) buffer.get(dest, 2, 2); System.out.println(new String(dest, 2, 2)); System.out.println(buffer.position()); // reset,回到 mark 的位置 buffer.reset(); System.out.println(buffer.position()); } 執(zhí)行結(jié)果: ```tex ab 2 cd 4 2
2、使用通道、緩沖區(qū)、選擇器完成一個網(wǎng)絡(luò)程序
(1)服務(wù)端
@Test public void testServer() throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(8989)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int len = 0; while ((len = channel.read(byteBuffer)) > 0) { byteBuffer.flip(); System.out.println(new String(byteBuffer.array(), 0, len)); byteBuffer.clear(); } } } iterator.remove(); } }
1、首先使用 ServerSocketChannel.open(),打開一個通道,設(shè)置成非阻塞模式;
2、綁定到 8989 端口上;
3、把通道注冊到選擇器上;
4、while 循環(huán),選擇器上是否有事件,如果事件是客戶端的連接事件,則打開一個 SocketChannel,注冊成非阻塞模式,并且往選擇器上注冊一個讀數(shù)據(jù)的事件;
5、當(dāng)客戶端發(fā)送數(shù)據(jù)過來的時候,就可以打開一個通道,讀取緩沖區(qū)上的數(shù)據(jù);
6、并且此時,服務(wù)端是可以同時接受多個客戶端的請求的。
(2)客戶端
@Test public void testClient() throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 8989)); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put(new Date().toString().getBytes(StandardCharsets.UTF_8)); byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); socketChannel.close(); }
1、客戶端打開一個 SocketChannel,配置成非阻塞模式;
2、使用 ByteBuffer 發(fā)送數(shù)據(jù)(注意發(fā)送之前,要 flip);
3、關(guān)閉通道。
到此,關(guān)于“閱讀一個分布式框架必備的NIO知識有哪些”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。