溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么用MINA、Netty、Twisted來實現(xiàn)消息分割

發(fā)布時間:2021-12-28 15:43:06 來源:億速云 閱讀:123 作者:小新 欄目:互聯(lián)網(wǎng)科技

這篇文章主要介紹了怎么用MINA、Netty、Twisted來實現(xiàn)消息分割,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

本文介紹一種消息分割方式,use a fixed length header that indicates the length of the body,用一個固定字節(jié)數(shù)的Header前綴來指定Body的字節(jié)數(shù),以此來分割消息。

怎么用MINA、Netty、Twisted來實現(xiàn)消息分割  固定字節(jié)數(shù)的Header前綴來指定Body的字節(jié)數(shù)

上面圖中 Header 固定為 4 字節(jié),Header 中保存的是一個 4 字節(jié)(32位)的整數(shù),例如 12 即為 0x0000000C,這個整數(shù)用來指定 Body 的長度(字節(jié)數(shù))。當(dāng)讀完這么多字節(jié)的 Body 之后,又是下一條消息的 Header。

下面分別用MINA、Netty、Twisted來實現(xiàn)對這種消息的切合和解碼。

MINA

MINA 提供了 PrefixedStringCodecFactory 來對這種類型的消息進行編碼解碼,PrefixedStringCodecFactory 默認 Header 的大小是4字節(jié),當(dāng)然也可以指定成1或2。

public class TcpServer {

 public static void main(String[] args) throws IOException {
   IoAcceptor acceptor = new NioSocketAcceptor();

   // 4字節(jié)的Header指定Body的字節(jié)數(shù),對這種消息的處理
   acceptor.getFilterChain().addLast("codec",
       new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));

   acceptor.setHandler(new TcpServerHandle());
   acceptor.bind(new InetSocketAddress(8080));
 }

}

class TcpServerHandle extends IoHandlerAdapter {

 @Override
 public void exceptionCaught(IoSession session, Throwable cause)
     throws Exception {
   cause.printStackTrace();
 }

 // 接收到新的數(shù)據(jù)
 @Override
 public void messageReceived(IoSession session, Object message)
     throws Exception {

   String msg = (String) message;
   System.out.println("messageReceived:" + msg);

 }

 @Override
 public void sessionCreated(IoSession session) throws Exception {
   System.out.println("sessionCreated");
 }

 @Override
 public void sessionClosed(IoSession session) throws Exception {
   System.out.println("sessionClosed");
 }
}

Netty

Netty 使用 LengthFieldBasedFrameDecoder 來處理這種消息。下面代碼中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包含5個參數(shù),分別是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength為消息的最大長度,lengthFieldOffset為Header的位置,lengthFieldLength為Header的長度,lengthAdjustment為長度調(diào)整(默認Header中的值表示Body的長度,并不包含Header自己),initialBytesToStrip為去掉字節(jié)數(shù)(默認解碼后返回Header+Body的全部內(nèi)容,這里設(shè)為4表示去掉4字節(jié)的Header,只留下Body)。

public class TcpServer {

 public static void main(String[] args) throws InterruptedException {
   EventLoopGroup bossGroup = new NioEventLoopGroup();
   EventLoopGroup workerGroup = new NioEventLoopGroup();
   try {
     ServerBootstrap b = new ServerBootstrap();
     b.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(new ChannelInitializer<SocketChannel>() {
           @Override
           public void initChannel(SocketChannel ch)
               throws Exception {
             ChannelPipeline pipeline = ch.pipeline();

             // LengthFieldBasedFrameDecoder按行分割消息,取出body
             pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
             // 再按UTF-8編碼轉(zhuǎn)成字符串
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));

             pipeline.addLast(new TcpServerHandler());
           }
         });
     ChannelFuture f = b.bind(8080).sync();
     f.channel().closeFuture().sync();
   } finally {
     workerGroup.shutdownGracefully();
     bossGroup.shutdownGracefully();
   }
 }

}

class TcpServerHandler extends ChannelInboundHandlerAdapter {

 // 接收到新的數(shù)據(jù)
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) {

   String message = (String) msg;
   System.out.println("channelRead:" + message);
 }

 @Override
 public void channelActive(ChannelHandlerContext ctx) {
   System.out.println("channelActive");
 }

 @Override
 public void channelInactive(ChannelHandlerContext ctx) {
   System.out.println("channelInactive");
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
   cause.printStackTrace();
   ctx.close();
 }
}

Twisted

在Twisted中需要繼承Int32StringReceiver,不再繼承Protocol。Int32StringReceiver表示固定32位(4字節(jié))的Header,另外還有Int16StringReceiver、Int8StringReceiver等。而需要實現(xiàn)的接受數(shù)據(jù)事件的方法不再是dataReceived,也不是lineReceived,而是stringReceived。

# -*- coding:utf-8 –*-

from twisted.protocols.basic import Int32StringReceiver
from twisted.internet.protocol import Factory
from twisted.internet import reactor

class TcpServerHandle(Int32StringReceiver):

   # 新的連接建立
   def connectionMade(self):
       print 'connectionMade'

   # 連接斷開
   def connectionLost(self, reason):
       print 'connectionLost'

   # 接收到新的數(shù)據(jù)
   def stringReceived(self, data):
       print 'stringReceived:' + data

factory = Factory()
factory.protocol = TcpServerHandle
reactor.listenTCP(8080, factory)
reactor.run()

下面是Java編寫的一個客戶端測試程序:

public class TcpClient {

 public static void main(String[] args) throws IOException {

   Socket socket = null;
   DataOutputStream out = null;

   try {

     socket = new Socket("localhost", 8080);
     out = new DataOutputStream(socket.getOutputStream());

     // 請求服務(wù)器
     String data1 = "牛頓";
     byte[] outputBytes1 = data1.getBytes("UTF-8");
     out.writeInt(outputBytes1.length); // write header
     out.write(outputBytes1); // write body

     String data2 = "愛因斯坦";
     byte[] outputBytes2 = data2.getBytes("UTF-8");
     out.writeInt(outputBytes2.length); // write header
     out.write(outputBytes2); // write body

     out.flush();

   } finally {
     // 關(guān)閉連接
     out.close();
     socket.close();
   }

 }

}

MINA服務(wù)器輸出結(jié)果:

sessionCreated

messageReceived:牛頓

messageReceived:愛因斯坦

sessionClosed

Netty服務(wù)器輸出結(jié)果:

channelActive

channelRead:牛頓

channelRead:愛因斯坦

channelInactive

Twisted服務(wù)器輸出結(jié)果:

connectionMade

stringReceived:牛頓

stringReceived:愛因斯坦

connectionLost


感謝你能夠認真閱讀完這篇文章,希望小編分享的“怎么用MINA、Netty、Twisted來實現(xiàn)消息分割”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI