您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Socket結(jié)合線程池怎么實(shí)現(xiàn)客戶端和服務(wù)端通信demo”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Socket結(jié)合線程池怎么實(shí)現(xiàn)客戶端和服務(wù)端通信demo”吧!
可以使用 Socket 和 ServiceSocket 以及其它 API;
寫(xiě)一個(gè)客戶端和服務(wù)端之間 TCP 通信的例子;
服務(wù)端處理任務(wù)需要異步處理;
因?yàn)榉?wù)端處理能力很弱,只能同時(shí)處理 5 個(gè)請(qǐng)求,當(dāng)?shù)诹鶄€(gè)請(qǐng)求到達(dá)服務(wù)器時(shí),需要服務(wù)器返回明確的錯(cuò)誤信息:服務(wù)器太忙了,請(qǐng)稍后重試~。
需求比較簡(jiǎn)單,唯一復(fù)雜的地方在于第四點(diǎn),我們需要對(duì)客戶端的請(qǐng)求量進(jìn)行控制,首先我們需要確認(rèn)的是,我們是無(wú)法控制客戶端發(fā)送的請(qǐng)求數(shù)的,所以我們只能從服務(wù)端進(jìn)行改造,比如從服務(wù)端進(jìn)行限流。
有的同學(xué)可能很快想到,我們應(yīng)該使用 ServerSocket 的 backlog 的屬性,把其設(shè)置成 5,但我們?cè)谏弦徽轮姓f(shuō)到 backlog 并不能準(zhǔn)確代表限制的客戶端連接數(shù),而且我們還要求服務(wù)端返回具體的錯(cuò)誤信息,即使 backlog 生效,也只會(huì)返回固定的錯(cuò)誤信息,不是我們定制的錯(cuò)誤信息。
我們好好想想,線程池似乎可以做這個(gè)事情,我們可以把線程池的 coreSize 和 maxSize 都設(shè)置成 4,把隊(duì)列大小設(shè)置成 1,這樣服務(wù)端每次收到請(qǐng)求后,會(huì)先判斷一下線程池中的隊(duì)列有沒(méi)有數(shù)據(jù),如果有的話,說(shuō)明當(dāng)前服務(wù)器已經(jīng)馬上就要處理第五個(gè)請(qǐng)求了,當(dāng)前請(qǐng)求就是第六個(gè)請(qǐng)求,應(yīng)該被拒絕。
正好線程池的加入也可以滿足第三點(diǎn),服務(wù)端的任務(wù)可以異步執(zhí)行。
客戶端的代碼比較簡(jiǎn)單,直接向服務(wù)器請(qǐng)求數(shù)據(jù)即可,代碼如下:
public class SocketClient { private static final Integer SIZE = 1024; private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>(400)); @Test public void test() throws InterruptedException { // 模擬客戶端同時(shí)向服務(wù)端發(fā)送 6 條消息 for (int i = 0; i < 6; i++) { socketPoll.submit(() -> { send("localhost", 7007, "nihao"); }); } Thread.sleep(1000000000); } /** * 發(fā)送tcp * * @param domainName 域名 * @param port 端口 * @param content 發(fā)送內(nèi)容 */ public static String send(String domainName, int port, String content) { log.info("客戶端開(kāi)始運(yùn)行"); Socket socket = null; OutputStream outputStream = null; InputStreamReader isr = null; BufferedReader br = null; InputStream is = null; StringBuffer response = null; try { if (StringUtils.isBlank(domainName)) { return null; } // 無(wú)參構(gòu)造器初始化 Socket,默認(rèn)底層協(xié)議是 TCP socket = new Socket(); socket.setReuseAddress(true); // 客戶端準(zhǔn)備連接服務(wù)端,設(shè)置超時(shí)時(shí)間 10 秒 socket.connect(new InetSocketAddress(domainName, port), 10000); log.info("TCPClient 成功和服務(wù)端建立連接"); // 準(zhǔn)備發(fā)送消息給服務(wù)端 outputStream = socket.getOutputStream(); // 設(shè)置 UTF 編碼,防止亂碼 byte[] bytes = content.getBytes(Charset.forName("UTF-8")); // 輸出字節(jié)碼 segmentWrite(bytes, outputStream); // 關(guān)閉輸出 socket.shutdownOutput(); log.info("TCPClient 發(fā)送內(nèi)容為 {}",content); // 等待服務(wù)端的返回 socket.setSoTimeout(50000);//50秒還沒(méi)有得到數(shù)據(jù),直接斷開(kāi)連接 // 得到服務(wù)端的返回流 is = socket.getInputStream(); isr = new InputStreamReader(is); br = new BufferedReader(isr); // 從流中讀取返回值 response = segmentRead(br); // 關(guān)閉輸入流 socket.shutdownInput(); //關(guān)閉各種流和套接字 close(socket, outputStream, isr, br, is); log.info("TCPClient 接受到服務(wù)端返回的內(nèi)容為 {}",response); return response.toString(); } catch (ConnectException e) { log.error("TCPClient-send socket連接失敗", e); throw new RuntimeException("socket連接失敗"); } catch (Exception e) { log.error("TCPClient-send unkown errror", e); throw new RuntimeException("socket連接失敗"); } finally { try { close(socket, outputStream, isr, br, is); } catch (Exception e) { // do nothing } } } /** * 關(guān)閉各種流 * * @param socket * @param outputStream * @param isr * @param br * @param is * @throws IOException */ public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr, BufferedReader br, InputStream is) throws IOException { if (null != socket && !socket.isClosed()) { try { socket.shutdownOutput(); } catch (Exception e) { } try { socket.shutdownInput(); } catch (Exception e) { } try { socket.close(); } catch (Exception e) { } } if (null != outputStream) { outputStream.close(); } if (null != br) { br.close(); } if (null != isr) { isr.close(); } if (null != is) { is.close(); } } /** * 分段讀 * * @param br * @throws IOException */ public static StringBuffer segmentRead(BufferedReader br) throws IOException { StringBuffer sb = new StringBuffer(); String line; while ((line = br.readLine()) != null) { sb.append(line); } return sb; } /** * 分段寫(xiě) * * @param bytes * @param outputStream * @throws IOException */ public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException { int length = bytes.length; int start, end = 0; for (int i = 0; end != bytes.length; i++) { start = i == 0 ? 0 : i * SIZE; end = length > SIZE ? start + SIZE : bytes.length; length -= SIZE; outputStream.write(bytes, start, end - start); outputStream.flush(); } } }
客戶端代碼中我們也用到了線程池,主要是為了并發(fā)模擬客戶端一次性發(fā)送 6 個(gè)請(qǐng)求,按照預(yù)期服務(wù)端在處理第六個(gè)請(qǐng)求的時(shí)候,會(huì)返回特定的錯(cuò)誤信息給客戶端。
以上代碼主要方法是 send 方法,主要處理像服務(wù)端發(fā)送數(shù)據(jù),并處理服務(wù)端的響應(yīng)。
服務(wù)端的邏輯分成兩個(gè)部分,第一部分是控制客戶端的請(qǐng)求個(gè)數(shù),當(dāng)超過(guò)服務(wù)端的能力時(shí),拒絕新的請(qǐng)求,當(dāng)服務(wù)端能力可響應(yīng)時(shí),放入新的請(qǐng)求,第二部分是服務(wù)端任務(wù)的執(zhí)行邏輯。
public class SocketServiceStart { /** * 服務(wù)端的線程池,兩個(gè)作用 * 1:讓服務(wù)端的任務(wù)可以異步執(zhí)行 * 2:管理可同時(shí)處理的服務(wù)端的請(qǐng)求數(shù) */ private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4, 365L, TimeUnit.DAYS, new LinkedBlockingQueue<>( 1)); @Test public void test(){ start(); } /** * 啟動(dòng)服務(wù)端 */ public static final void start() { log.info("SocketServiceStart 服務(wù)端開(kāi)始啟動(dòng)"); try { // backlog serviceSocket處理阻塞時(shí),客戶端最大的可創(chuàng)建連接數(shù),超過(guò)客戶端連接不上 // 當(dāng)線程池能力處理滿了之后,我們希望盡量阻塞客戶端的連接 // ServerSocket serverSocket = new ServerSocket(7007,1,null); // 初始化服務(wù)端 ServerSocket serverSocket = new ServerSocket(); serverSocket.setReuseAddress(true); // serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80)); serverSocket.bind(new InetSocketAddress("localhost", 7007)); log.info("SocketServiceStart 服務(wù)端啟動(dòng)成功"); // 自旋,讓客戶端一直在取客戶端的請(qǐng)求,如果客戶端暫時(shí)沒(méi)有請(qǐng)求,會(huì)一直阻塞 while (true) { // 接受客戶端的請(qǐng)求 Socket socket = serverSocket.accept(); // 如果隊(duì)列中有數(shù)據(jù)了,說(shuō)明服務(wù)端已經(jīng)到了并發(fā)處理的極限了,此時(shí)需要返回客戶端有意義的信息 if (collectPoll.getQueue().size() >= 1) { log.info("SocketServiceStart 服務(wù)端處理能力到頂,需要控制客戶端的請(qǐng)求"); //返回處理結(jié)果給客戶端 rejectRequest(socket); continue; } try { // 異步處理客戶端提交上來(lái)的任務(wù) collectPoll.submit(new SocketService(socket)); } catch (Exception e) { socket.close(); } } } catch (Exception e) { log.error("SocketServiceStart - start error", e); throw new RuntimeException(e); } catch (Throwable e) { log.error("SocketServiceStart - start error", e); throw new RuntimeException(e); } } // 返回特定的錯(cuò)誤碼給客戶端 public static void rejectRequest(Socket socket) throws IOException { OutputStream outputStream = null; try{ outputStream = socket.getOutputStream(); byte[] bytes = "服務(wù)器太忙了,請(qǐng)稍后重試~".getBytes(Charset.forName("UTF-8")); SocketClient.segmentWrite(bytes, outputStream); socket.shutdownOutput(); }finally { //關(guān)閉流 SocketClient.close(socket,outputStream,null,null,null); } } }
我們使用 collectPoll.getQueue().size() >= 1 來(lái)判斷目前服務(wù)端是否已經(jīng)到達(dá)處理的極限了,如果隊(duì)列中有一個(gè)任務(wù)正在排隊(duì),說(shuō)明當(dāng)前服務(wù)端已經(jīng)超負(fù)荷運(yùn)行了,新的請(qǐng)求應(yīng)該拒絕掉,如果隊(duì)列中沒(méi)有數(shù)據(jù),說(shuō)明服務(wù)端還可以接受新的請(qǐng)求。
以上代碼注釋詳細(xì),就不累贅說(shuō)了。
服務(wù)端的處理邏輯比較簡(jiǎn)單,主要步驟是:從客戶端的 Socket 中讀取輸入,進(jìn)行處理,把響應(yīng)返回給客戶端。
我們使用線程沉睡 2 秒來(lái)模擬服務(wù)端的處理邏輯,代碼如下:
public class SocketService implements Runnable { private Socket socket; public SocketService() { } public SocketService(Socket socket) { this.socket = socket; } @Override public void run() { log.info("SocketService 服務(wù)端任務(wù)開(kāi)始執(zhí)行"); OutputStream outputStream = null; InputStream is = null; InputStreamReader isr = null; BufferedReader br = null; try { //接受消息 socket.setSoTimeout(10000);// 10秒還沒(méi)有得到數(shù)據(jù),直接斷開(kāi)連接 is = socket.getInputStream(); isr = new InputStreamReader(is,"UTF-8"); br = new BufferedReader(isr); StringBuffer sb = SocketClient.segmentRead(br); socket.shutdownInput(); log.info("SocketService accept info is {}", sb.toString()); //服務(wù)端處理 模擬服務(wù)端處理耗時(shí) Thread.sleep(2000); String response = sb.toString(); //返回處理結(jié)果給客戶端 outputStream = socket.getOutputStream(); byte[] bytes = response.getBytes(Charset.forName("UTF-8")); SocketClient.segmentWrite(bytes, outputStream); socket.shutdownOutput(); //關(guān)閉流 SocketClient.close(socket,outputStream,isr,br,is); log.info("SocketService 服務(wù)端任務(wù)執(zhí)行完成"); } catch (IOException e) { log.error("SocketService IOException", e); } catch (Exception e) { log.error("SocketService Exception", e); } finally { try { SocketClient.close(socket,outputStream,isr,br,is); } catch (IOException e) { log.error("SocketService IOException", e); } } } }
測(cè)試的時(shí)候,我們必須先啟動(dòng)服務(wù)端,然后再啟動(dòng)客戶端,首先我們啟動(dòng)服務(wù)端,打印日志如下:
接著我們啟動(dòng)客戶端,打印日志如下:
我們最后看一下服務(wù)端的運(yùn)行日志:
從以上運(yùn)行結(jié)果中,我們可以看出得出的結(jié)果是符合我們預(yù)期的,服務(wù)端在請(qǐng)求高峰時(shí),能夠并發(fā)處理5個(gè)請(qǐng)求,其余請(qǐng)求可以用正確的提示進(jìn)行拒絕。
到此,相信大家對(duì)“Socket結(jié)合線程池怎么實(shí)現(xiàn)客戶端和服務(wù)端通信demo”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。