溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Netty線程模型、Future、Channel總結(jié)和源碼分析

發(fā)布時(shí)間:2020-06-13 09:25:02 來源:網(wǎng)絡(luò) 閱讀:2990 作者:星恒Android 欄目:移動(dòng)開發(fā)
Netty線程模型

可根據(jù)需要配置線程模型:?jiǎn)尉€程Reactor、多線程Reactor、多層線程Reactor

無論幾個(gè)線程,都通過單一的Acceptor接收客戶端請(qǐng)求,可以創(chuàng)建更多的NioEventLoop來處理IO操作。

EventLoop和EventLoopGroup實(shí)際繼承了Java的ScheduledExecutorService,使其具備了線程池的特性,其線程數(shù)量可動(dòng)態(tài)配置。例如配置單線程模型,設(shè)置線程數(shù)量為1即可。

Future和Promise
Future

Future即異步操作
future操作可以被close,但結(jié)果是未知的;調(diào)用get可以獲取操作結(jié)果,但是會(huì)被阻塞;isDone可判斷是否完成操作。
ChannelFuture是為了獲取異步返回結(jié)果而設(shè)計(jì)
可以通過ChannelFutureListener接口獲得回調(diào),無需等待get方法返回。

public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }

        }
    };
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }

        }
    };
}

連接超時(shí)和channel超時(shí)配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);

注意:
1、謹(jǐn)慎調(diào)用await,可能導(dǎo)致死鎖。
2、ChannelFuture超時(shí)后如果調(diào)用了業(yè)務(wù)代碼重連,而此時(shí)IO未超時(shí),將可能導(dǎo)致多條連接并存,設(shè)置IO超時(shí)時(shí)間建議小于業(yè)務(wù)代碼超時(shí)時(shí)間。

promise

升級(jí)版的future,可寫可操作(對(duì)回調(diào)過程)。future好比古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現(xiàn)代快遞員,送快遞送一半可以打電話給他叫他不要送了或者中途請(qǐng)他幫忙買個(gè)餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動(dòng)打斷回調(diào),使進(jìn)程等待。

 public Promise<V> awaitUninterruptibly() {
        if (this.isDone()) {
            return this;
        } else {
            boolean interrupted = false;
            synchronized(this) {
                while(!this.isDone()) {
                    this.checkDeadLock();
                    this.incWaiters();

                    try {
                        this.wait();
                    } catch (InterruptedException var9) {
                        interrupted = true;
                    } finally {
                        this.decWaiters();
                    }
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            return this;
        }
    }

進(jìn)行了死鎖判斷,避免已存在相同任務(wù);并限制了最大等待數(shù)量32767

    protected void checkDeadLock() {
        EventExecutor e = this.executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(this.toString());
        }
    }

private void incWaiters() {
        if (this.waiters == 32767) {
            throw new IllegalStateException("too many waiters: " + this);
        } else {
            ++this.waiters;
        }
    }
Channel和UnSafe

Channel負(fù)責(zé)對(duì)外提供操作IO的接口,而UnSafe是Channel的內(nèi)部接口類,如其名一樣是不安全的操作,所以封裝在接口內(nèi)部不讓外部調(diào)用,而實(shí)際的操作IO最終都是在Unsafe中執(zhí)行。

//Channel調(diào)用連接為例,跟蹤實(shí)現(xiàn)連接請(qǐng)求的過程
ChannelFuture connect(SocketAddress var1);

//DefaultChannelPipeline中執(zhí)行,實(shí)際是調(diào)用尾部的pipeline
 public ChannelFuture connect(SocketAddress remoteAddress) {
        return this.tail.connect(remoteAddress);
    }

//AbstractChannelHandlerContext是Pipeline容器中的對(duì)象,
//持續(xù)尋找所有handler執(zhí)行對(duì)象,直到全部被調(diào)用
 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        AbstractChannelHandlerContext next = this.findContextOutbound();
        next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
        return promise;
    }
   private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;

        do {
            ctx = ctx.prev;
        } while(!ctx.outbound);

        return ctx;
    }

//而真實(shí)的執(zhí)行是尋找到UnSafe的Invoker
   public ChannelHandlerInvoker invoker() {
        return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
    }

 public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (remoteAddress == null) {
            throw new NullPointerException("remoteAddress");
        } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
            if (this.executor.inEventLoop()) {
                ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
            } else {
                this.safeExecuteOutbound(new OneTimeTask() {
                    public void run() {
                        ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
                    }
                }, promise);
            }

        }
    }
向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI