溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何解決Socket粘包問題

發(fā)布時間:2021-10-20 13:51:35 來源:億速云 閱讀:160 作者:iii 欄目:web開發(fā)

本篇內(nèi)容介紹了“如何解決Socket粘包問題”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

問題一:TCP存在粘包問題嗎?

先說答案:TCP 本身并沒有粘包和半包一說,因為 TCP 本質(zhì)上只是一個傳輸控制協(xié)議(Transmission Control  Protocol,TCP),它是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,由 IETF 的 RFC 793 定義。

所謂的協(xié)議本質(zhì)上是一個約定,就好比 Java  編程約定使用駝峰命名法一樣,約定的意義是為了讓通訊雙方,能夠正常的進(jìn)行消息互換的,那粘包和半包問題又是如何產(chǎn)生的呢?

這是因為在 TCP  的交互中,數(shù)據(jù)是以字節(jié)流的形式進(jìn)行傳輸?shù)?,而“流”的傳輸是沒有邊界的,因為沒有邊界所以就不能區(qū)分消息的歸屬,從而就會產(chǎn)生粘包和半包問題(粘包和半包的定義,詳見上一篇)。所以說  TCP 協(xié)議本身并不存在粘包和半包問題,只是在使用中如果不能有效的確定流的邊界就會產(chǎn)生粘包和半包問題。

問題二:分隔符是最優(yōu)解決方案?

坦白的說,經(jīng)過評論區(qū)大家的耐心“開導(dǎo)”,我也意識到了以結(jié)束符作為最終的解決方案存在一定的局限性,比如當(dāng)一條消息中間如果出現(xiàn)了結(jié)束符就會造成半包的問題,所以如果是復(fù)雜的字符串要對內(nèi)容進(jìn)行編碼和解碼處理,這樣才能保證結(jié)束符的正確性。

問題三:Socket 高效嗎?

這個問題的答案是否定的,其實上文在開頭已經(jīng)描述了應(yīng)用場景:「傳統(tǒng)的 Socket  編程」,學(xué)習(xí)它的意義就在于理解更早期更底層的一些知識,當(dāng)然作為補(bǔ)充本文會提供更加高效的消息通訊方案——Netty 通訊。

聊完了以上問題,接下來咱們先來補(bǔ)充一下上篇文章中提到的,將消息分為消息頭和消息體的代碼實現(xiàn)。

一、封裝消息頭和消息體

在開始寫服務(wù)器端和客戶端之前,咱們先來編寫一個消息的封裝類,使用它可以將消息封裝成消息頭和消息體,如下圖所示:

如何解決Socket粘包問題

消息頭中存儲消息體的長度,從而確定了消息的邊界,便解決粘包和半包問題。

1.消息封裝類

消息的封裝類中提供了兩個方法:一個是將消息轉(zhuǎn)換成消息頭 + 消息體的方法,另一個是讀取消息頭的方法,具體實現(xiàn)代碼如下:

