溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Netty、MINA、Twisted中線程模型的示例分析

發(fā)布時間:2021-12-28 15:45:26 來源:億速云 閱讀:163 作者:小新 欄目:開發(fā)技術

這篇文章主要介紹了Netty、MINA、Twisted中線程模型的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

要想開發(fā)一個高性能的TCP服務器,熟悉所使用框架的線程模型非常重要。MINA、Netty、Twisted本身都是高性能的網(wǎng)絡框架,如果再搭配上高效率的代碼,才能實現(xiàn)一個高大上的服務器。但是如果不了解它們的線程模型,就很難寫出高性能的代碼??蚣鼙旧硇试俑?,程序?qū)懙奶?,那么服務器整體的性能也不會太高。就像一個電腦,CPU再好,內(nèi)存小硬盤慢散熱差,整體的性能也不會太高。

玩過Android開發(fā)的同學會知道,在Android應用中有一個非常重要線程:UI線程(即主線程)。UI線程是負責一個Android的界面顯示以及和用戶交互。Activity的一些方法,例如onCreate、onStop、onDestroy都是運行在UI線程中的。但是在編寫Activity代碼的時候有一點需要非常注意,就是絕對不能把阻塞的或者耗時的任務寫在這些方法中,如果寫在這些方法中,則會阻塞UI線程,導致用戶操作的界面反應遲鈍,體驗很差。所以在Android開發(fā)中,耗時或者阻塞的任務會另外開線程去做。

同樣在MINA、Netty、Twisted中,也有一個非常重要的線程:IO線程。

傳統(tǒng)的BIO實現(xiàn)的TCP服務器,特別對于TCP長連接,通常都要為每個連接開啟一個線程,線程也是操作系統(tǒng)的一種資源,所以很難實現(xiàn)高性能高并發(fā)。而異步IO實現(xiàn)的TCP服務器,由于IO操作都是異步的,可以用一個線程或者少量線程來處理大量連接的IO操作,所以只需要少量的IO線程就可以實現(xiàn)高并發(fā)的服務器。

在網(wǎng)絡編程過程中,通常有一些業(yè)務邏輯是比較耗時、阻塞的,例如數(shù)據(jù)庫操作,如果網(wǎng)絡不好,加上數(shù)據(jù)庫性能差,SQL不夠優(yōu)化,數(shù)據(jù)量大,一條SQL可能會執(zhí)行很久。由于IO線程本身數(shù)量就不多,通常只有一個或幾個,而如果這種耗時阻塞的代碼在IO線程中運行的話,IO線程的其他事情,例如網(wǎng)絡read和write,就無法進行了,會影響IO性能以及整個服務器的性能。

所以,無論是使用MINA、Netty、Twisted,如果有耗時的任務,就絕對不能在IO線程中運行,而是要另外開啟線程來處理。

1. MINA

在MINA中,有三種非常重要的線程:Acceptor thread、Connector thread、I/O processor thread。

Acceptor thread:這個線程用于TCP服務器接收新的連接,并將連接分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketAcceptor創(chuàng)建一個Acceptor thread,線程數(shù)量不可配置。

Connector thread:用于處理TCP客戶端連接到服務器,并將連接分配到I/O processor thread,由I/O processor thread來處理IO操作。每個NioSocketConnector創(chuàng)建一個Connector thread,線程數(shù)量不可配置。

I/O processor thread:用于處理TCP連接的I/O操作,如read、write。I/O processor thread的線程數(shù)量可通過NioSocketAcceptor或NioSocketConnector構造方法來配置,默認是CPU核心數(shù)+1。

由于本文主要介紹TCP服務器的線程模型,所以就沒有Connector thread什么事了。下面說下Acceptor thread和I/O processor thread處理TCP連接的流程:

MINA的TCP服務器包含一個Acceptor thread和多個I/O processor thread,當有新的客戶端連接到服務器,首先會由Acceptor thread獲取到這個連接,同時將這個連接分配給多個I/O processor thread中的一個線程,當客戶端發(fā)送數(shù)據(jù)給服務器,對應的I/O processor thread負責讀取這個數(shù)據(jù),并執(zhí)行IoFilterChain中的IoFilter以及IoHandle。

