您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“Netty是如何綁定端口和啟動(dòng)服務(wù)的”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
服務(wù)端啟動(dòng)DEMO
先從一個(gè)簡(jiǎn)單的服務(wù)端啟動(dòng)`DEMO`開(kāi)始,以下是一個(gè)標(biāo)準(zhǔn)的`Netty`服務(wù)端代碼
public final class NettyServer {public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) { ChannelPipeline channelPipeline = channel.pipeline(); channelPipeline.addLast("decoder", new StringDecoder()); channelPipeline.addLast("encoder", new StringEncoder()); channelPipeline.addLast("handler", new ServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(8888).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}br
注:ServerBootstrap.childHandler()用于指定處理新連接數(shù)據(jù)的讀寫(xiě)處理邏輯,同時(shí)ServerBootstrap還提供handler()用于指定在服務(wù)端啟動(dòng)過(guò)程中的一些邏輯,通常情況下我們用不著這個(gè)方法
`ServerHandler`代碼如下:
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("service receive msg:" + msg); }}br
當(dāng)有新連接接入時(shí),控制臺(tái)打印出
reStructuredTexthandlerAddedchannelRegisteredchannelActivebr
但接收到新消息時(shí),控制臺(tái)打印出
reStructuredTextservice receive msg:xxxchannelReadCompletebr
本文主要分析服務(wù)端的啟動(dòng)過(guò)程,而新連接接入 新消息的讀取會(huì)在后續(xù)章節(jié)中說(shuō)明
服務(wù)端啟動(dòng)源碼分析
`ServerBootstrap`是`Netty`為方便開(kāi)發(fā)者使用而設(shè)計(jì)的一個(gè)啟動(dòng)類(lèi),其核心代碼入口在`bind()`,代碼如下
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}//通過(guò)端口號(hào)創(chuàng)建一個(gè)`InetSocketAddress`,然后繼續(xù)調(diào)用重載的`bind()`public ChannelFuture bind(SocketAddress localAddress) {// ...return doBind(localAddress);}br
由于博客篇幅有限所以有些健壯性分支會(huì)以`// ...`略過(guò),健壯性分支不會(huì)影響對(duì)`Netty`主流程的理解。
`Netty`服務(wù)端啟動(dòng)最后會(huì)調(diào)用到`doBind()`方法,代碼如下
private ChannelFuture doBind(final SocketAddress localAddress) {//...final ChannelFuture regFuture = initAndRegister();//...final Channel channel = regFuture.channel();//... doBind0(regFuture, channel, localAddress, promise);//...return promise;br
在`doBind()`中我們關(guān)注兩個(gè)核心方法`initAndRegister()`以及`doBind0()`
服務(wù)端`Channel`創(chuàng)建
final ChannelFuture initAndRegister() { Channel channel = null;// 新建Channel channel = channelFactory.newChannel();// 初始化Channelinit(channel);// 將這個(gè)Channel Register到某個(gè)對(duì)象 ChannelFuture regFuture = config().group().register(channel);return regFuture;}br
`Channel`是`Netty`的核心概念之一,它是`Netty`網(wǎng)絡(luò)通信的主體由它負(fù)責(zé)同對(duì)端進(jìn)行網(wǎng)絡(luò)通信、注冊(cè)和數(shù)據(jù)操作等功能。
`Channel`的創(chuàng)建是由`channelFactory.newChannel()`完成的,接下來(lái)跟蹤`channelFactory`是在何時(shí)被初始化,我們層層回溯最終發(fā)現(xiàn)是在這個(gè)函數(shù)中
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass"); }return channelFactory(new ReflectiveChannelFactory<C>(channelClass));}br
在`Demo`程序調(diào)用`.channel()`方法并傳入`NioServerSocketChannel.class`
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Class<? extends T> clazz;public ReflectiveChannelFactory(Class<? extends T> clazz) {if (clazz == null) {throw new NullPointerException("clazz"); }this.clazz = clazz; }@Overridepublic T newChannel() {try {return clazz.newInstance(); } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t); } }}br
即在`Netty`服務(wù)端啟動(dòng)的時(shí)候通過(guò)反射方式(調(diào)用默認(rèn)構(gòu)造函數(shù))來(lái)創(chuàng)建一個(gè)`NioServerSocketChannel`對(duì)象,
加下來(lái)我們繼續(xù)跟進(jìn)`NioServerSocketChannel`的默認(rèn)構(gòu)造函數(shù)
public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {// 利用SelectorProvider產(chǎn)生一個(gè)ServerSocketChannel對(duì)象return provider.openServerSocketChannel(); } catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e); }}br
通過(guò)`newSocket(DEFAULT_SELECTOR_PROVIDER)`創(chuàng)建一條`server`端`channel`,然后進(jìn)入到以下方法
public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket());}br
該方法主要完成兩個(gè)功能,首先是調(diào)用父類(lèi)的構(gòu)造方法然后初始化`NioServerSocketChannelConfig`屬性
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;// 設(shè)置SelectionKey.OP_ACCEPT事件this.readInterestOp = readInterestOp;// 設(shè)置ServerSocketChannel為非阻塞的 ch.configureBlocking(false);}br
這里將前面通過(guò)`provider.openServerSocketChannel()`創(chuàng)建出來(lái)的`ServerSocketChannel`保存到成員變量,然后調(diào)用將該`channel`為非阻塞模式,這是個(gè)標(biāo)準(zhǔn)的`JDK NIO`編程的玩法。這里的`readInterestOp`即前面層層傳入的`SelectionKey.OP_ACCEPT`,接下來(lái)繼續(xù)跟進(jìn)`super(parent)`
protected AbstractChannel(Channel parent) {this.parent = parent; id = newId();unsafe = newUnsafe(); pipeline = newChannelPipeline();}br
在`AbstractChannel`的構(gòu)造方法中主要是初始化了`id`,`unsafe`,`pipeline`屬性
服務(wù)端`Channel`初始化
在創(chuàng)建完`Channel`后,我們?cè)赻init`方法中對(duì)`Channel`進(jìn)行初始化操作,代碼如下
void init(Channel channel) throws Exception {// 給channel設(shè)置optionfinal Map<ChannelOption<?>, Object> options = options0();synchronized (options) { channel.config().setOptions(options); }// 給channel設(shè)置attrfinal Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {// 設(shè)置新接入channel的options currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); }synchronized (childAttrs) {// 設(shè)置新接入channel的attrs currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 設(shè)置handler到pipeline上// 這里的handler()返回的就是.handler()所設(shè)置的值 ChannelHandler handler = config.handler();if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// p.addLast()向serverChannel的流水線(xiàn)處理器中加入了一個(gè)ServerBootstrapAcceptor// 從名字上就可以看出來(lái)這是一個(gè)接入器,專(zhuān)門(mén)接受新請(qǐng)求,把新的請(qǐng)求扔給某個(gè)事件循環(huán)器 pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });}br
以上代碼主要完成如下功能:
ChannelFuture regFuture = config().group().register(channel);```此處`config().group()`返回的對(duì)象為`NioEventLoopGroup````java@Overridepublic ChannelFuture register(Channel channel) {// 調(diào)用了NioEventLoop對(duì)象中的register方法// NioEventLoop extends SingleThreadEventLoopreturn next().register(channel);}br
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this));}public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}br
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ... AbstractChannel.this.eventLoop = eventLoop;// ... register0(promise);}br
private void register0(ChannelPromise promise) { try { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); }}//這一段其實(shí)也很清晰,先調(diào)用`doRegister()`進(jìn)行注冊(cè)protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // ... } }}br
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });}//在`dobind0()`方法中通過(guò)`EventLoop`執(zhí)行一個(gè)任務(wù),接下來(lái)我們進(jìn)入到`channel.bind()`方法public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}br
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {unsafe.bind(localAddress, promise);}//這里的`unsafe`就是前面提到的`AbstractUnsafe`, 準(zhǔn)確點(diǎn)應(yīng)該是`NioMessageUnsafe`@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ... boolean wasActive = isActive();// ... doBind(localAddress);if (!wasActive && isActive()) { invokeLater(new Runnable() { @Overridepublic void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise);}//在`doBind`方法中完成綁定操作,代碼如下protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); }}br
最終調(diào)用到了`JDK`里面的`bind`方法真正進(jìn)行了端口的綁定。按照正常流程我們前面已經(jīng)分析到`isActive()`方法返回`false`,進(jìn)入到`doBind()`之后如果`channel`被激活了,就發(fā)起`pipeline.fireChannelActive()`調(diào)用
public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead();}//`pipeline.channelActive`會(huì)逐一調(diào)用`pipeline`中每一個(gè)節(jié)點(diǎn)的`channelActive`方法,所以`HeadContext`的`channelActive`將會(huì)被調(diào)用,即`readIfIsAutoRead`方法將會(huì)被調(diào)用public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) { channel.read(); }}//最終這個(gè)方法會(huì)調(diào)用到`AbstractNioChannel`的`doBeginRead`方法protected void doBeginRead() throws Exception {final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return; } readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); }}br
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 建NioServerSocketChannelserverSocketChannel.configureBlocking(false);//AbstractNioChannel中ch.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));// NioServerSocketChannel.doBind()Selector selector = Selector.open();// NioEventLoop.openSelector()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) // AbstractNioChannel.doBeginRead()br
“Netty是如何綁定端口和啟動(dòng)服務(wù)的”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。