您好,登錄后才能下訂單哦!
上一篇 介紹了事件監(jiān)聽、責任鏈模型、socket接口和IO模型、線程模型等基本概念,以及Netty的整體結(jié)構(gòu),這篇就來說下Netty三大核心模塊之一:事件監(jiān)聽和處理。
前面提到,Netty是一個NIO框架,它將IO通道的建立、可讀、可寫等狀態(tài)變化,抽象成事件,以責任鏈的方式進行傳遞,可以在處理鏈上插入自定義的Handler,對感興趣的事件進行監(jiān)聽和處理。
通過介紹,你會了解到:
進行網(wǎng)絡(luò)編程時,一般的編寫過程是這樣的:
傳統(tǒng)的模型,每個客戶端Socket會創(chuàng)建一個單獨的線程監(jiān)聽socket事件,一方面系統(tǒng)可創(chuàng)建的線程數(shù)有限,限制了并發(fā)數(shù),一方面線程過多,線程切換頻繁,導(dǎo)致性能嚴重下降。
隨著操作系統(tǒng)IO模型的發(fā)展,可以采用多路復(fù)用IO,一個線程監(jiān)聽多個Socket,另外,服務(wù)端處理客戶端連接,與客戶端Socket的監(jiān)聽,可以在不同的線程進行處理。
Netty就是采用多路復(fù)用IO進行事件監(jiān)聽,另外,使用不同的線程分別處理客戶端的連接、數(shù)據(jù)讀寫。
整個處理結(jié)構(gòu)如下圖,簡單說明下:
一個EventLoop其實和一個特定的線程綁定, 并且在其生命周期內(nèi), 綁定的線程都不會再改。
EventLoop肩負著兩種任務(wù):
第一個任務(wù)比較好理解,主要解釋下第二個:從socket數(shù)據(jù)到數(shù)據(jù)處理,再到寫入響應(yīng)數(shù)據(jù),Netty都在一個線程中處理,主要是為了線程安全考慮,減少競爭和線程切換,通過任務(wù)隊列的方式,可以在用戶線程提交處理邏輯,在Eventloop中執(zhí)行。
整個EventLoop干的事情就是select -> processIO -> runAllTask,processIO處理IO事件相關(guān)的邏輯,runAllTask處理任務(wù)隊列中的任務(wù),如果執(zhí)行的任務(wù)過多,會影響IO事件的處理,所以會限制任務(wù)處理的時間,整個處理過程如下圖:
EventLoop的run代碼如下:
protected void run() {
for (; ; ) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) { //如果有任務(wù),快速返回
selectNow();
} else {
select(); //如果沒任務(wù),等待事件返回
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
//處理IO事件
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
//計算IO處理時間
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio; //默認為50
//處理提交的任務(wù)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
ChannelPipeline是一個接口,其有一個默認的實現(xiàn)類DefaultChannelPipeline,內(nèi)部有兩個屬性:head和tail,
這兩者都實現(xiàn)了ChannelHandler接口,對應(yīng)處理鏈的頭和尾。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
每個Channel創(chuàng)建時,會創(chuàng)建一個ChannelPipeline對象,來處理channel的各種事件,可以在運行時動態(tài)進行動態(tài)修改其中的 ChannelHandler。
ChannelHandler承載業(yè)務(wù)處理邏輯的地方,我們接觸最多的類,可以自定義Handler,加入處理鏈中,實現(xiàn)自定義邏輯。
ChannelHandler 可分為兩大類:ChannelInboundHandler 和 ChannelOutboundHandle,這兩接口分別對應(yīng)入站和出站消息的處理,對應(yīng)數(shù)據(jù)讀取和數(shù)據(jù)寫入。它提供了接口方法供我們實現(xiàn),處理各種事件。
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
}
自定義Handler時,一般繼承ChannelInboundHandlerAdapter或 ChannelOutboundHandlerAdapter。
需要注意的是,不建議在 ChannelHandler 中直接實現(xiàn)耗時或阻塞的操作,因為這可能會阻塞 Netty 工作線程,導(dǎo)致 Netty 無法及時響應(yīng) IO 處理。
不是本篇的重點,簡單說明下:
Netty提供了ChannelInitializer類方便我們初始化,創(chuàng)建WebSocketServerInitializer類,繼承ChannelInitializer類,用于添加ChannelHandler:
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Resource
private CustomTextFrameHandler customTextFrameHandler;
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("codec-http", new HttpServerCodec());
pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
pipeline.addLast("websocket-protocal-handler",new WebSocketServerProtocolHandler());
pipeline.addLast("custome-handler", customTextFrameHandler);
}
}
分析下這幾個Handler,都是Netty默認提供的:
是不是很方便,經(jīng)過WebSocketServerProtocolHandler處理后,讀取出來的就是文本數(shù)據(jù)了,不用自己處理數(shù)據(jù)合包、拆包問題。
自定義的Handler,進行業(yè)務(wù)處理:
public class CustomTextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
final String content = frame.text();
System.out.println("接收到數(shù)據(jù):"+content);
// 回復(fù)數(shù)據(jù)
TextWebSocketFrame respFrame = new TextWebSocketFrame("我收到了你的數(shù)據(jù)");
if (ctx.channel().isWritable()) {
ChannelFuture future = ctx.writeAndFlush(respFrame);
}
}
}
歡迎掃描下方二維碼,關(guān)注我的個人微信公眾號 ~
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。