由于I/O processor thread本身數(shù)量有限,通常就那么幾個,但是又要處理成千上萬個連接的IO操作,包括read、write、協(xié)議的編碼解碼、各種Filter以及IoHandle中的業(yè)務邏輯,特別是業(yè)務邏輯,比如IoHandle的messageReceived,如果有耗時、阻塞的任務,例如查詢數(shù)據(jù)庫,那么就會阻塞I/O processor thread,導致無法及時處理其他IO事件,服務器性能下降。

針對這個問題,MINA中提供了一個ExecutorFilter,用于將需要執(zhí)行很長時間的會阻塞I/O processor thread的業(yè)務邏輯放到另外的線程中,這樣就不會阻塞I/O processor thread,不會影響IO操作。ExecutorFilter中包含一個線程池,默認是OrderedThreadPoolExecutor,這個線程池保證同一個連接的多個事件按順序依次執(zhí)行,另外還可以使用UnorderedThreadPoolExecutor,它不會保證同一連接的事件的執(zhí)行順序,并且可能會并發(fā)執(zhí)行。二者之間可以根據(jù)需要來選擇。

public class TcpServer {

 public static void main(String[] args) throws IOException {
   IoAcceptor acceptor = new NioSocketAcceptor(4); // 配置I/O processor thread線程數(shù)量
   acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
   acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); // 將TcpServerHandle中的業(yè)務邏輯拿到ExecutorFilter的線程池中執(zhí)行
   acceptor.setHandler(new TcpServerHandle());
   acceptor.bind(new InetSocketAddress(8080));
 }

}

class TcpServerHandle extends IoHandlerAdapter {

 @Override
 public void messageReceived(IoSession session, Object message)
     throws Exception {

   // 假設這里有個變態(tài)的SQL要執(zhí)行3秒
   Thread.sleep(3000);
 }
}

2. Netty

Netty的TCP服務器啟動時,會創(chuàng)建兩個NioEventLoopGroup,一個boss,一個worker:

EventLoopGroup bossGroup = new NioEventLoopGroup();  
EventLoopGroup workerGroup = new NioEventLoopGroup();

NioEventLoopGroup實際上是一個線程組,可以通過構造方法設置線程數(shù)量,默認為CPU核心數(shù)*2。boss用于服務器接收新的TCP連接,boss線程接收到新的連接后將連接注冊到worker線程。worker線程用于處理IO操作,例如read、write。

Netty中的boss線程類似于MINA的Acceptor thread,work線程和MINA的I/O processor thread類似。不同的一點是MINA的Acceptor thread是單個線程,而Netty的boss是一個線程組。實際上Netty的ServerBootstrap可以監(jiān)聽多個端口號,如果只監(jiān)聽一個端口號,那么只需要一個boss線程即可,推薦將bossGroup的線程數(shù)量設置成1。

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

當有新的TCP客戶端連接到服務器,將由boss線程來接收連接,然后將連接注冊到worker線程,當客戶端發(fā)送數(shù)據(jù)到服務器,worker線程負責接收數(shù)據(jù),并執(zhí)行ChannelPipeline中的ChannelHandler。

和MINA的I/O processor thread 類似,Netty的worker線程本身數(shù)量不多,而且要實時處理IO事件,如果有耗時的業(yè)務邏輯阻塞住worker線程,例如在channelRead中執(zhí)行一個耗時的數(shù)據(jù)庫查詢,會導致IO操作無法進行,服務器整體性能就會下降。

在Netty 3中,存在一個ExecutionHandler,它是ChannelHandler的一個實現(xiàn)類,用于處理耗時的業(yè)務邏輯,類似于MINA的ExecutorFilter,但是在Netty 4中被刪除了。所以這里不再介紹ExecutionHandler。

Netty 4中可以使用EventExecutorGroup來處理耗時的業(yè)務邏輯:

public class TcpServer {

 public static void main(String[] args) throws InterruptedException {
   EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 服務器監(jiān)聽一個端口號,boss線程數(shù)建議設置成1
   EventLoopGroup workerGroup = new NioEventLoopGroup(4); // worker線程數(shù)設置成4
   try {
     ServerBootstrap b = new ServerBootstrap();
     b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {

           // 創(chuàng)建一個16個線程的線程組來處理耗時的業(yè)務邏輯
           private EventExecutorGroup group = new DefaultEventExecutorGroup(16);

           @Override
           public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             pipeline.addLast(new LineBasedFrameDecoder(80));
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

             // 將TcpServerHandler中的業(yè)務邏輯放到EventExecutorGroup線程組中執(zhí)行
             pipeline.addLast(group, new TcpServerHandler());
           }
         });
     ChannelFuture f = b.bind(8080).sync();
     f.channel().closeFuture().sync();
   } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
   }
 }

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {

   // 假設這里有個變態(tài)的SQL要執(zhí)行3秒
   Thread.sleep(3000);

 }
}