/**  * 消息封裝類  */ class SocketPacket {     // 消息頭存儲的長度(占 8 字節(jié))     static final int HEAD_SIZE = 8;      /**      * 將協(xié)議封裝為:協(xié)議頭 + 協(xié)議體      * @param context 消息體(String 類型)      * @return byte[]      */     public byte[] toBytes(String context) {         // 協(xié)議體 byte 數(shù)組         byte[] bodyByte = context.getBytes();         int bodyByteLength = bodyByte.length;         // 最終封裝對象         byte[] result = new byte[HEAD_SIZE + bodyByteLength];         // 借助 NumberFormat 將 int 轉(zhuǎn)換為 byte[]         NumberFormat numberFormat = NumberFormat.getNumberInstance();         numberFormat.setMinimumIntegerDigits(HEAD_SIZE);         numberFormat.setGroupingUsed(false);         // 協(xié)議頭 byte 數(shù)組         byte[] headByte = numberFormat.format(bodyByteLength).getBytes();         // 封裝協(xié)議頭         System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);         // 封裝協(xié)議體         System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);         return result;     }      /**      * 獲取消息頭的內(nèi)容(也就是消息體的長度)      * @param inputStream      * @return      */     public int getHeader(InputStream inputStream) throws IOException {         int result = 0;         byte[] bytes = new byte[HEAD_SIZE];         inputStream.read(bytes, 0, HEAD_SIZE);         // 得到消息體的字節(jié)長度         result = Integer.valueOf(new String(bytes));         return result;     } }

2.編寫客戶端

接下來我們來定義客戶端,在客戶端中我們添加一組待發(fā)送的消息,隨機(jī)給服務(wù)器端發(fā)送一個消息,實現(xiàn)代碼如下:

/**  * 客戶端  */ class MySocketClient {     public static void main(String[] args) throws IOException {         // 啟動 Socket 并嘗試連接服務(wù)器         Socket socket = new Socket("127.0.0.1", 9093);         // 發(fā)送消息合集(隨機(jī)發(fā)送一條消息)         final String[] message = {"Hi,Java.", "Hi,SQL~", "關(guān)注公眾號|Java中文社群."};         // 創(chuàng)建協(xié)議封裝對象         SocketPacket socketPacket = new SocketPacket();         try (OutputStream outputStream = socket.getOutputStream()) {             // 給服務(wù)器端發(fā)送 10 次消息             for (int i = 0; i < 10; i++) {                 // 隨機(jī)發(fā)送一條消息                 String msg = message[new Random().nextInt(message.length)];                 // 將內(nèi)容封裝為:協(xié)議頭+協(xié)議體                 byte[] bytes = socketPacket.toBytes(msg);                 // 發(fā)送消息                 outputStream.write(bytes, 0, bytes.length);                 outputStream.flush();             }         }     } }

3.編寫服務(wù)器端

服務(wù)器端我們使用線程池來處理每個客戶端的業(yè)務(wù)請求,實現(xiàn)代碼如下:

/**  * 服務(wù)器端  */ class MySocketServer {     public static void main(String[] args) throws IOException {         // 創(chuàng)建 Socket 服務(wù)器端         ServerSocket serverSocket = new ServerSocket(9093);         // 獲取客戶端連接         Socket clientSocket = serverSocket.accept();         // 使用線程池處理更多的客戶端         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,                 TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));         threadPool.submit(() -> {             // 客戶端消息處理             processMessage(clientSocket);         });     }     /**      * 客戶端消息處理      * @param clientSocket      */     private static void processMessage(Socket clientSocket) {         // Socket 封裝對象         SocketPacket socketPacket = new SocketPacket();         // 獲取客戶端發(fā)送的消息對象         try (InputStream inputStream = clientSocket.getInputStream()) {             while (true) {                 // 獲取消息頭(也就是消息體的長度)                 int bodyLength = socketPacket.getHeader(inputStream);                 // 消息體 byte 數(shù)組                 byte[] bodyByte = new byte[bodyLength];                 // 每次實際讀取字節(jié)數(shù)                 int readCount = 0;                 // 消息體賦值下標(biāo)                 int bodyIndex = 0;                 // 循環(huán)接收消息頭中定義的長度                 while (bodyIndex <= (bodyLength - 1) &&                         (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {                     bodyIndex += readCount;                 }                 bodyIndex = 0;                 // 成功接收到客戶端的消息并打印                 System.out.println("接收到客戶端的信息:" + new String(bodyByte));             }         } catch (IOException ioException) {             System.out.println(ioException.getMessage());         }     } }

以上程序的執(zhí)行結(jié)果如下:

如何解決Socket粘包問題

從上述結(jié)果可以看出,消息通訊正常,客戶端和服務(wù)器端的交互中并沒有出現(xiàn)粘包和半包的問題。

二、使用 Netty 實現(xiàn)高效通訊

以上的內(nèi)容都是針對傳統(tǒng) Socket 編程的,但要實現(xiàn)更加高效的通訊和連接對象的復(fù)用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者  AIO(Asynchronous IO,異步非阻塞 IO)了。

傳統(tǒng)的 Socket 編程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的區(qū)別如下:

  • BIO 來自傳統(tǒng)的 java.io  包,它是基于流模型實現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調(diào)用是可靠的線性順序。它的優(yōu)點就是代碼比較簡單、直觀;缺點就是  IO 的效率和擴(kuò)展性很低,容易成為應(yīng)用性能瓶頸。

  • NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer  等新的抽象,可以構(gòu)建多路復(fù)用的、同步非阻塞 IO 程序,同時提供了更接近操作系統(tǒng)底層高性能的數(shù)據(jù)操作方式。

  • AIO 是 Java 1.7 之后引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操作方式,因此人們叫它 AIO(Asynchronous  IO),異步 IO 是基于事件和回調(diào)機(jī)制實現(xiàn)的,也就是應(yīng)用操作之后會直接返回,不會堵塞在那里,當(dāng)后臺處理完成,操作系統(tǒng)會通知相應(yīng)的線程進(jìn)行后續(xù)的操作。

PS:AIO 可以看作是 NIO 的升級,它也叫 NIO 2。

傳統(tǒng) Socket 的通訊流程:

如何解決Socket粘包問題

NIO 的通訊流程:

如何解決Socket粘包問題

使用 Netty 替代傳統(tǒng) NIO 編程

NIO 的設(shè)計思路雖然很好,但它的代碼編寫比較麻煩,比如 Buffer 的使用和 Selector  的編寫等。并且在面對斷線重連、包丟失和粘包等復(fù)雜問題時手動處理的成本都很大,因此我們通常會使用 Netty 框架來替代傳統(tǒng)的 NIO。

Netty 是什么?

Netty 是一個異步、事件驅(qū)動的用來做高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用框架,使用它可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,極大的簡化了網(wǎng)絡(luò)編程的復(fù)雜度。

Netty 主要優(yōu)點有以下幾個:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 框架設(shè)計優(yōu)雅,底層模型隨意切換適應(yīng)不同的網(wǎng)絡(luò)協(xié)議要求;

  3. 提供很多標(biāo)準(zhǔn)的協(xié)議、安全、編碼解碼的支持;

  4. 簡化了 NIO 使用中的諸多不便;

  5. 社區(qū)非常活躍,很多開源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。

Netty 主要包含以下 3 個部分,如下圖所示:

如何解決Socket粘包問題

圖片這 3 個部分的功能介紹如下。

1. Core 核心層

Core 核心層是 Netty 最精華的內(nèi)容,它提供了底層網(wǎng)絡(luò)通信的通用抽象和實現(xiàn),包括可擴(kuò)展的事件模型、通用的通信 API、支持零拷貝的 ByteBuf  等。

2. Protocol Support 協(xié)議支持層

協(xié)議支持層基本上覆蓋了主流協(xié)議的編解碼實現(xiàn),如 HTTP、SSL、Protobuf、壓縮、大文件傳輸、WebSocket、文本、二進(jìn)制等主流協(xié)議,此外  Netty 還支持自定義應(yīng)用層協(xié)議。Netty 豐富的協(xié)議支持降低了用戶的開發(fā)成本,基于 Netty 我們可以快速開發(fā) HTTP、WebSocket  等服務(wù)。

3. Transport Service 傳輸服務(wù)層

傳輸服務(wù)層提供了網(wǎng)絡(luò)傳輸能力的定義和實現(xiàn)方法。它支持 Socket、HTTP 隧道、虛擬機(jī)管道等傳輸方式。Netty 對 TCP、UDP  等數(shù)據(jù)傳輸做了抽象和封裝,用戶可以更聚焦在業(yè)務(wù)邏輯實現(xiàn)上,而不必關(guān)系底層數(shù)據(jù)傳輸?shù)募?xì)節(jié)。

Netty 使用

對 Netty 有了大概的認(rèn)識之后,接下來我們用 Netty  來編寫一個基礎(chǔ)的通訊服務(wù)器,它包含兩個端:服務(wù)器端和客戶端,客戶端負(fù)責(zé)發(fā)送消息,服務(wù)器端負(fù)責(zé)接收并打印消息,具體的實現(xiàn)步驟如下。

1.添加 Netty 框架

首先我們需要先添加 Netty 框架的支持,如果是 Maven 項目添加如下配置即可:

<!-- 添加 Netty 框架 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.56.Final</version> </dependency>

Netty 版本說明

Netty 的 3.x 和 4.x 為主流的穩(wěn)定版本,而最新的 5.x 已經(jīng)是放棄的測試版了,因此推薦使用 Netty 4.x 的最新穩(wěn)定版。

2. 服務(wù)器端實現(xiàn)代碼

按照官方的推薦,這里將服務(wù)器端的代碼分為以下 3 個部分:

  • MyNettyServer:服務(wù)器端的核心業(yè)務(wù)代碼;

  • ServerInitializer:服務(wù)器端通道(Channel)初始化;

  • ServerHandler:服務(wù)器端接收到信息之后的處理邏輯。

PS:Channel 字面意思為“通道”,它是網(wǎng)絡(luò)通信的載體。Channel 提供了基本的 API 用于網(wǎng)絡(luò) I/O 操作,如  register、bind、connect、read、write、flush 等。Netty 自己實現(xiàn)的 Channel 是以 JDK NIO Channel  為基礎(chǔ)的,相比較于 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時屏蔽了底層 Socket 的復(fù)雜性,賦予了 Channel  更加強(qiáng)大的功能,你在使用 Netty 時基本不需要再與 Java Socket 類直接打交道。

服務(wù)器端的實現(xiàn)代碼如下:

// 定義服務(wù)器的端口號 static final int PORT = 8007;  /**  * 服務(wù)器端  */ static class MyNettyServer {     public static void main(String[] args) {         // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接         EventLoopGroup bossGroup = new NioEventLoopGroup();         // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類)             ServerBootstrap b = new ServerBootstrap();             // 將兩個線程池設(shè)置到 Server 實例             b.group(bossGroup, workerGroup)                     // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器)                     .channel(NioServerSocketChannel.class)                     // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類)                     .childHandler(new ServerInitializer());             // 綁定端口并且進(jìn)行同步             ChannelFuture future = b.bind(PORT).sync();             // 對關(guān)閉通道進(jìn)行監(jiān)聽             future.channel().closeFuture().sync();         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             // 資源關(guān)閉             bossGroup.shutdownGracefully();             workerGroup.shutdownGracefully();         }     } }  /**  * 服務(wù)端通道初始化  */ static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服務(wù)器端連接之后的執(zhí)行器(自定義的類)     private static final ServerHandler SERVER_HANDLER = new ServerHandler();      /**      * 初始化通道的具體執(zhí)行方法      */     @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 設(shè)置         ChannelPipeline pipeline = ch.pipeline();         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理         pipeline.addLast(SERVER_HANDLER);     } }  /**  * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類  */ static class ServerHandler extends SimpleChannelInboundHandler<String> {      /**      * 讀取到客戶端的消息      */     @Override     public void channelRead0(ChannelHandlerContext ctx, String request) {         if (!request.isEmpty()) {             System.out.println("接到客戶端的消息:" + request);         }     }      /**      * 數(shù)據(jù)讀取完畢      */     @Override     public void channelReadComplete(ChannelHandlerContext ctx) {         ctx.flush();     }      /**      * 異常處理,打印異常并關(guān)閉通道      */     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         cause.printStackTrace();         ctx.close();     } }

3.客戶端實現(xiàn)代碼

客戶端的代碼實現(xiàn)也是分為以下 3 個部分:

  • MyNettyClient:客戶端核心業(yè)務(wù)代碼;

  • ClientInitializer:客戶端通道初始化;

  • ClientHandler:接收到消息之后的處理邏輯。

客戶端的實現(xiàn)代碼如下:

/**  * 客戶端  */ static class MyNettyClient {     public static void main(String[] args) {         // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個)         EventLoopGroup group = new NioEventLoopGroup();         try {             // Netty 客戶端啟動對象             Bootstrap b = new Bootstrap();             // 設(shè)置啟動參數(shù)             b.group(group)                     // 設(shè)置通道類型                     .channel(NioSocketChannel.class)                     // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類)                     .handler(new ClientInitializer());              // 連接服務(wù)器端并同步通道             Channel ch = b.connect("127.0.0.1", 8007).sync().channel();              // 發(fā)送消息             ChannelFuture lastWriteFuture = null;             // 給服務(wù)器端發(fā)送 10 條消息             for (int i = 0; i < 10; i++) {                 // 發(fā)送給服務(wù)器消息                 lastWriteFuture = ch.writeAndFlush("Hi,Java.");             }             // 在關(guān)閉通道之前,同步刷新所有的消息             if (lastWriteFuture != null) {                 lastWriteFuture.sync();             }         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             // 釋放資源             group.shutdownGracefully();         }     } }  /**  * 客戶端通道初始化類  */ static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客戶端連接成功之后業(yè)務(wù)處理     private static final ClientHandler CLIENT_HANDLER = new ClientHandler();      /**      * 初始化客戶端通道      */     @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客戶端連接成功之后的業(yè)務(wù)處理         pipeline.addLast(CLIENT_HANDLER);     } }  /**  * 客戶端連接成功之后的業(yè)務(wù)處理  */ static class ClientHandler extends SimpleChannelInboundHandler<String> {     /**      * 讀取到服務(wù)器端的消息      */     @Override     protected void channelRead0(ChannelHandlerContext ctx, String msg) {         System.err.println("接到服務(wù)器的消息:" + msg);     }      /**      * 異常處理,打印異常并關(guān)閉通道      */     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         cause.printStackTrace();         ctx.close();     } }

從以上代碼可以看出,我們代碼實現(xiàn)的功能是,客戶端給服務(wù)器端發(fā)送 10 條消息。

編寫完上述代碼之后,我們就可以啟動服務(wù)器端和客戶端了,啟動之后,它們的執(zhí)行結(jié)果如下:

如何解決Socket粘包問題

從上述結(jié)果中可以看出,雖然客戶端和服務(wù)器端實現(xiàn)了通信,但在  Netty 的使用中依然存在粘包的問題,服務(wù)器端一次收到了 10 條消息,而不是每次只收到一條消息,因此接下來我們要解決掉 Netty 中的粘包問題。

三、解決 Netty 粘包問題

在 Netty 中,解決粘包問題的常用方案有以下 3 種:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 設(shè)置固定大小的消息長度,如果長度不足則使用空字符彌補(bǔ),它的缺點比較明顯,比較消耗網(wǎng)絡(luò)流量,因此不建議使用;

  3. 使用分隔符來確定消息的邊界,從而避免粘包和半包問題的產(chǎn)生;

  4. 將消息分為消息頭和消息體,在頭部中保存有當(dāng)前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息。

接下來我們分別來看后兩種推薦的解決方案。

1.使用分隔符解決粘包問題

在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號作為消息的結(jié)束符,從而解決粘包和半包的問題。

它的核心實現(xiàn)代碼是在初始化通道(Channel)時,通過設(shè)置 DelimiterBasedFrameDecoder  來分隔消息,需要在客戶端和服務(wù)器端都進(jìn)行設(shè)置,具體實現(xiàn)代碼如下。

服務(wù)器端核心實現(xiàn)代碼如下:

/**  * 服務(wù)端通道初始化  */ static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服務(wù)器端連接之后的執(zhí)行器(自定義的類)     private static final ServerHandler SERVER_HANDLER = new ServerHandler();      /**      * 初始化通道的具體執(zhí)行方法      */     @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 設(shè)置         ChannelPipeline pipeline = ch.pipeline();         // 19 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符])         pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理         pipeline.addLast(SERVER_HANDLER);     } }

核心代碼為第 19 行,代碼中已經(jīng)備注了方法的含義,這里就不再贅述。

客戶端的核心實現(xiàn)代碼如下:

/**  * 客戶端通道初始化類  */ static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客戶端連接成功之后業(yè)務(wù)處理     private static final ClientHandler CLIENT_HANDLER = new ClientHandler();      /**      * 初始化客戶端通道      */     @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 17 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符])         pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客戶端連接成功之后的業(yè)務(wù)處理         pipeline.addLast(CLIENT_HANDLER);     } }

完整的服務(wù)器端和客戶端的實現(xiàn)代碼如下:

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;  public class NettyExample {     // 定義服務(wù)器的端口號     static final int PORT = 8007;      /**      * 服務(wù)器端      */     static class MyNettyServer {         public static void main(String[] args) {             // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接             EventLoopGroup bossGroup = new NioEventLoopGroup();             // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫             EventLoopGroup workerGroup = new NioEventLoopGroup();             try {                 // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類)                 ServerBootstrap b = new ServerBootstrap();                 // 將兩個線程池設(shè)置到 Server 實例                 b.group(bossGroup, workerGroup)                         // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器)                         .channel(NioServerSocketChannel.class)                         // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類)                         .childHandler(new ServerInitializer());                 // 綁定端口并且進(jìn)行同步                 ChannelFuture future = b.bind(PORT).sync();                 // 對關(guān)閉通道進(jìn)行監(jiān)聽                 future.channel().closeFuture().sync();             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 資源關(guān)閉                 bossGroup.shutdownGracefully();                 workerGroup.shutdownGracefully();             }         }     }      /**      * 服務(wù)端通道初始化      */     static class ServerInitializer extends ChannelInitializer<SocketChannel> {         // 字符串編碼器和解碼器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 服務(wù)器端連接之后的執(zhí)行器(自定義的類)         private static final ServerHandler SERVER_HANDLER = new ServerHandler();          /**          * 初始化通道的具體執(zhí)行方法          */         @Override         public void initChannel(SocketChannel ch) {             // 通道 Channel 設(shè)置             ChannelPipeline pipeline = ch.pipeline();             // 設(shè)置結(jié)尾分隔符             pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));             // 設(shè)置(字符串)編碼器和解碼器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理             pipeline.addLast(SERVER_HANDLER);         }     }      /**      * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類      */     static class ServerHandler extends SimpleChannelInboundHandler<String> {          /**          * 讀取到客戶端的消息          */         @Override         public void channelRead0(ChannelHandlerContext ctx, String request) {             if (!request.isEmpty()) {                 System.out.println("接到客戶端的消息:" + request);             }         }          /**          * 數(shù)據(jù)讀取完畢          */         @Override         public void channelReadComplete(ChannelHandlerContext ctx) {             ctx.flush();         }          /**          * 異常處理,打印異常并關(guān)閉通道          */         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     }      /**      * 客戶端      */     static class MyNettyClient {         public static void main(String[] args) {             // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個)             EventLoopGroup group = new NioEventLoopGroup();             try {                 // Netty 客戶端啟動對象                 Bootstrap b = new Bootstrap();                 // 設(shè)置啟動參數(shù)                 b.group(group)                         // 設(shè)置通道類型                         .channel(NioSocketChannel.class)                         // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類)                         .handler(new ClientInitializer());                  // 連接服務(wù)器端并同步通道                 Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                  // 發(fā)送消息                 ChannelFuture lastWriteFuture = null;                 // 給服務(wù)器端發(fā)送 10 條消息                 for (int i = 0; i < 10; i++) {                     // 發(fā)送給服務(wù)器消息                     lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                 }                 // 在關(guān)閉通道之前,同步刷新所有的消息                 if (lastWriteFuture != null) {                     lastWriteFuture.sync();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 釋放資源                 group.shutdownGracefully();             }         }     }      /**      * 客戶端通道初始化類      */     static class ClientInitializer extends ChannelInitializer<SocketChannel> {         // 字符串編碼器和解碼器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 客戶端連接成功之后業(yè)務(wù)處理         private static final ClientHandler CLIENT_HANDLER = new ClientHandler();          /**          * 初始化客戶端通道          */         @Override         public void initChannel(SocketChannel ch) {             ChannelPipeline pipeline = ch.pipeline();             // 設(shè)置結(jié)尾分隔符             pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));             // 設(shè)置(字符串)編碼器和解碼器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 客戶端連接成功之后的業(yè)務(wù)處理             pipeline.addLast(CLIENT_HANDLER);         }     }      /**      * 客戶端連接成功之后的業(yè)務(wù)處理      */     static class ClientHandler extends SimpleChannelInboundHandler<String> {          /**          * 讀取到服務(wù)器端的消息          */         @Override         protected void channelRead0(ChannelHandlerContext ctx, String msg) {             System.err.println("接到服務(wù)器的消息:" + msg);         }          /**          * 異常處理,打印異常并關(guān)閉通道          */         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     } }

最終的執(zhí)行結(jié)果如下圖所示:

如何解決Socket粘包問題

從上述結(jié)果中可以看出,Netty 可以正常使用了,它已經(jīng)不存在粘包和半包問題了。

2.封裝消息解決粘包問題

此解決方案的核心是將消息分為消息頭 +  消息體,在消息頭中保存消息體的長度,從而確定一條消息的邊界,這樣就避免了粘包和半包問題了,它的實現(xiàn)過程如下圖所示:

如何解決Socket粘包問題

在 Netty 中可以通過  LengthFieldPrepender(編碼)和  LengthFieldBasedFrameDecoder(解碼)兩個類實現(xiàn)消息的封裝。和上一個解決方案類似,我們需要分別在服務(wù)器端和客戶端通過設(shè)置通道(Channel)來解決粘包問題。

服務(wù)器端的核心代碼如下:

/**  * 服務(wù)端通道初始化  */ static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服務(wù)器端連接之后的執(zhí)行器(自定義的類)     private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();      /**      * 初始化通道的具體執(zhí)行方法      */     @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 設(shè)置         ChannelPipeline pipeline = ch.pipeline();         // 18 行:消息解碼:讀取消息頭和消息體         pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));         // 20 行:消息編碼:將消息封裝為消息頭和消息體,在消息前添加消息體的長度         pipeline.addLast(new LengthFieldPrepender(4));         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理         pipeline.addLast(SERVER_HANDLER);     } }

其中核心代碼是 18 行和 20 行,通過 LengthFieldPrepender 實現(xiàn)編碼(將消息打包成消息頭 + 消息體),通過  LengthFieldBasedFrameDecoder 實現(xiàn)解碼(從封裝的消息中取出消息的內(nèi)容)。

LengthFieldBasedFrameDecoder 的參數(shù)說明如下:

  1. 鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)

  2. 參數(shù) 1:maxFrameLength - 發(fā)送的數(shù)據(jù)包最大長度;

  3. 參數(shù) 2:lengthFieldOffset - 長度域偏移量,指的是長度域位于整個數(shù)據(jù)包字節(jié)數(shù)組中的下標(biāo);

  4. 參數(shù) 3:lengthFieldLength - 長度域自己的字節(jié)數(shù)長度;

  5. 參數(shù) 4:lengthAdjustment &ndash;  長度域的偏移量矯正。如果長度域的值,除了包含有效數(shù)據(jù)域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進(jìn)行矯正。矯正的值為:包長 - 長度域的值 &ndash;  長度域偏移 &ndash; 長度域長;

  6. 參數(shù) 5:initialBytesToStrip &ndash; 丟棄的起始字節(jié)數(shù)。丟棄處于有效數(shù)據(jù)前面的字節(jié)數(shù)量。比如前面有 4 個節(jié)點的長度域,則它的值為  4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:數(shù)據(jù)包最大長度為  1024,長度域占首部的四個字節(jié),在讀數(shù)據(jù)的時候去掉首部四個字節(jié)(即長度域)。

客戶端的核心實現(xiàn)代碼如下:

/**  * 客戶端通道初始化類  */ static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串編碼器和解碼器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客戶端連接成功之后業(yè)務(wù)處理     private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();      /**      * 初始化客戶端通道      */     @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 消息解碼:讀取消息頭和消息體         pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));         // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度         pipeline.addLast(new LengthFieldPrepender(4));         // 設(shè)置(字符串)編碼器和解碼器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客戶端連接成功之后的業(yè)務(wù)處理         pipeline.addLast(CLIENT_HANDLER);     } }

完整的服務(wù)器端和客戶端的實現(xiàn)代碼如下:

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;  /**  * 通過封裝 Netty 來解決粘包  */ public class NettyExample {     // 定義服務(wù)器的端口號     static final int PORT = 8007;      /**      * 服務(wù)器端      */     static class MyNettyServer {         public static void main(String[] args) {             // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接             EventLoopGroup bossGroup = new NioEventLoopGroup();             // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫             EventLoopGroup workerGroup = new NioEventLoopGroup();             try {                 // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類)                 ServerBootstrap b = new ServerBootstrap();                 // 將兩個線程池設(shè)置到 Server 實例                 b.group(bossGroup, workerGroup)                         // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器)                         .channel(NioServerSocketChannel.class)                         // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類)                         .childHandler(new NettyExample.ServerInitializer());                 // 綁定端口并且進(jìn)行同步                 ChannelFuture future = b.bind(PORT).sync();                 // 對關(guān)閉通道進(jìn)行監(jiān)聽                 future.channel().closeFuture().sync();             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 資源關(guān)閉                 bossGroup.shutdownGracefully();                 workerGroup.shutdownGracefully();             }         }     }      /**      * 服務(wù)端通道初始化      */     static class ServerInitializer extends ChannelInitializer<SocketChannel> {         // 字符串編碼器和解碼器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 服務(wù)器端連接之后的執(zhí)行器(自定義的類)         private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();          /**          * 初始化通道的具體執(zhí)行方法          */         @Override         public void initChannel(SocketChannel ch) {             // 通道 Channel 設(shè)置             ChannelPipeline pipeline = ch.pipeline();             // 消息解碼:讀取消息頭和消息體             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));             // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度             pipeline.addLast(new LengthFieldPrepender(4));             // 設(shè)置(字符串)編碼器和解碼器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理             pipeline.addLast(SERVER_HANDLER);         }     }      /**      * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類      */     static class ServerHandler extends SimpleChannelInboundHandler<String> {          /**          * 讀取到客戶端的消息          */         @Override         public void channelRead0(ChannelHandlerContext ctx, String request) {             if (!request.isEmpty()) {                 System.out.println("接到客戶端的消息:" + request);             }         }          /**          * 數(shù)據(jù)讀取完畢          */         @Override         public void channelReadComplete(ChannelHandlerContext ctx) {             ctx.flush();         }          /**          * 異常處理,打印異常并關(guān)閉通道          */         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     }      /**      * 客戶端      */     static class MyNettyClient {         public static void main(String[] args) {             // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個)             EventLoopGroup group = new NioEventLoopGroup();             try {                 // Netty 客戶端啟動對象                 Bootstrap b = new Bootstrap();                 // 設(shè)置啟動參數(shù)                 b.group(group)                         // 設(shè)置通道類型                         .channel(NioSocketChannel.class)                         // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類)                         .handler(new NettyExample.ClientInitializer());                  // 連接服務(wù)器端并同步通道                 Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                  // 發(fā)送消息                 ChannelFuture lastWriteFuture = null;                 // 給服務(wù)器端發(fā)送 10 條消息                 for (int i = 0; i < 10; i++) {                     // 發(fā)送給服務(wù)器消息                     lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                 }                 // 在關(guān)閉通道之前,同步刷新所有的消息                 if (lastWriteFuture != null) {                     lastWriteFuture.sync();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 釋放資源                 group.shutdownGracefully();             }         }     }      /**      * 客戶端通道初始化類      */     static class ClientInitializer extends ChannelInitializer<SocketChannel> {         // 字符串編碼器和解碼器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 客戶端連接成功之后業(yè)務(wù)處理         private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();          /**          * 初始化客戶端通道          */         @Override         public void initChannel(SocketChannel ch) {             ChannelPipeline pipeline = ch.pipeline();             // 消息解碼:讀取消息頭和消息體             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));             // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度             pipeline.addLast(new LengthFieldPrepender(4));             // 設(shè)置(字符串)編碼器和解碼器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 客戶端連接成功之后的業(yè)務(wù)處理             pipeline.addLast(CLIENT_HANDLER);         }     }      /**      * 客戶端連接成功之后的業(yè)務(wù)處理      */     static class ClientHandler extends SimpleChannelInboundHandler<String> {          /**          * 讀取到服務(wù)器端的消息          */         @Override         protected void channelRead0(ChannelHandlerContext ctx, String msg) {             System.err.println("接到服務(wù)器的消息:" + msg);         }          /**          * 異常處理,打印異常并關(guān)閉通道          */         @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     } }

以上程序的執(zhí)行結(jié)果為:

如何解決Socket粘包問題

“如何解決Socket粘包問題”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI