您好,登錄后才能下訂單哦!
這篇文章主要介紹了Netty核心類及其作用是什么的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Netty核心類及其作用是什么文章都會(huì)有所收獲,下面我們一起來看看吧。
MessageToByteEncoder是一個(gè)抽象編碼器,子類可重寫encode方法把對(duì)象編碼為ByteBuf輸出。
MessageToByteEncoder繼承自ChannelOutboundHandlerAdapter,encode在出站是被調(diào)用。
public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> { @Override protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception { System.out.println("MyMessageEncoder.encode,被調(diào)用"); String json = JSONObject.toJSONString(msg); out.writeInt(json.getBytes(StandardCharsets.UTF_8).length); out.writeBytes(json.getBytes(StandardCharsets.UTF_8)); } }
ByteToMessageDecoder是一種ChannelInboundHandler,可以稱為解碼器,負(fù)責(zé)將byte字節(jié)流(ByteBuf)轉(zhuǎn)換成一種Message,Message是應(yīng)用可以自己定義的一種Java對(duì)象。
ByteToMessageDecoder:用于將字節(jié)轉(zhuǎn)為消息,需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié)。
public class MyMessageDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被調(diào)用"); while (in.readableBytes() >= 4){ int num = in.readInt(); System.out.println("解碼出一個(gè)整數(shù):"+num); out.add(num); } } }
ReplayingDecoder:繼承自ByteToMessageDecoder,不需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。
項(xiàng)目復(fù)雜度高用ReplayingDecoder,否則使用ByteToMessageDecoder。
public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { System.out.println("MyMessageDecoder.decode,被調(diào)用"); int length = in.readInt(); byte[] content = new byte[length]; in.readBytes(content); String json = new String(content,StandardCharsets.UTF_8); MessagePO po = JSONObject.parseObject(json,MessagePO.class); out.add(po); } }
用于從一種消息編碼為另外一種消息,例如從POJO到POJO,是一種ChannelOutboundHandler
從一種消息解碼為另一種消息,例如POJO到POJO,是一種ChannelInboundHandler
整合了MessageToMessageEncoder 和 MessageToMessageDecoder
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> { @Override protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.encode 被調(diào)用 " + msg); String json = JSONObject.toJSONString(msg); out.add(json); } @Override protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception { System.out.println("RequestMessageCodec.decode 被調(diào)用 " + msg); RequestData po = JSONObject.parseObject(msg, RequestData.class); out.add(po); } }
ChannelInitializer是一種特殊的ChannelInboundHandler,可以通過一種簡(jiǎn)單的方式(調(diào)用initChannel方法)來初始化Channel。
通常在Bootstrap.handler(ChannelHandler)
, ServerBootstrap.handler(ChannelHandler)
和 ServerBootstrap.childHandler(ChannelHandler)
中給Channel設(shè)置ChannelPipeline。
注意:當(dāng)initChannel被執(zhí)行完后,會(huì)將當(dāng)前的handler從Pipeline中移除。
Bootstrap bootstrap = new Bootstrap().group(group)//設(shè)置線程組 .channel(NioSocketChannel.class)//設(shè)置客戶端通道的實(shí)現(xiàn)類 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler());//加入自己的處理器 } });
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn) .option(ChannelOption.SO_BACKLOG, 128)//設(shè)置線程隊(duì)列等待連接的個(gè)數(shù) .childOption(ChannelOption.SO_KEEPALIVE, true)//設(shè)置保持活動(dòng)連接狀態(tài) // .handler(null)//該Handler對(duì)應(yīng)bossGroup .childHandler(new ChannelInitializer<SocketChannel>() {//給workerGroup的EventLoop對(duì)應(yīng)的管道設(shè)置處理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } });
SimpleChannelInboundHandler繼承自ChannelInboundHandlerAdapter,可以通過泛型來規(guī)定消息類型。
處理入站的數(shù)據(jù)我們只需要實(shí)現(xiàn)channelRead0方法。
SimpleChannelInboundHandler在接收到數(shù)據(jù)后會(huì)自動(dòng)release掉數(shù)據(jù)占用的Bytebuffer資源,ChannelInboundHandlerAdapter不會(huì)自動(dòng)釋放。
public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> { @Override protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception { System.out.println("收到服務(wù)端消息:" + msg); } }
在向pipline中添加ChannelHandler時(shí),可以提供一個(gè)新的線程組,Handler業(yè)務(wù)會(huì)在該線程中執(zhí)行。
當(dāng)加ChannelHandler需要執(zhí)行多線程并發(fā)業(yè)務(wù)時(shí),DefaultEventLoopGroup可以派上大用處。
如果沒有設(shè)置DefaultEventLoopGroup,默認(rèn)使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100); ... addLast(businessGroup, new MyNettyServerHandler())
/** * 讀取客戶端發(fā)送過來的消息 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("收到客戶信息:" + byteBuf.toString(CharsetUtil.UTF_8)); System.out.println("客戶端地址:" + ctx.channel().remoteAddress()); System.out.println("處理線程:" + Thread.currentThread().getName()); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成1"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成2"); } catch (InterruptedException e) { e.printStackTrace(); } }); ctx.executor().parent().execute(()->{ try { System.out.println("parent().execute Thread = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("parent任務(wù)執(zhí)行完成3"); } catch (InterruptedException e) { e.printStackTrace(); } }); }
以上代碼執(zhí)行日志如下:
收到客戶信息:Hello 服務(wù)端
客戶端地址:/127.0.0.1:60345
處理線程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序繼續(xù)~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任務(wù)執(zhí)行完成1
parent任務(wù)執(zhí)行完成3
parent任務(wù)執(zhí)行完成2
可以在Handler中通過方法ctx.channel().eventLoop().schedule()
添加定時(shí)任務(wù)
ctx.channel().eventLoop().schedule(()->{ try { System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); TimeUnit.SECONDS.sleep(2L); System.out.println("定時(shí)任務(wù)執(zhí)行完成"); } catch (InterruptedException e) { e.printStackTrace(); } },10L,TimeUnit.SECONDS);
關(guān)于“Netty核心類及其作用是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Netty核心類及其作用是什么”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。