3.Twisted

Twisted的線程模型是最簡單粗暴的:單線程,即reactor線程。也就是,所有的IO操作、編碼解碼、業(yè)務邏輯等都是在一個線程中執(zhí)行。實際上,即使是單線程,其性能也是非常高的,可以同時處理大量的連接。在單線程的環(huán)境下編程,不需要考慮線程安全的問題。不過,單線程帶來一個問題,就是耗時的業(yè)務邏輯,如果運行在reactor線程中,那么其他事情,例如網(wǎng)絡IO,就要等到reactor線程空閑時才能繼續(xù)做,會影響到服務器的性能。

下面的代碼,通過reactor.callInThread將耗時的業(yè)務邏輯放到單獨的線程池中執(zhí)行,而不在reactor線程中運行。這樣就不會影響到reactor線程的網(wǎng)絡IO了??梢酝ㄟ^reactor.suggestThreadPoolSize設置這個線程池的線程數(shù)量。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 耗時、阻塞的業(yè)務邏輯
def logic(data):
   print data
   time.sleep(3) # 假設這里有個變態(tài)的SQL要執(zhí)行3秒    

class TcpServerHandle(Protocol):

   def dataReceived(self, data):
       reactor.callInThread(logic, data) # 在線程池中運行l(wèi)ogic(data)耗時任務,不在reactor線程中運行

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數(shù)量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

由于Twisted的reactor的單線程設計,它的很多代碼都不是線程安全的。所以在非reactor線程中執(zhí)行的代碼需要注意線程安全問題。例如transport.write就不是線程安全的。不過在非reactor線程中可以調(diào)用reactor.callFromThread方法,這個方法功能和callInThread相反,將一個函數(shù)從別的線程放到reactor線程中運行。不過還是要注意,reactor.callFromThread調(diào)用的函數(shù)由于運行在reactor線程中,如果運行耗時,同樣會阻塞reactor線程,影響IO。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor

# 非線程安全的代碼
def notThreadSafe():
   print "notThreadSafe"

# 耗時、阻塞的業(yè)務邏輯
def logic(data):
   print data
   time.sleep(3) # 假設這里有個變態(tài)的SQL要執(zhí)行3秒
   reactor.callFromThread(notThreadSafe) # 在reactor線程中運行notThreadSafe()


class TcpServerHandle(Protocol):

   def dataReceived(self, data):
       reactor.callInThread(logic, data) # 在線程池中運行l(wèi)ogic(data)耗時任務,不在reactor線程中運行

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數(shù)量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

此外,twisted.internet.threads中提供了許多很方便的函數(shù)。例如threads.deferToThread用于將一個耗時任務放在線程池中執(zhí)行,與reactor.callInThread不同的是,它的返回值是Deferred類型,可以通過添加回調(diào)函數(shù),處理耗時任務完成后的結(jié)果(返回值)。

# -*- coding:utf-8 –*-

import time
from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet import reactor, threads

# 耗時、阻塞的業(yè)務邏輯
def logic(data):
   print data
   time.sleep(3) # 假設這里有個變態(tài)的SQL要執(zhí)行3秒
   return "success"

# 回調(diào)函數(shù)
def logicSuccess(result):
   # result即為logic函數(shù)的返回值,即"success"
   print result

class TcpServerHandle(Protocol):

   def dataReceived(self, data):
       d = threads.deferToThread(logic, data) # 將耗時的業(yè)務邏輯logic(data)放到線程池中運行,deferToThread返回值類型是Deferred
       d.addCallback(logicSuccess) # 添加回調(diào)函數(shù)

reactor.suggestThreadPoolSize(8) # 設置線程池的線程數(shù)量為8

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Netty、MINA、Twisted中線程模型的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業(yè)資訊頻道,更多相關知識等著你來學習!

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI