您好,登錄后才能下訂單哦!
這篇文章主要介紹“游戲服務(wù)器中的Netty應(yīng)用怎么實(shí)現(xiàn)”,在日常操作中,相信很多人在游戲服務(wù)器中的Netty應(yīng)用怎么實(shí)現(xiàn)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”游戲服務(wù)器中的Netty應(yīng)用怎么實(shí)現(xiàn)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
端口號資源
cat /proc/sys/net/ipv4/ip_local_port_range
文件描述符資源
系統(tǒng)級:當(dāng)前系統(tǒng)可打開的最大數(shù)量,通過 cat /proc/sys/fs/file-max 查看
用戶級:指定用戶可打開的最大數(shù)量,通過 cat /etc/security/limits.conf 查看
進(jìn)程級:單個進(jìn)程可打開的最大數(shù)量,通過 cat /proc/sys/fs/nr_open 查看
線程資源 BIO/NIO
所有操作都是同步阻塞(accept,read)
客戶端連接數(shù)與服務(wù)器線程數(shù)比例是1:1
非阻塞IO
通過selector實(shí)現(xiàn)可以一個線程管理多個連接
通過selector的事件注冊(OP_READ/OP_WRITE/OP_CONNECT/OP_ACCEPT),處理自己感興趣的事件
客戶端連接數(shù)與服務(wù)器線程數(shù)比例是n:1
所有IO在同一個NIO線程完成(處理連接,分派請求,編碼,解碼,邏輯運(yùn)算,發(fā)送)
優(yōu)點(diǎn):
編碼簡單
不存在共享資源競爭
并發(fā)安全
缺點(diǎn):
單線程處理大量鏈路時,性能無法支撐,不能合理利用多核處理
線程過載后,處理速度變慢,會導(dǎo)致消息積壓
一旦線程掛掉,整個通信層不可用 redis使用的就是reactor單進(jìn)程模型,redis由于都是內(nèi)存級操作,所以使用此模式?jīng)]什么問題
reactor單線程模型圖:
netty reactor單線程模型圖:
Netty對應(yīng)實(shí)現(xiàn)方式
// Netty對應(yīng)實(shí)現(xiàn)方式:創(chuàng)建io線程組是,boss和worker,使用同一個線程組,并且線程數(shù)為1 EventLoopGroup ioGroup = new NioEventLoopGroup(1); b.group(ioGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer); ChannelFuture f = b.bind(portNumner); cf = f.sync(); f.get();
根據(jù)單線程模型,io處理中最耗時的編碼,解碼,邏輯運(yùn)算等cpu消耗較多的部分,可提取出來使用多線程實(shí)現(xiàn),并充分利用多核cpu的優(yōu)勢
優(yōu)點(diǎn):
多線程處理邏輯運(yùn)算,提高多核CPU利用率
缺點(diǎn):
對于單Reactor來說,大量鏈接的IO事件處理依然是性能瓶頸
reactor多線程模型圖:
netty reactor多線程模型圖:
Netty對應(yīng)實(shí)現(xiàn)方式
// Netty對應(yīng)實(shí)現(xiàn)方式:創(chuàng)建io線程組是,boss和worker,使用同一個線程組,并且線程數(shù)為1,把邏輯運(yùn)算部分投遞到用戶自定義線程處理 EventLoopGroup ioGroup = new NioEventLoopGroup(1); b.group(ioGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer); ChannelFuture f = b.bind(portNumner); cf = f.sync(); f.get();
根據(jù)多線程模型,可把它的性能瓶頸做進(jìn)一步優(yōu)化,即把reactor由單個改為reactor線程池,把原來的reactor分為mainReactor和subReactor
優(yōu)點(diǎn):
解決單Reactor的性能瓶頸問題(Netty/Nginx采用這種設(shè)計)
reactor主從多線程模型圖:
netty reactor主從多線程模型圖:
Netty對應(yīng)實(shí)現(xiàn)方式
// Netty對應(yīng)實(shí)現(xiàn)方式:創(chuàng)建io線程組boss和worker,boss線程數(shù)為1,work線程數(shù)為cpu*2(一般IO密集可設(shè)置為2倍cpu核數(shù)) EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(initializer); ChannelFuture f = b.bind(portNumner); cf = f.sync(); f.get();
創(chuàng)建group實(shí)例
// 1.構(gòu)造參數(shù)不傳或傳0,默認(rèn)取系統(tǒng)參數(shù)配置,沒有參數(shù)配置,取CPU核數(shù)*2 super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); private static final int DEFAULT_EVENT_LOOP_THREADS; static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)); } // 2.不同版本的JDK會有不同版本的SelectorProvider實(shí)現(xiàn),Windows下的是WindowsSelectorProvider public NioEventLoopGroup(int nThreads, Executor executor) { //默認(rèn)selector,最終實(shí)現(xiàn)類似:https://github.com/frohoff/jdk8u-jdk/blob/master/src/macosx/classes/sun/nio/ch/DefaultSelectorProvider.java //basic flow: 1 java.nio.channels.spi.SelectorProvider 2 META-INF/services 3 default this(nThreads, executor, SelectorProvider.provider()); } // 3.創(chuàng)建nThread個EventExecutor,并封裝到選擇器chooser,chooser會根據(jù)線程數(shù)分別有兩種實(shí)現(xiàn)(GenericEventExecutorChooser和PowerOfTwoEventExecutorChooser,算法不同,但實(shí)現(xiàn)邏輯一樣,就是均勻的分配線程處理) EventExecutorChooserFactory.EventExecutorChooser chooser; children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { // ... children[i] = newChild(executor, args); // ... } chooser = chooserFactory.newChooser(children);
設(shè)置group
// 兩種方式設(shè)置group // parent和child使用同一個group,調(diào)用仍然是分別設(shè)置parent和child @Override public ServerBootstrap group(EventLoopGroup group) { return group(group, group); } ServerBootstrap.group(EventLoopGroup parentGroup, EventLoopGroup childGroup){ // 具體代碼略,可直接參考源碼 // 里面實(shí)現(xiàn)內(nèi)容是把parentGroup綁定到this.group,把childGroup綁定到this.childGroup }
Netty啟動
// 調(diào)用順序 ServerBootstrap:bind() -> doBind() -> initAndRegister() private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); // ... doBind0(regFuture, channel, localAddress, promise); // ... } final ChannelFuture initAndRegister() { // 創(chuàng)建ServerSocketChannel Channel channel = channelFactory.newChannel(); // ... // 開始register ChannelFuture regFuture = config().group().register(channel); // register調(diào)用順序 // next().register(channel) -> (EventLoop) super.next() -> chooser.next() // ... }
由以上源碼可得知,bind只在起服調(diào)用一次,因此bossGroup僅調(diào)用一次regist,也就是僅調(diào)用一次next,因此只有一根線程是有用的,其余線程都是廢棄的,所以bossGroup線程數(shù)設(shè)置為1即可
// 啟動BossGroup線程并綁定本地SocketAddress private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
客戶端連接
// 消息事件讀取 NioEventLoop.run() -> processSelectedKeys() -> ... -> ServerBootstrapAcceptor.channelRead // ServerBootstrapAcceptor.channelRead處理客戶端連接事件 // 最后一行的childGroup.register的邏輯和上面的代碼調(diào)用處一樣 public void channelRead(ChannelHandlerContext ctx, Object msg) { child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); childGroup.register(child) }
select(時間復(fù)雜度O(n)):用一個fd數(shù)組保存所有的socket,然后通過死循環(huán)遍歷調(diào)用操作系統(tǒng)的select方法找到就緒的fd
while(1) { nready = select(list); // 用戶層依然要遍歷,只不過少了很多無效的系統(tǒng)調(diào)用 for(fd <-- fdlist) { if(fd != -1) { // 只讀已就緒的文件描述符 read(fd, buf); // 總共只有 nready 個已就緒描述符,不用過多遍歷 if(--nready == 0) break; } } }
poll(時間復(fù)雜度O(n)):同select,不過把fd數(shù)組換成了fd鏈表,去掉了fd最大連接數(shù)(1024個)的數(shù)量限制
epoll(時間復(fù)雜度O(1)):解決了select/poll的幾個缺陷
調(diào)用需傳入整個fd數(shù)組或fd鏈表,需要拷貝數(shù)據(jù)到內(nèi)核
內(nèi)核層需要遍歷檢查文件描述符的就緒狀態(tài)
內(nèi)核僅返回可讀文件描述符個數(shù),用戶仍需自己遍歷所有fd
epoll是操作系統(tǒng)基于事件關(guān)聯(lián)fd,做了以下優(yōu)化:
內(nèi)核中保存一份文件描述符集合,無需用戶每次都重新傳入,只需告訴內(nèi)核修改的部分即可。(epoll_ctl)
內(nèi)核不再通過輪詢的方式找到就緒的文件描述符,而是通過異步 IO 事件喚醒。(epoll_wait)
內(nèi)核僅會將有 IO 事件的文件描述符返回給用戶,用戶也無需遍歷整個文件描述符集合。
epoll僅在Linux系統(tǒng)上支持
// DefaultSelectorProvider.create方法在不同版本的jdk下有不同實(shí)現(xiàn),創(chuàng)建不同Selector // Windows版本的jdk,其實(shí)現(xiàn)中調(diào)用的是native的poll方法 public static SelectorProvider create() { return new WindowsSelectorProvider(); } // Linux版本的jdk public static SelectorProvider create() { String str = (String)AccessController.doPrivileged(new GetPropertyAction("os.name")); if (str.equals("SunOS")) { return createProvider("sun.nio.ch.DevPollSelectorProvider"); } if (str.equals("Linux")) { return createProvider("sun.nio.ch.EPollSelectorProvider"); } return new PollSelectorProvider(); }
netty依然基于epoll做了一層封裝,主要做了以下事情:
(1)java的nio默認(rèn)使用水平觸發(fā),Netty的Epoll默認(rèn)使用邊緣觸發(fā),且可配置
邊緣觸發(fā):當(dāng)狀態(tài)變化時才會發(fā)生io事件。
水平觸發(fā):只要滿足條件,就觸發(fā)一個事件(只要有數(shù)據(jù)沒有被獲取,內(nèi)核就不斷通知你)
(2)Netty的Epoll提供更多的nio的可配參數(shù)。
(3)調(diào)用c代碼,更少gc,更少synchronized 具體可以參考源碼NioEventLoop.run和EpollEventLoop.run進(jìn)行對比
線程組類圖
channel類圖
// 創(chuàng)建指定的EventLoopGroup bossGroup = new EpollEventLoopGroup(1, new DefaultThreadFactory("BOSS_LOOP")); workerGroup = new EpollEventLoopGroup(32, new DefaultThreadFactory("IO_LOOP")); b.group(bossGroup, workerGroup) // 指定channel的class .channel(EpollServerSocketChannel.class) .childHandler(initializer); // 其中channel(clz)方法是通過class來new一個反射ServerSocketChannel創(chuàng)建工廠類 public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); } final ChannelFuture initAndRegister() { // ... Channel channel = channelFactory.newChannel(); // ... }
childOption(ChannelOption.SO_KEEPALIVE, true)
TCP鏈路探活
option(ChannelOption.SO_REUSEADDR, true)
重用處于TIME_WAIT但是未完全關(guān)閉的socket地址,讓端口釋放后可立即被重用。默認(rèn)關(guān)閉,需要手動開啟
childOption(ChannelOption.TCP_NODELAY, true)
IP報文格式
TCP報文格式
開啟則禁用TCP Negal算法,優(yōu)點(diǎn)低延時,缺點(diǎn)在大量小數(shù)據(jù)包的情況下,網(wǎng)絡(luò)利用率低
關(guān)閉則開啟TCP Negal算法,優(yōu)點(diǎn)提高網(wǎng)絡(luò)利用率(數(shù)據(jù)緩存到一定量才發(fā)送),缺點(diǎn)延時高
Negal算法
如果包長度達(dá)到MSS(maximum segment size最大分段長度),則允許發(fā)送;
如果該包含有FIN,則允許發(fā)送;
設(shè)置了TCP_NODELAY選項(xiàng),則允許發(fā)送;
未設(shè)置TCP_CORK選項(xiàng)(是否阻塞不完整報文)時,若所有發(fā)出去的小數(shù)據(jù)包(包長度小于MSS)均被確認(rèn),則允許發(fā)送;
上述條件都未滿足,但發(fā)生了超時(一般為200ms),則立即發(fā)送。
MSS計算規(guī)則 MSS的值是在TCP三次握手建立連接的過程中,經(jīng)通信雙方協(xié)商確定的 802.3標(biāo)準(zhǔn)里,規(guī)定了一個以太幀的數(shù)據(jù)部分(Payload)的最大長度是1500個字節(jié)(MTU)
MSS = MTU - IP首部 - TCP首部
以太網(wǎng)環(huán)境下:
MTU = 1500字節(jié)
IP首部 = 32*5/4 = 160bit = 20字節(jié)
TCP首部 = 32*5/4 = 160bit = 20字節(jié)
最終得出MSS = 1460字節(jié)
結(jié)論:因?yàn)橛螒蚍?wù)器的實(shí)時性要求,在網(wǎng)絡(luò)帶寬足夠的情況下,建議開啟TCP_NODELAY,關(guān)閉Negal算法,帶寬可以浪費(fèi),響應(yīng)必須及時
注意:需要客戶端服務(wù)器均關(guān)閉Negal算法,否則仍然會有延遲發(fā)送,影響傳輸速度
option(ChannelOption.SO_BACKLOG, 100)
操作系統(tǒng)內(nèi)核中維護(hù)的兩個隊列
syns queue:保存syn到達(dá),但沒完成三次握手的半連接
cat /proc/sys/net/ipv4/tcp_max_syn_backlog
accpet queue:保存完成三次握手,內(nèi)核等待accept調(diào)用的連接
cat /proc/sys/net/core/somaxconn
netty對于backlog的默認(rèn)值設(shè)置在NetUtil類253行
SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction<Integer>() { @Override public Integer run() { // 1.設(shè)置默認(rèn)值 int somaxconn = PlatformDependent.isWindows() ? 200 : 128; File file = new File("/proc/sys/net/core/somaxconn"); if (file.exists()) { // 2.文件存在,讀取操作系統(tǒng)配置 in = new BufferedReader(new FileReader(file)); somaxconn = Integer.parseInt(in.readLine()); } else { // 3.文件不存在,從各個參數(shù)中讀取 if (SystemPropertyUtil.getBoolean("io.netty.net.somaxconn.trySysctl", false)) { tmp = sysctlGetInt("kern.ipc.somaxconn"); if (tmp == null) { tmp = sysctlGetInt("kern.ipc.soacceptqueue"); if (tmp != null) { somaxconn = tmp; } } else { somaxconn = tmp; } } } } }
結(jié)論:
Linux下/proc/sys/net/core/somaxconn一定存在,所以backlog一定取得它的值,我參考prod機(jī)器的參數(shù)配置的65535,也就是不設(shè)置backlog的情況下,服務(wù)器運(yùn)行緩存65535個全連接
默認(rèn)分配ByteBuffAllocator賦值如下: ByteBufUtil.java
static { //以io.netty.allocator.type為準(zhǔn),沒有的話,安卓平臺用非池化實(shí)現(xiàn),其他用池化實(shí)現(xiàn) String allocType = SystemPropertyUtil.get( "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled"); allocType = allocType.toLowerCase(Locale.US).trim(); ByteBufAllocator alloc; if ("unpooled".equals(allocType)) { alloc = UnpooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else if ("pooled".equals(allocType)) { alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: {}", allocType); } else { //io.netty.allocator.type設(shè)置的不是"unpooled"或者"pooled",就用池化實(shí)現(xiàn)。 alloc = PooledByteBufAllocator.DEFAULT; logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType); } DEFAULT_ALLOCATOR = alloc; }
RCVBUF_ALLOCATOR默認(rèn)AdaptiveRecvByteBufAllocator
public class DefaultChannelConfig implements ChannelConfig { // ... public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); } // ... }
/** * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values. * * @return the {@link #terminationFuture()} */ Future<?> shutdownGracefully(); /** * Signals this executor that the caller wants the executor to be shut down. Once this method is called, * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down. * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i> * (usually a couple seconds) before it shuts itself down. If a task is submitted during the quiet period, * it is guaranteed to be accepted and the quiet period will start over. * * @param quietPeriod the quiet period as described in the documentation 靜默期:在此期間,仍然可以提交任務(wù) * @param timeout the maximum amount of time to wait until the executor is {@linkplain #shutdown()} * regardless if a task was submitted during the quiet period 超時時間:等待所有任務(wù)執(zhí)行完的最大時間 * @param unit the unit of {@code quietPeriod} and {@code timeout} * * @return the {@link #terminationFuture()} */ Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit); // 抽象類中的實(shí)現(xiàn) static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; static final long DEFAULT_SHUTDOWN_TIMEOUT = 15; @Override public Future<?> shutdownGracefully() { return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS); }
把NIO線程的狀態(tài)位設(shè)置成ST_SHUTTING_DOWN狀態(tài),不再處理新的消息(不允許再對外發(fā)送消息);
退出前的預(yù)處理操作:把發(fā)送隊列中尚未發(fā)送或者正在發(fā)送的消息發(fā)送完、把已經(jīng)到期或者在退出超時之前到期的定時任務(wù)執(zhí)行完成、把用戶注冊到NIO線程的退出Hook任務(wù)執(zhí)行完成;
資源的釋放操作:所有Channel的釋放、多路復(fù)用器的去注冊和關(guān)閉、所有隊列和定時任務(wù)的清空取消,最后是NIO線程的退出。
到此,關(guān)于“游戲服務(wù)器中的Netty應(yīng)用怎么實(shí)現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。