您好,登錄后才能下訂單哦!
可根據(jù)需要配置線程模型:?jiǎn)尉€程Reactor、多線程Reactor、多層線程Reactor
無論幾個(gè)線程,都通過單一的Acceptor接收客戶端請(qǐng)求,可以創(chuàng)建更多的NioEventLoop來處理IO操作。
EventLoop和EventLoopGroup實(shí)際繼承了Java的ScheduledExecutorService,使其具備了線程池的特性,其線程數(shù)量可動(dòng)態(tài)配置。例如配置單線程模型,設(shè)置線程數(shù)量為1即可。
Future即異步操作
future操作可以被close,但結(jié)果是未知的;調(diào)用get可以獲取操作結(jié)果,但是會(huì)被阻塞;isDone可判斷是否完成操作。
ChannelFuture是為了獲取異步返回結(jié)果而設(shè)計(jì)
可以通過ChannelFutureListener接口獲得回調(diào),無需等待get方法返回。
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {
ChannelFutureListener CLOSE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().close();
}
}
};
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
future.channel().pipeline().fireExceptionCaught(future.cause());
}
}
};
}
連接超時(shí)和channel超時(shí)配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);
注意:
1、謹(jǐn)慎調(diào)用await,可能導(dǎo)致死鎖。
2、ChannelFuture超時(shí)后如果調(diào)用了業(yè)務(wù)代碼重連,而此時(shí)IO未超時(shí),將可能導(dǎo)致多條連接并存,設(shè)置IO超時(shí)時(shí)間建議小于業(yè)務(wù)代碼超時(shí)時(shí)間。
升級(jí)版的future,可寫可操作(對(duì)回調(diào)過程)。future好比古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現(xiàn)代快遞員,送快遞送一半可以打電話給他叫他不要送了或者中途請(qǐng)他幫忙買個(gè)餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動(dòng)打斷回調(diào),使進(jìn)程等待。
public Promise<V> awaitUninterruptibly() {
if (this.isDone()) {
return this;
} else {
boolean interrupted = false;
synchronized(this) {
while(!this.isDone()) {
this.checkDeadLock();
this.incWaiters();
try {
this.wait();
} catch (InterruptedException var9) {
interrupted = true;
} finally {
this.decWaiters();
}
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}
}
進(jìn)行了死鎖判斷,避免已存在相同任務(wù);并限制了最大等待數(shù)量32767
protected void checkDeadLock() {
EventExecutor e = this.executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(this.toString());
}
}
private void incWaiters() {
if (this.waiters == 32767) {
throw new IllegalStateException("too many waiters: " + this);
} else {
++this.waiters;
}
}
Channel負(fù)責(zé)對(duì)外提供操作IO的接口,而UnSafe是Channel的內(nèi)部接口類,如其名一樣是不安全的操作,所以封裝在接口內(nèi)部不讓外部調(diào)用,而實(shí)際的操作IO最終都是在Unsafe中執(zhí)行。
//Channel調(diào)用連接為例,跟蹤實(shí)現(xiàn)連接請(qǐng)求的過程
ChannelFuture connect(SocketAddress var1);
//DefaultChannelPipeline中執(zhí)行,實(shí)際是調(diào)用尾部的pipeline
public ChannelFuture connect(SocketAddress remoteAddress) {
return this.tail.connect(remoteAddress);
}
//AbstractChannelHandlerContext是Pipeline容器中的對(duì)象,
//持續(xù)尋找所有handler執(zhí)行對(duì)象,直到全部被調(diào)用
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
AbstractChannelHandlerContext next = this.findContextOutbound();
next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
return promise;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while(!ctx.outbound);
return ctx;
}
//而真實(shí)的執(zhí)行是尋找到UnSafe的Invoker
public ChannelHandlerInvoker invoker() {
return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
}
public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
} else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
if (this.executor.inEventLoop()) {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
} else {
this.safeExecuteOutbound(new OneTimeTask() {
public void run() {
ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
}
}, promise);
}
}
}
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。