溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Netty4.x用戶(hù)指導(dǎo)(1)3個(gè)HelloWorld小例子

發(fā)布時(shí)間:2020-06-10 09:15:25 來(lái)源:網(wǎng)絡(luò) 閱讀:12777 作者:randy_shandong 欄目:移動(dòng)開(kāi)發(fā)

題記

最近對(duì)netty有了興趣,現(xiàn)在官方推薦版本是netty4.*,但是縱觀網(wǎng)絡(luò),大部分都是關(guān)于netty3.x的知識(shí)。

最好的學(xué)習(xí),莫過(guò)于通過(guò)官方文檔進(jìn)行學(xué)習(xí),系統(tǒng),透徹,權(quán)威,缺點(diǎn)是英文。本文,算做自己學(xué)習(xí)netty的第一篇,總體思路與User guide for 4.x基本一致,本篇文章不是嚴(yán)格意義的翻譯文章。開(kāi)始了...

1.前言

1.1 問(wèn)題

現(xiàn) 在,我們使用通用的應(yīng)用程序和程序庫(kù),進(jìn)行互相交流。例如,我們經(jīng)常使用HTTP client庫(kù)從web服務(wù)器上獲取信息,通過(guò)web services調(diào)用遠(yuǎn)程接口。然而,通用的協(xié)議或?qū)崿F(xiàn),有時(shí)候不能很好的擴(kuò)展伸縮。就像我們不能使用通用協(xié)議進(jìn)行部分信息的交換,如:huge files,e-mail,實(shí)時(shí)信息。我們需要高度優(yōu)化的協(xié)議實(shí)現(xiàn),來(lái)完成一些特殊目的。比如,你想實(shí)現(xiàn)一款專(zhuān)門(mén)的HTTP服務(wù)器,以支持基于AJAX的聊天應(yīng)用,流媒體,大文件傳輸。你需要設(shè)計(jì)和實(shí)現(xiàn)一個(gè)完整的新協(xié)議,不可避免需要處理遺留協(xié)議,在不影響性能和穩(wěn)定的情況下,實(shí)現(xiàn)新協(xié)議速度能有多塊?

1.2 解決方案

Netty 致力于提供異步,基于事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用框架,是一款進(jìn)行快速開(kāi)發(fā)高性能,可伸縮的協(xié)議服務(wù)器和客戶(hù)端工具。換句話說(shuō),Netty是一個(gè)快速和容易開(kāi)發(fā)的NIO客戶(hù)端,服務(wù)端網(wǎng)絡(luò)框架,它簡(jiǎn)化和使用流式方式處理網(wǎng)絡(luò)編程,如TCP,UDP。

"快速和容易“,并不代表netty存在可維護(hù)性和性能問(wèn)題。它很完美的提供FTP,SMTP,HTTP,二進(jìn)制和基于文本的協(xié)議支持。有的用戶(hù)可能已經(jīng)發(fā)現(xiàn)了擁有同樣有點(diǎn)的其他網(wǎng)絡(luò)應(yīng)用框架。你可能想問(wèn):netty和他們的區(qū)別?這個(gè)問(wèn)題不好回答,Netty被設(shè)計(jì)成提供最合適的API和實(shí)現(xiàn),會(huì)讓你的生活更簡(jiǎn)單。

2.開(kāi)始

本節(jié)使用一些簡(jiǎn)單的例子讓你快速的感知netty的核心結(jié)構(gòu)。閱讀本節(jié)之后,你可以熟練掌握netty,可以寫(xiě)一個(gè)client和server。

準(zhǔn)備

JDK 1.6+

最新版本的netty:下載地址

maven依賴(lài)

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>${netty.version}</version>
</dependency>


2.1寫(xiě)一個(gè)Discard服務(wù)

最簡(jiǎn)單的協(xié)議,不是'hello world',而是DISCARD。這個(gè)協(xié)議拋棄接收的數(shù)據(jù),沒(méi)有響應(yīng)。

協(xié)議實(shí)現(xiàn),唯一需要做的一件事就是無(wú)視所有接收到的數(shù)據(jù),讓我們開(kāi)始吧。

直接上Handler的實(shí)現(xiàn),它處理來(lái)自netty的I/O事件。

package io.netty.examples.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
        /*
        //也可以這樣
         try {        // Do something with msg
              } finally {
                      ReferenceCountUtil.release(msg);
            }        
        */
        
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}


1)DiscardServerHandler繼承了ChannelInboundHandlerAdapter,

實(shí)現(xiàn)了ChannelInboundHandler接口。ChannelInboundHandler提供了多個(gè)可重寫(xiě)的事件處理方法。現(xiàn)在,繼承了ChannelInboundHandlerAdapter,而不用自己實(shí)現(xiàn)handler接口。

2)重寫(xiě)了channelRead(),這個(gè)方法在接收到新信息時(shí)被調(diào)用。信息類(lèi)型是ByteBuf

3)實(shí)現(xiàn)Discard協(xié)議,ByteBuf是reference-counted對(duì)象,必須顯示的調(diào)用release(),進(jìn)行釋放。需要記住handler的責(zé)任包括釋放任意的reference-counted對(duì)象。

4)exceptionCaught(),當(dāng)發(fā)生異常時(shí)調(diào)用,I/O異常或Handler處理異常。一般情況下,在這里可以記錄異常日志,和返回錯(cuò)誤碼。

到了這里,完成了一半,接下來(lái)編寫(xiě)main(),啟動(dòng)Server

package io.netty.examples.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.examples.discard.time.TimeServerHandler;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new TimeServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup 是一個(gè)多線程的處理I/O操作Event Loop(這是異步機(jī)制特點(diǎn)) Netty 提供了多個(gè) EventLoopGroup,支持多種傳輸方式。在這個(gè)例子中,我們使用了2個(gè)NioEventLoopGroup。


    第一個(gè),叫"boss",接收進(jìn)入的連接,第二個(gè)經(jīng)常叫“worker",一旦boss接收了連接并注冊(cè)連接到這個(gè)worker,worker就會(huì)處理這個(gè)連接。多少個(gè)線程被使用?,EventLoopGroup擁有多少個(gè)Channel?都可以通過(guò)構(gòu)造函數(shù)配置

  2. ServerBootstrap是建立server的幫助類(lèi)。你可以使用Channel直接創(chuàng)建server,注意,這個(gè)創(chuàng)建過(guò)程非常的冗長(zhǎng),大部分情況,你不需要直接創(chuàng)建。

  3. 指定 NioServerSocketChannel類(lèi),當(dāng)接收新接入連接時(shí),被用在Channel的初始化。

  4. 這個(gè)handler,初始化Channel時(shí)調(diào)用。ChannelInitializer是一個(gè)專(zhuān)門(mén)用于配置新的Channel的Handler,一般為Channel配置ChannelPipeline。當(dāng)業(yè)務(wù)復(fù)雜時(shí),會(huì)添加更多的handler到pipeline.

  5. 你可以設(shè)置影響Channel實(shí)現(xiàn)的參數(shù),比如keepAlive..

  6. option()  影響的是接收進(jìn)入的連接 的NioServerSocketChannel ;childOption()影響的是來(lái)自父 ServerChannel分發(fā)的Channel, 在本例中是 NioServerSocketChannel

  7. 保證工作準(zhǔn)備好了


通過(guò)telnet進(jìn)行測(cè)試。

輸入telnet 127.0.0.1 8080

Netty4.x用戶(hù)指導(dǎo)(1)3個(gè)HelloWorld小例子

由于是Discard協(xié)議,一沒(méi)響應(yīng),二服務(wù)端也沒(méi)輸出。通過(guò)測(cè)試,只能確認(rèn)是服務(wù)正常啟動(dòng)了。

調(diào)整下Handler邏輯,修改channelRead()方法。

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
        try {
           while (in.isReadable()) { // (1)
             System.out.print((char) in.readByte());
            System.out.flush();
             }
    } finally {
            ReferenceCountUtil.release(msg); // (2)
    }
}

修改之后,在使用telnet測(cè)試。你在命令行中輸入什么,在服務(wù)端就能看到什么!(只能是英文)


2.2寫(xiě)一個(gè)Echo服務(wù)


作為一個(gè)服務(wù),沒(méi)有返回響應(yīng)信息,明顯是不合格的。接下來(lái)實(shí)現(xiàn)Echo協(xié)議,返回響應(yīng)信息。與前面的Discard服務(wù)實(shí)現(xiàn)唯一的不同點(diǎn)就是Handler,需要回寫(xiě)接收到的信息,而不是打印輸出。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }

1)ChannelHandlerContext提供多個(gè)方法,觸發(fā)I/O事件。在這里,不需要像Discard一樣釋放 接收的信息。

2)ctx.write(Object)不能保證完全寫(xiě)入,底層存在緩存,需要通過(guò)ctx.flush()刷新,保證完全寫(xiě)入。


完成。

2.3寫(xiě)一個(gè)Time服務(wù)

時(shí)間協(xié)議見(jiàn) TIME protocol。和前面2個(gè)協(xié)議不同,服務(wù)將發(fā)送一個(gè)32位的integer值。不需要接收信息,一旦信息發(fā)出,將關(guān)閉連接。在這個(gè)例子中,將學(xué)到如何構(gòu)造和發(fā)送信息,并在完成時(shí)關(guān)閉連接。

由于我們不需要接收信息,所以不需要channelRead()方法,而是使用channelActive()。

package io.netty.example.time;public class TimeServerHandler extends ChannelInboundHandlerAdapter {    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {            @Override
            public void operationComplete(ChannelFuture future) {                assert f == future;
                ctx.close();
            }
        }); // (4)
    }    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

1)channelActive()在連接建立和準(zhǔn)備完成時(shí)調(diào)用。

2) 發(fā)送一個(gè)新消息,需要分配一個(gè)buff.將來(lái)寫(xiě)入一個(gè)4位的integer值。獲取當(dāng)前的 ByteBufAllocator,可以通過(guò)ChannelHandlerContext.alloc()獲取。

3)注意沒(méi)有java.nio.ByteBuffer.flip(),(nio 讀寫(xiě)切換時(shí)使用),這是因?yàn)閚etty重寫(xiě)了Buffer,維護(hù)了2個(gè)指示器,分別用來(lái)(reader index)讀取和( writer index )寫(xiě)入。

注意:ChannelHandlerContext.write() (and writeAndFlush())返回的ChannelFuture 。ChannelFuture 代表一個(gè)尚未發(fā)生的I/O操作。這一個(gè)是異步操作,會(huì)觸發(fā)通知listeners 。這兒的意思是當(dāng)ChannelFuture完成時(shí),才會(huì)調(diào)用close()。


最終版的Server Handler

package io.netty.examples.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) { // (1)
        final ByteBuf time = ctx.alloc().buffer(4); // (2)
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time); // (3)
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        }); // (4)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

接下來(lái)實(shí)現(xiàn)Time客戶(hù)端

與Echo和Discard協(xié)議不同,人不能很好的將32位的integer,轉(zhuǎn)換為可以理解的日期數(shù)據(jù)。通過(guò)學(xué)習(xí)本節(jié),可以學(xué)習(xí)如何寫(xiě)一個(gè)Client。這和前面的EchoServer,DiscardServer實(shí)現(xiàn)有很大的不同。

package io.netty.examples.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = Integer.parseInt("8080");
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap is similar to ServerBootstrap except that it's for non-server channels such as a client-side or connectionless channel.

  2. 如果只使用一個(gè) EventLoopGroup,它會(huì)被當(dāng)成boss group 和 worker group.  boss worker不會(huì)再client使用。

  3. 代替NioServerSocketChannelNioSocketChannel 是一個(gè) 客戶(hù)端的Channel.

  4. 注意沒(méi)有使用 childOption() ,因?yàn)榭蛻?hù)端沒(méi)有上級(jí)。

  5. 調(diào)用connect(),而不是 bind()

下面是對(duì)應(yīng)的Handler

package io.netty.examples.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}


  1. In TCP/IP, Netty reads the data sent from a peer into a ByteBuf.

測(cè)試過(guò)程:略。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI