溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Netty核心類及其作用是什么

發(fā)布時(shí)間:2023-04-26 10:33:48 來源:億速云 閱讀:97 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Netty核心類及其作用是什么的相關(guān)知識(shí),內(nèi)容詳細(xì)易懂,操作簡(jiǎn)單快捷,具有一定借鑒價(jià)值,相信大家閱讀完這篇Netty核心類及其作用是什么文章都會(huì)有所收獲,下面我們一起來看看吧。

MessageToByteEncoder

MessageToByteEncoder是一個(gè)抽象編碼器,子類可重寫encode方法把對(duì)象編碼為ByteBuf輸出。

MessageToByteEncoder繼承自ChannelOutboundHandlerAdapter,encode在出站是被調(diào)用。

public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder.encode,被調(diào)用");
        String json = JSONObject.toJSONString(msg);
        out.writeInt(json.getBytes(StandardCharsets.UTF_8).length);
        out.writeBytes(json.getBytes(StandardCharsets.UTF_8));
    }
}

ByteToMessageDecoder

ByteToMessageDecoder是一種ChannelInboundHandler,可以稱為解碼器,負(fù)責(zé)將byte字節(jié)流(ByteBuf)轉(zhuǎn)換成一種Message,Message是應(yīng)用可以自己定義的一種Java對(duì)象。

ByteToMessageDecoder:用于將字節(jié)轉(zhuǎn)為消息,需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié)。

public class MyMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder.decode,被調(diào)用");
        while (in.readableBytes() >= 4){
            int num = in.readInt();
            System.out.println("解碼出一個(gè)整數(shù):"+num);
            out.add(num);
        }
    }
}

ReplayingDecoder

ReplayingDecoder:繼承自ByteToMessageDecoder,不需要檢測(cè)緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。

項(xiàng)目復(fù)雜度高用ReplayingDecoder,否則使用ByteToMessageDecoder。

public class MyMessageDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder.decode,被調(diào)用");
        int length = in.readInt();
        byte[] content = new byte[length];
        in.readBytes(content);
        String json = new String(content,StandardCharsets.UTF_8);
        MessagePO po = JSONObject.parseObject(json,MessagePO.class);
        out.add(po);
    }
}

MessageToMessageEncoder

用于從一種消息編碼為另外一種消息,例如從POJO到POJO,是一種ChannelOutboundHandler

MessageToMessageDecoder

從一種消息解碼為另一種消息,例如POJO到POJO,是一種ChannelInboundHandler

MessageToMessageCodec

整合了MessageToMessageEncoder 和 MessageToMessageDecoder

public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception {
        System.out.println("RequestMessageCodec.encode 被調(diào)用 " + msg);
        String json = JSONObject.toJSONString(msg);
        out.add(json);
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        System.out.println("RequestMessageCodec.decode 被調(diào)用 " + msg);
        RequestData po = JSONObject.parseObject(msg, RequestData.class);
        out.add(po);
    }
}

ChannelInitializer

ChannelInitializer是一種特殊的ChannelInboundHandler,可以通過一種簡(jiǎn)單的方式(調(diào)用initChannel方法)來初始化Channel。

通常在Bootstrap.handler(ChannelHandler) , ServerBootstrap.handler(ChannelHandler)ServerBootstrap.childHandler(ChannelHandler)中給Channel設(shè)置ChannelPipeline。

注意:當(dāng)initChannel被執(zhí)行完后,會(huì)將當(dāng)前的handler從Pipeline中移除。

Bootstrap bootstrap = new Bootstrap().group(group)//設(shè)置線程組
    .channel(NioSocketChannel.class)//設(shè)置客戶端通道的實(shí)現(xiàn)類
    .handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyClientHandler());//加入自己的處理器
        }
    });
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn)
    .option(ChannelOption.SO_BACKLOG, 128)//設(shè)置線程隊(duì)列等待連接的個(gè)數(shù)
    .childOption(ChannelOption.SO_KEEPALIVE, true)//設(shè)置保持活動(dòng)連接狀態(tài)
//      .handler(null)//該Handler對(duì)應(yīng)bossGroup
    .childHandler(new ChannelInitializer<SocketChannel>() {//給workerGroup的EventLoop對(duì)應(yīng)的管道設(shè)置處理器
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new NettyServerHandler());
        }
    });

SimpleChannelInboundHandler

SimpleChannelInboundHandler繼承自ChannelInboundHandlerAdapter,可以通過泛型來規(guī)定消息類型。

處理入站的數(shù)據(jù)我們只需要實(shí)現(xiàn)channelRead0方法。

SimpleChannelInboundHandler在接收到數(shù)據(jù)后會(huì)自動(dòng)release掉數(shù)據(jù)占用的Bytebuffer資源,ChannelInboundHandlerAdapter不會(huì)自動(dòng)釋放。

public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception {
        System.out.println("收到服務(wù)端消息:" + msg);
    }
}

DefaultEventLoopGroup

在向pipline中添加ChannelHandler時(shí),可以提供一個(gè)新的線程組,Handler業(yè)務(wù)會(huì)在該線程中執(zhí)行。

當(dāng)加ChannelHandler需要執(zhí)行多線程并發(fā)業(yè)務(wù)時(shí),DefaultEventLoopGroup可以派上大用處。

如果沒有設(shè)置DefaultEventLoopGroup,默認(rèn)使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();

DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100);
...
addLast(businessGroup, new MyNettyServerHandler())
/**
 * 讀取客戶端發(fā)送過來的消息
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ByteBuf byteBuf = (ByteBuf) msg;
    System.out.println("收到客戶信息:" + byteBuf.toString(CharsetUtil.UTF_8));
    System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
    System.out.println("處理線程:" + Thread.currentThread().getName());
    ctx.executor().parent().execute(()->{
        try {
            System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(2L);
            System.out.println("parent任務(wù)執(zhí)行完成1");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    ctx.executor().parent().execute(()->{
        try {
            System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(2L);
            System.out.println("parent任務(wù)執(zhí)行完成2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    ctx.executor().parent().execute(()->{
        try {
            System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(2L);
            System.out.println("parent任務(wù)執(zhí)行完成3");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

以上代碼執(zhí)行日志如下:

收到客戶信息:Hello 服務(wù)端
客戶端地址:/127.0.0.1:60345
處理線程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序繼續(xù)~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任務(wù)執(zhí)行完成1
parent任務(wù)執(zhí)行完成3
parent任務(wù)執(zhí)行完成2

EventLoop定時(shí)任務(wù)

可以在Handler中通過方法ctx.channel().eventLoop().schedule()添加定時(shí)任務(wù)

ctx.channel().eventLoop().schedule(()->{
    try {
        System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
        TimeUnit.SECONDS.sleep(2L);
        System.out.println("定時(shí)任務(wù)執(zhí)行完成");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
},10L,TimeUnit.SECONDS);

關(guān)于“Netty核心類及其作用是什么”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對(duì)“Netty核心類及其作用是什么”知識(shí)都有一定的了解,大家如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI