您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“如何解決Socket粘包問題”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
問題一:TCP存在粘包問題嗎?
先說答案:TCP 本身并沒有粘包和半包一說,因為 TCP 本質(zhì)上只是一個傳輸控制協(xié)議(Transmission Control Protocol,TCP),它是一種面向連接的、可靠的、基于字節(jié)流的傳輸層通信協(xié)議,由 IETF 的 RFC 793 定義。
所謂的協(xié)議本質(zhì)上是一個約定,就好比 Java 編程約定使用駝峰命名法一樣,約定的意義是為了讓通訊雙方,能夠正常的進(jìn)行消息互換的,那粘包和半包問題又是如何產(chǎn)生的呢?
這是因為在 TCP 的交互中,數(shù)據(jù)是以字節(jié)流的形式進(jìn)行傳輸?shù)?,而“流”的傳輸是沒有邊界的,因為沒有邊界所以就不能區(qū)分消息的歸屬,從而就會產(chǎn)生粘包和半包問題(粘包和半包的定義,詳見上一篇)。所以說 TCP 協(xié)議本身并不存在粘包和半包問題,只是在使用中如果不能有效的確定流的邊界就會產(chǎn)生粘包和半包問題。
問題二:分隔符是最優(yōu)解決方案?
坦白的說,經(jīng)過評論區(qū)大家的耐心“開導(dǎo)”,我也意識到了以結(jié)束符作為最終的解決方案存在一定的局限性,比如當(dāng)一條消息中間如果出現(xiàn)了結(jié)束符就會造成半包的問題,所以如果是復(fù)雜的字符串要對內(nèi)容進(jìn)行編碼和解碼處理,這樣才能保證結(jié)束符的正確性。
問題三:Socket 高效嗎?
這個問題的答案是否定的,其實上文在開頭已經(jīng)描述了應(yīng)用場景:「傳統(tǒng)的 Socket 編程」,學(xué)習(xí)它的意義就在于理解更早期更底層的一些知識,當(dāng)然作為補(bǔ)充本文會提供更加高效的消息通訊方案——Netty 通訊。
聊完了以上問題,接下來咱們先來補(bǔ)充一下上篇文章中提到的,將消息分為消息頭和消息體的代碼實現(xiàn)。
一、封裝消息頭和消息體
在開始寫服務(wù)器端和客戶端之前,咱們先來編寫一個消息的封裝類,使用它可以將消息封裝成消息頭和消息體,如下圖所示:
消息頭中存儲消息體的長度,從而確定了消息的邊界,便解決粘包和半包問題。
1.消息封裝類
消息的封裝類中提供了兩個方法:一個是將消息轉(zhuǎn)換成消息頭 + 消息體的方法,另一個是讀取消息頭的方法,具體實現(xiàn)代碼如下:
/** * 消息封裝類 */ class SocketPacket { // 消息頭存儲的長度(占 8 字節(jié)) static final int HEAD_SIZE = 8; /** * 將協(xié)議封裝為:協(xié)議頭 + 協(xié)議體 * @param context 消息體(String 類型) * @return byte[] */ public byte[] toBytes(String context) { // 協(xié)議體 byte 數(shù)組 byte[] bodyByte = context.getBytes(); int bodyByteLength = bodyByte.length; // 最終封裝對象 byte[] result = new byte[HEAD_SIZE + bodyByteLength]; // 借助 NumberFormat 將 int 轉(zhuǎn)換為 byte[] NumberFormat numberFormat = NumberFormat.getNumberInstance(); numberFormat.setMinimumIntegerDigits(HEAD_SIZE); numberFormat.setGroupingUsed(false); // 協(xié)議頭 byte 數(shù)組 byte[] headByte = numberFormat.format(bodyByteLength).getBytes(); // 封裝協(xié)議頭 System.arraycopy(headByte, 0, result, 0, HEAD_SIZE); // 封裝協(xié)議體 System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength); return result; } /** * 獲取消息頭的內(nèi)容(也就是消息體的長度) * @param inputStream * @return */ public int getHeader(InputStream inputStream) throws IOException { int result = 0; byte[] bytes = new byte[HEAD_SIZE]; inputStream.read(bytes, 0, HEAD_SIZE); // 得到消息體的字節(jié)長度 result = Integer.valueOf(new String(bytes)); return result; } }
2.編寫客戶端
接下來我們來定義客戶端,在客戶端中我們添加一組待發(fā)送的消息,隨機(jī)給服務(wù)器端發(fā)送一個消息,實現(xiàn)代碼如下:
/** * 客戶端 */ class MySocketClient { public static void main(String[] args) throws IOException { // 啟動 Socket 并嘗試連接服務(wù)器 Socket socket = new Socket("127.0.0.1", 9093); // 發(fā)送消息合集(隨機(jī)發(fā)送一條消息) final String[] message = {"Hi,Java.", "Hi,SQL~", "關(guān)注公眾號|Java中文社群."}; // 創(chuàng)建協(xié)議封裝對象 SocketPacket socketPacket = new SocketPacket(); try (OutputStream outputStream = socket.getOutputStream()) { // 給服務(wù)器端發(fā)送 10 次消息 for (int i = 0; i < 10; i++) { // 隨機(jī)發(fā)送一條消息 String msg = message[new Random().nextInt(message.length)]; // 將內(nèi)容封裝為:協(xié)議頭+協(xié)議體 byte[] bytes = socketPacket.toBytes(msg); // 發(fā)送消息 outputStream.write(bytes, 0, bytes.length); outputStream.flush(); } } } }
3.編寫服務(wù)器端
服務(wù)器端我們使用線程池來處理每個客戶端的業(yè)務(wù)請求,實現(xiàn)代碼如下:
/** * 服務(wù)器端 */ class MySocketServer { public static void main(String[] args) throws IOException { // 創(chuàng)建 Socket 服務(wù)器端 ServerSocket serverSocket = new ServerSocket(9093); // 獲取客戶端連接 Socket clientSocket = serverSocket.accept(); // 使用線程池處理更多的客戶端 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)); threadPool.submit(() -> { // 客戶端消息處理 processMessage(clientSocket); }); } /** * 客戶端消息處理 * @param clientSocket */ private static void processMessage(Socket clientSocket) { // Socket 封裝對象 SocketPacket socketPacket = new SocketPacket(); // 獲取客戶端發(fā)送的消息對象 try (InputStream inputStream = clientSocket.getInputStream()) { while (true) { // 獲取消息頭(也就是消息體的長度) int bodyLength = socketPacket.getHeader(inputStream); // 消息體 byte 數(shù)組 byte[] bodyByte = new byte[bodyLength]; // 每次實際讀取字節(jié)數(shù) int readCount = 0; // 消息體賦值下標(biāo) int bodyIndex = 0; // 循環(huán)接收消息頭中定義的長度 while (bodyIndex <= (bodyLength - 1) && (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) { bodyIndex += readCount; } bodyIndex = 0; // 成功接收到客戶端的消息并打印 System.out.println("接收到客戶端的信息:" + new String(bodyByte)); } } catch (IOException ioException) { System.out.println(ioException.getMessage()); } } }
以上程序的執(zhí)行結(jié)果如下:
從上述結(jié)果可以看出,消息通訊正常,客戶端和服務(wù)器端的交互中并沒有出現(xiàn)粘包和半包的問題。
二、使用 Netty 實現(xiàn)高效通訊
以上的內(nèi)容都是針對傳統(tǒng) Socket 編程的,但要實現(xiàn)更加高效的通訊和連接對象的復(fù)用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者 AIO(Asynchronous IO,異步非阻塞 IO)了。
傳統(tǒng)的 Socket 編程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的區(qū)別如下:
BIO 來自傳統(tǒng)的 java.io 包,它是基于流模型實現(xiàn)的,交互的方式是同步、阻塞方式,也就是說在讀入輸入流或者輸出流時,在讀寫動作完成之前,線程會一直阻塞在那里,它們之間的調(diào)用是可靠的線性順序。它的優(yōu)點就是代碼比較簡單、直觀;缺點就是 IO 的效率和擴(kuò)展性很低,容易成為應(yīng)用性能瓶頸。
NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以構(gòu)建多路復(fù)用的、同步非阻塞 IO 程序,同時提供了更接近操作系統(tǒng)底層高性能的數(shù)據(jù)操作方式。
AIO 是 Java 1.7 之后引入的包,是 NIO 的升級版本,提供了異步非堵塞的 IO 操作方式,因此人們叫它 AIO(Asynchronous IO),異步 IO 是基于事件和回調(diào)機(jī)制實現(xiàn)的,也就是應(yīng)用操作之后會直接返回,不會堵塞在那里,當(dāng)后臺處理完成,操作系統(tǒng)會通知相應(yīng)的線程進(jìn)行后續(xù)的操作。
PS:AIO 可以看作是 NIO 的升級,它也叫 NIO 2。
傳統(tǒng) Socket 的通訊流程:
NIO 的通訊流程:
使用 Netty 替代傳統(tǒng) NIO 編程
NIO 的設(shè)計思路雖然很好,但它的代碼編寫比較麻煩,比如 Buffer 的使用和 Selector 的編寫等。并且在面對斷線重連、包丟失和粘包等復(fù)雜問題時手動處理的成本都很大,因此我們通常會使用 Netty 框架來替代傳統(tǒng)的 NIO。
Netty 是什么?
Netty 是一個異步、事件驅(qū)動的用來做高性能、高可靠性的網(wǎng)絡(luò)應(yīng)用框架,使用它可以快速輕松地開發(fā)網(wǎng)絡(luò)應(yīng)用程序,極大的簡化了網(wǎng)絡(luò)編程的復(fù)雜度。
Netty 主要優(yōu)點有以下幾個:
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
框架設(shè)計優(yōu)雅,底層模型隨意切換適應(yīng)不同的網(wǎng)絡(luò)協(xié)議要求;
提供很多標(biāo)準(zhǔn)的協(xié)議、安全、編碼解碼的支持;
簡化了 NIO 使用中的諸多不便;
社區(qū)非常活躍,很多開源框架中都使用了 Netty 框架,如 Dubbo、RocketMQ、Spark 等。
Netty 主要包含以下 3 個部分,如下圖所示:
圖片這 3 個部分的功能介紹如下。
1. Core 核心層
Core 核心層是 Netty 最精華的內(nèi)容,它提供了底層網(wǎng)絡(luò)通信的通用抽象和實現(xiàn),包括可擴(kuò)展的事件模型、通用的通信 API、支持零拷貝的 ByteBuf 等。
2. Protocol Support 協(xié)議支持層
協(xié)議支持層基本上覆蓋了主流協(xié)議的編解碼實現(xiàn),如 HTTP、SSL、Protobuf、壓縮、大文件傳輸、WebSocket、文本、二進(jìn)制等主流協(xié)議,此外 Netty 還支持自定義應(yīng)用層協(xié)議。Netty 豐富的協(xié)議支持降低了用戶的開發(fā)成本,基于 Netty 我們可以快速開發(fā) HTTP、WebSocket 等服務(wù)。
3. Transport Service 傳輸服務(wù)層
傳輸服務(wù)層提供了網(wǎng)絡(luò)傳輸能力的定義和實現(xiàn)方法。它支持 Socket、HTTP 隧道、虛擬機(jī)管道等傳輸方式。Netty 對 TCP、UDP 等數(shù)據(jù)傳輸做了抽象和封裝,用戶可以更聚焦在業(yè)務(wù)邏輯實現(xiàn)上,而不必關(guān)系底層數(shù)據(jù)傳輸?shù)募?xì)節(jié)。
Netty 使用
對 Netty 有了大概的認(rèn)識之后,接下來我們用 Netty 來編寫一個基礎(chǔ)的通訊服務(wù)器,它包含兩個端:服務(wù)器端和客戶端,客戶端負(fù)責(zé)發(fā)送消息,服務(wù)器端負(fù)責(zé)接收并打印消息,具體的實現(xiàn)步驟如下。
1.添加 Netty 框架
首先我們需要先添加 Netty 框架的支持,如果是 Maven 項目添加如下配置即可:
<!-- 添加 Netty 框架 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.56.Final</version> </dependency>
Netty 版本說明
Netty 的 3.x 和 4.x 為主流的穩(wěn)定版本,而最新的 5.x 已經(jīng)是放棄的測試版了,因此推薦使用 Netty 4.x 的最新穩(wěn)定版。
2. 服務(wù)器端實現(xiàn)代碼
按照官方的推薦,這里將服務(wù)器端的代碼分為以下 3 個部分:
MyNettyServer:服務(wù)器端的核心業(yè)務(wù)代碼;
ServerInitializer:服務(wù)器端通道(Channel)初始化;
ServerHandler:服務(wù)器端接收到信息之后的處理邏輯。
PS:Channel 字面意思為“通道”,它是網(wǎng)絡(luò)通信的載體。Channel 提供了基本的 API 用于網(wǎng)絡(luò) I/O 操作,如 register、bind、connect、read、write、flush 等。Netty 自己實現(xiàn)的 Channel 是以 JDK NIO Channel 為基礎(chǔ)的,相比較于 JDK NIO,Netty 的 Channel 提供了更高層次的抽象,同時屏蔽了底層 Socket 的復(fù)雜性,賦予了 Channel 更加強(qiáng)大的功能,你在使用 Netty 時基本不需要再與 Java Socket 類直接打交道。
服務(wù)器端的實現(xiàn)代碼如下:
// 定義服務(wù)器的端口號 static final int PORT = 8007; /** * 服務(wù)器端 */ static class MyNettyServer { public static void main(String[] args) { // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設(shè)置到 Server 實例 b.group(bossGroup, workerGroup) // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器) .channel(NioServerSocketChannel.class) // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類) .childHandler(new ServerInitializer()); // 綁定端口并且進(jìn)行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關(guān)閉通道進(jìn)行監(jiān)聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關(guān)閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務(wù)端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務(wù)器端連接之后的執(zhí)行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執(zhí)行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設(shè)置 ChannelPipeline pipeline = ch.pipeline(); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數(shù)據(jù)讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
3.客戶端實現(xiàn)代碼
客戶端的代碼實現(xiàn)也是分為以下 3 個部分:
MyNettyClient:客戶端核心業(yè)務(wù)代碼;
ClientInitializer:客戶端通道初始化;
ClientHandler:接收到消息之后的處理邏輯。
客戶端的實現(xiàn)代碼如下:
/** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啟動對象 Bootstrap b = new Bootstrap(); // 設(shè)置啟動參數(shù) b.group(group) // 設(shè)置通道類型 .channel(NioSocketChannel.class) // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類) .handler(new ClientInitializer()); // 連接服務(wù)器端并同步通道 Channel ch = b.connect("127.0.0.1", 8007).sync().channel(); // 發(fā)送消息 ChannelFuture lastWriteFuture = null; // 給服務(wù)器端發(fā)送 10 條消息 for (int i = 0; i < 10; i++) { // 發(fā)送給服務(wù)器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java."); } // 在關(guān)閉通道之前,同步刷新所有的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端連接成功之后業(yè)務(wù)處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端連接成功之后的業(yè)務(wù)處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端連接成功之后的業(yè)務(wù)處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務(wù)器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務(wù)器的消息:" + msg); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
從以上代碼可以看出,我們代碼實現(xiàn)的功能是,客戶端給服務(wù)器端發(fā)送 10 條消息。
編寫完上述代碼之后,我們就可以啟動服務(wù)器端和客戶端了,啟動之后,它們的執(zhí)行結(jié)果如下:
從上述結(jié)果中可以看出,雖然客戶端和服務(wù)器端實現(xiàn)了通信,但在 Netty 的使用中依然存在粘包的問題,服務(wù)器端一次收到了 10 條消息,而不是每次只收到一條消息,因此接下來我們要解決掉 Netty 中的粘包問題。
三、解決 Netty 粘包問題
在 Netty 中,解決粘包問題的常用方案有以下 3 種:
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
設(shè)置固定大小的消息長度,如果長度不足則使用空字符彌補(bǔ),它的缺點比較明顯,比較消耗網(wǎng)絡(luò)流量,因此不建議使用;
使用分隔符來確定消息的邊界,從而避免粘包和半包問題的產(chǎn)生;
將消息分為消息頭和消息體,在頭部中保存有當(dāng)前整個消息的長度,只有在讀取到足夠長度的消息之后才算是讀到了一個完整的消息。
接下來我們分別來看后兩種推薦的解決方案。
1.使用分隔符解決粘包問題
在 Netty 中提供了 DelimiterBasedFrameDecoder 類用來以特殊符號作為消息的結(jié)束符,從而解決粘包和半包的問題。
它的核心實現(xiàn)代碼是在初始化通道(Channel)時,通過設(shè)置 DelimiterBasedFrameDecoder 來分隔消息,需要在客戶端和服務(wù)器端都進(jìn)行設(shè)置,具體實現(xiàn)代碼如下。
服務(wù)器端核心實現(xiàn)代碼如下:
/** * 服務(wù)端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務(wù)器端連接之后的執(zhí)行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執(zhí)行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設(shè)置 ChannelPipeline pipeline = ch.pipeline(); // 19 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理 pipeline.addLast(SERVER_HANDLER); } }
核心代碼為第 19 行,代碼中已經(jīng)備注了方法的含義,這里就不再贅述。
客戶端的核心實現(xiàn)代碼如下:
/** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端連接成功之后業(yè)務(wù)處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 17 行:設(shè)置結(jié)尾分隔符【核心代碼】(參數(shù)1:為消息的最大長度,可自定義;參數(shù)2:分隔符[此處以換行符為分隔符]) pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端連接成功之后的業(yè)務(wù)處理 pipeline.addLast(CLIENT_HANDLER); } }
完整的服務(wù)器端和客戶端的實現(xiàn)代碼如下:
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class NettyExample { // 定義服務(wù)器的端口號 static final int PORT = 8007; /** * 服務(wù)器端 */ static class MyNettyServer { public static void main(String[] args) { // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設(shè)置到 Server 實例 b.group(bossGroup, workerGroup) // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器) .channel(NioServerSocketChannel.class) // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類) .childHandler(new ServerInitializer()); // 綁定端口并且進(jìn)行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關(guān)閉通道進(jìn)行監(jiān)聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關(guān)閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務(wù)端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務(wù)器端連接之后的執(zhí)行器(自定義的類) private static final ServerHandler SERVER_HANDLER = new ServerHandler(); /** * 初始化通道的具體執(zhí)行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設(shè)置 ChannelPipeline pipeline = ch.pipeline(); // 設(shè)置結(jié)尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數(shù)據(jù)讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } /** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啟動對象 Bootstrap b = new Bootstrap(); // 設(shè)置啟動參數(shù) b.group(group) // 設(shè)置通道類型 .channel(NioSocketChannel.class) // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類) .handler(new ClientInitializer()); // 連接服務(wù)器端并同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 發(fā)送消息 ChannelFuture lastWriteFuture = null; // 給服務(wù)器端發(fā)送 10 條消息 for (int i = 0; i < 10; i++) { // 發(fā)送給服務(wù)器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在關(guān)閉通道之前,同步刷新所有的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端連接成功之后業(yè)務(wù)處理 private static final ClientHandler CLIENT_HANDLER = new ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 設(shè)置結(jié)尾分隔符 pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端連接成功之后的業(yè)務(wù)處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端連接成功之后的業(yè)務(wù)處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務(wù)器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務(wù)器的消息:" + msg); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
最終的執(zhí)行結(jié)果如下圖所示:
從上述結(jié)果中可以看出,Netty 可以正常使用了,它已經(jīng)不存在粘包和半包問題了。
2.封裝消息解決粘包問題
此解決方案的核心是將消息分為消息頭 + 消息體,在消息頭中保存消息體的長度,從而確定一條消息的邊界,這樣就避免了粘包和半包問題了,它的實現(xiàn)過程如下圖所示:
在 Netty 中可以通過 LengthFieldPrepender(編碼)和 LengthFieldBasedFrameDecoder(解碼)兩個類實現(xiàn)消息的封裝。和上一個解決方案類似,我們需要分別在服務(wù)器端和客戶端通過設(shè)置通道(Channel)來解決粘包問題。
服務(wù)器端的核心代碼如下:
/** * 服務(wù)端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務(wù)器端連接之后的執(zhí)行器(自定義的類) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); /** * 初始化通道的具體執(zhí)行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設(shè)置 ChannelPipeline pipeline = ch.pipeline(); // 18 行:消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 20 行:消息編碼:將消息封裝為消息頭和消息體,在消息前添加消息體的長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理 pipeline.addLast(SERVER_HANDLER); } }
其中核心代碼是 18 行和 20 行,通過 LengthFieldPrepender 實現(xiàn)編碼(將消息打包成消息頭 + 消息體),通過 LengthFieldBasedFrameDecoder 實現(xiàn)解碼(從封裝的消息中取出消息的內(nèi)容)。
LengthFieldBasedFrameDecoder 的參數(shù)說明如下:
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
參數(shù) 1:maxFrameLength - 發(fā)送的數(shù)據(jù)包最大長度;
參數(shù) 2:lengthFieldOffset - 長度域偏移量,指的是長度域位于整個數(shù)據(jù)包字節(jié)數(shù)組中的下標(biāo);
參數(shù) 3:lengthFieldLength - 長度域自己的字節(jié)數(shù)長度;
參數(shù) 4:lengthAdjustment – 長度域的偏移量矯正。如果長度域的值,除了包含有效數(shù)據(jù)域的長度外,還包含了其他域(如長度域自身)長度,那么,就需要進(jìn)行矯正。矯正的值為:包長 - 長度域的值 – 長度域偏移 – 長度域長;
參數(shù) 5:initialBytesToStrip – 丟棄的起始字節(jié)數(shù)。丟棄處于有效數(shù)據(jù)前面的字節(jié)數(shù)量。比如前面有 4 個節(jié)點的長度域,則它的值為 4。
LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:數(shù)據(jù)包最大長度為 1024,長度域占首部的四個字節(jié),在讀數(shù)據(jù)的時候去掉首部四個字節(jié)(即長度域)。
客戶端的核心實現(xiàn)代碼如下:
/** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端連接成功之后業(yè)務(wù)處理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端連接成功之后的業(yè)務(wù)處理 pipeline.addLast(CLIENT_HANDLER); } }
完整的服務(wù)器端和客戶端的實現(xiàn)代碼如下:
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 通過封裝 Netty 來解決粘包 */ public class NettyExample { // 定義服務(wù)器的端口號 static final int PORT = 8007; /** * 服務(wù)器端 */ static class MyNettyServer { public static void main(String[] args) { // 創(chuàng)建一個線程組,用來負(fù)責(zé)接收客戶端連接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 創(chuàng)建另一個線程組,用來負(fù)責(zé) I/O 的讀寫 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // 創(chuàng)建一個 Server 實例(可理解為 Netty 的入門類) ServerBootstrap b = new ServerBootstrap(); // 將兩個線程池設(shè)置到 Server 實例 b.group(bossGroup, workerGroup) // 設(shè)置 Netty 通道的類型為 NioServerSocket(非阻塞 I/O Socket 服務(wù)器) .channel(NioServerSocketChannel.class) // 設(shè)置建立連接之后的執(zhí)行器(ServerInitializer 是我創(chuàng)建的一個自定義類) .childHandler(new NettyExample.ServerInitializer()); // 綁定端口并且進(jìn)行同步 ChannelFuture future = b.bind(PORT).sync(); // 對關(guān)閉通道進(jìn)行監(jiān)聽 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 資源關(guān)閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } /** * 服務(wù)端通道初始化 */ static class ServerInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 服務(wù)器端連接之后的執(zhí)行器(自定義的類) private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler(); /** * 初始化通道的具體執(zhí)行方法 */ @Override public void initChannel(SocketChannel ch) { // 通道 Channel 設(shè)置 ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 服務(wù)器端連接之后的執(zhí)行器,接收到消息之后的業(yè)務(wù)處理 pipeline.addLast(SERVER_HANDLER); } } /** * 服務(wù)器端接收到消息之后的業(yè)務(wù)處理類 */ static class ServerHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到客戶端的消息 */ @Override public void channelRead0(ChannelHandlerContext ctx, String request) { if (!request.isEmpty()) { System.out.println("接到客戶端的消息:" + request); } } /** * 數(shù)據(jù)讀取完畢 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } /** * 客戶端 */ static class MyNettyClient { public static void main(String[] args) { // 創(chuàng)建事件循環(huán)線程組(客戶端的線程組只有一個) EventLoopGroup group = new NioEventLoopGroup(); try { // Netty 客戶端啟動對象 Bootstrap b = new Bootstrap(); // 設(shè)置啟動參數(shù) b.group(group) // 設(shè)置通道類型 .channel(NioSocketChannel.class) // 設(shè)置啟動執(zhí)行器(負(fù)責(zé)啟動事件的業(yè)務(wù)執(zhí)行,ClientInitializer 為自定義的類) .handler(new NettyExample.ClientInitializer()); // 連接服務(wù)器端并同步通道 Channel ch = b.connect("127.0.0.1", PORT).sync().channel(); // 發(fā)送消息 ChannelFuture lastWriteFuture = null; // 給服務(wù)器端發(fā)送 10 條消息 for (int i = 0; i < 10; i++) { // 發(fā)送給服務(wù)器消息 lastWriteFuture = ch.writeAndFlush("Hi,Java.\n"); } // 在關(guān)閉通道之前,同步刷新所有的消息 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { // 釋放資源 group.shutdownGracefully(); } } } /** * 客戶端通道初始化類 */ static class ClientInitializer extends ChannelInitializer<SocketChannel> { // 字符串編碼器和解碼器 private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder(); // 客戶端連接成功之后業(yè)務(wù)處理 private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler(); /** * 初始化客戶端通道 */ @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 消息解碼:讀取消息頭和消息體 pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)); // 消息編碼:將消息封裝為消息頭和消息體,在響應(yīng)字節(jié)數(shù)據(jù)前面添加消息體長度 pipeline.addLast(new LengthFieldPrepender(4)); // 設(shè)置(字符串)編碼器和解碼器 pipeline.addLast(DECODER); pipeline.addLast(ENCODER); // 客戶端連接成功之后的業(yè)務(wù)處理 pipeline.addLast(CLIENT_HANDLER); } } /** * 客戶端連接成功之后的業(yè)務(wù)處理 */ static class ClientHandler extends SimpleChannelInboundHandler<String> { /** * 讀取到服務(wù)器端的消息 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.err.println("接到服務(wù)器的消息:" + msg); } /** * 異常處理,打印異常并關(guān)閉通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } }
以上程序的執(zhí)行結(jié)果為:
“如何解決Socket粘包問題”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。