您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“JAVA如何實(shí)現(xiàn)心跳檢測”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“JAVA如何實(shí)現(xiàn)心跳檢測”吧!
在分布式系統(tǒng)中,分布在不同主機(jī)上的節(jié)點(diǎn)需要檢測其他節(jié)點(diǎn)的狀態(tài),如服務(wù)器節(jié)點(diǎn)需要檢測從節(jié)點(diǎn)是否失效。為了檢測對(duì)方節(jié)點(diǎn)的有效性,每隔固定時(shí)間就發(fā)送一個(gè)固定信息給對(duì)方,對(duì)方回復(fù)一個(gè)固定信息,如果長時(shí)間沒有收到對(duì)方的回復(fù),則斷開與對(duì)方的連接。
發(fā)包方既可以是服務(wù)端,也可以是客戶端,這要看具體實(shí)現(xiàn)。因?yàn)槭敲扛艄潭〞r(shí)間發(fā)送一次,類似心跳,所以發(fā)送的固定信息稱為心跳包。心跳包一般為比較小的包,可根據(jù)具體實(shí)現(xiàn)。心跳包主要應(yīng)用于長連接的保持與短線鏈接。
一般而言,應(yīng)該客戶端主動(dòng)向服務(wù)器發(fā)送心跳包,因?yàn)榉?wù)器向客戶端發(fā)送心跳包會(huì)影響服務(wù)器的性能。
實(shí)現(xiàn)原理:長連接的維持,是要客戶端程序,定時(shí)向服務(wù)端程序,發(fā)送一個(gè)維持連接包的。如果,長時(shí)間未發(fā)送維持連接包,服務(wù)端程序?qū)嚅_連接。
心跳機(jī)制有兩種實(shí)現(xiàn)方式,一種基于TCP自帶的心跳包,TCP的SO_KEEPALIVE選項(xiàng)可以,系統(tǒng)默認(rèn)的默認(rèn)跳幀頻率為2小時(shí),超過2小時(shí)后,本地的TCP 實(shí)現(xiàn)會(huì)發(fā)送一個(gè)數(shù)據(jù)包給遠(yuǎn)程的 Socket. 如果遠(yuǎn)程Socket 沒有發(fā)回響應(yīng), TCP實(shí)現(xiàn)就會(huì)持續(xù)嘗試 11 分鐘, 直到接收到響應(yīng)為止。 否則就會(huì)自動(dòng)斷開Socket連接。但TCP自帶的心跳包無法檢測比較敏感地知道對(duì)方的狀態(tài),默認(rèn)2小時(shí)的空閑時(shí)間,對(duì)于大多數(shù)的應(yīng)用而言太長了??梢允止ら_啟KeepAlive功能并設(shè)置合理的KeepAlive參數(shù)。
另一種在應(yīng)用層自己進(jìn)行實(shí)現(xiàn),基本步驟如下:
1、Client使用定時(shí)器,不斷發(fā)送心跳;
2、Server收到心跳后,回復(fù)一個(gè)包;
3、Server為每個(gè)Client啟動(dòng)超時(shí)定時(shí)器,如果在指定時(shí)間內(nèi)沒有收到Client的心跳包,則Client失效。
Client通過持有Socket的對(duì)象,可以隨時(shí)(使用sendObject方法)發(fā)送Massage Object(消息)給服務(wù)端。
如果keepAliveDelay毫秒(程序中是2秒)內(nèi)未發(fā)送任何數(shù)據(jù),則自動(dòng)發(fā)送一個(gè)KeepAlive Object(心跳)給服務(wù)端,用于維持連接。
由于,我們向服務(wù)端,可以發(fā)送很多不同的消息對(duì)象,服務(wù)端也可以返回不同的對(duì)象。所以,對(duì)于返回對(duì)象的處理,要編寫具體的ObjectAction實(shí)現(xiàn)類進(jìn)行處理。通過Client.addActionMap方法進(jìn)行添加。這樣,程序會(huì)回調(diào)處理
由于客戶端會(huì)定時(shí)(keepAliveDelay毫秒)發(fā)送維持連接的信息過來,所以,服務(wù)端要有一個(gè)檢測機(jī)制。
即當(dāng)服務(wù)端receiveTimeDelay毫秒(程序中是3秒)內(nèi)未接收任何數(shù)據(jù),則自動(dòng)斷開與客戶端的連接。
ActionMapping的原理與客戶端相似(相同)。
通過添加相應(yīng)的ObjectAction實(shí)現(xiàn)類,可以實(shí)現(xiàn)不同對(duì)象的響應(yīng)、應(yīng)答過程。
package com.zyz.mynative.demo02; /** * @author zyz * @version 1.0 * @data 2023/2/15 11:05 * @Description: */ import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; /** * * 維持連接的消息對(duì)象(心跳對(duì)象) * @author zyz * @version 1.0 * @data 2023/2/15 11:05 * @Description: */ public class KeepAlive implements Serializable{ private static final long serialVersionUID = -2813120366138988480L; /* 覆蓋該方法,僅用于測試使用。 * @see java.lang.Object#toString() */ @Override public String toString() { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())+"\t維持連接包"; } }
package com.zyz.mynative.demo02; /** * @author zyz * @version 1.0 * @data 2023/2/15 11:06 * @Description: */ import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.Socket; import java.net.UnknownHostException; import java.util.concurrent.ConcurrentHashMap; /** * C/S架構(gòu)的客戶端對(duì)象,持有該對(duì)象,可以隨時(shí)向服務(wù)端發(fā)送消息。 * * @author zyz * @version 1.0 * @data 2023/2/15 11:06 * @Description: */ public class MyClient { /** * 處理服務(wù)端發(fā)回的對(duì)象,可實(shí)現(xiàn)該接口。 */ public static interface ObjectAction { void doAction(Object obj, MyClient client); } public static final class DefaultObjectAction implements ObjectAction { @Override public void doAction(Object obj, MyClient client) { System.out.println("處理:\t" + obj.toString()); } } private String serverIp; private int port; private Socket socket; //連接狀態(tài) private boolean running = false; /** * 最后一次發(fā)送數(shù)據(jù)的時(shí)間 */ private long lastSendTime; /** * 用于保存接收消息對(duì)象類型及該類型消息處理的對(duì)象 */ private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>(); public MyClient(String serverIp, int port) { this.serverIp = serverIp; this.port = port; } public void start() throws UnknownHostException, IOException { if (running) { return; } socket = new Socket(serverIp, port); System.out.println("本地端口:" + socket.getLocalPort()); lastSendTime = System.currentTimeMillis(); running = true; //保持長連接的線程,每隔2秒項(xiàng)服務(wù)器發(fā)一個(gè)一個(gè)保持連接的心跳消息 new Thread(new KeepAliveWatchDog()).start(); //接受消息的線程,處理消息 new Thread(new ReceiveWatchDog()).start(); } public void stop() { if (running) { running = false; } ; } /** * 添加接收對(duì)象的處理對(duì)象。 * * @param cls 待處理的對(duì)象,其所屬的類。 * @param action 處理過程對(duì)象。 */ public void addActionMap(Class<Object> cls, ObjectAction action) { actionMapping.put(cls, action); } public void sendObject(Object obj) throws IOException { ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream()); oos.writeObject(obj); System.out.println("發(fā)送:\t" + obj); oos.flush(); } class KeepAliveWatchDog implements Runnable { long checkDelay = 10; //兩秒鐘檢測一次 long keepAliveDelay = 2000; @Override public void run() { while (running) { if (System.currentTimeMillis() - lastSendTime > keepAliveDelay) { try { MyClient.this.sendObject(new KeepAlive()); } catch (IOException e) { e.printStackTrace(); MyClient.this.stop(); } lastSendTime = System.currentTimeMillis(); } else { try { Thread.sleep(checkDelay); } catch (InterruptedException e) { e.printStackTrace(); MyClient.this.stop(); } } } } } class ReceiveWatchDog implements Runnable { @Override public void run() { while (running) { try { InputStream in = socket.getInputStream(); if (in.available() > 0) { ObjectInputStream ois = new ObjectInputStream(in); Object obj = ois.readObject(); System.out.println("接收:\t" + obj); ObjectAction oa = actionMapping.get(obj.getClass()); oa = oa == null ? new DefaultObjectAction() : oa; oa.doAction(obj, MyClient.this); } else { Thread.sleep(10); } } catch (Exception e) { e.printStackTrace(); MyClient.this.stop(); } } } } public static void main(String[] args) throws UnknownHostException, IOException { String serverIp = "127.0.0.1"; int port = 65432; MyClient client = new MyClient(serverIp, port); client.start(); } }
package com.zyz.mynative.demo02; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; /** * C/S架構(gòu)的服務(wù)端對(duì)象。 * * @author zyz * @version 1.0 * @data 2023/2/15 11:08 * @Description: */ public class MyServer { /** * 要處理客戶端發(fā)來的對(duì)象,并返回一個(gè)對(duì)象,可實(shí)現(xiàn)該接口。 */ public interface ObjectAction { Object doAction(Object rev, MyServer server); } public static final class DefaultObjectAction implements ObjectAction { @Override public Object doAction(Object rev, MyServer server) { System.out.println("處理并返回:" + rev); return rev; } } private int port; private volatile boolean running = false; private long receiveTimeDelay = 3000; /** * ConcurrentHashMap是線程安全的,ConcurrentHashMap并非鎖住整個(gè)方法, * 而是通過原子操作和局部加鎖的方法保證了多線程的線程安全,且盡可能減少了性能損耗。 */ private ConcurrentHashMap<Class, ObjectAction> actionMapping = new ConcurrentHashMap<Class, ObjectAction>(); private Thread connWatchDog; public MyServer(int port) { this.port = port; } /** * 通過繼承Runnable ,開啟線程 */ public void start() { if (running) { return; } ; running = true; connWatchDog = new Thread(new ConnWatchDog()); connWatchDog.start(); } @SuppressWarnings("deprecation") public void stop() { if (running) { running = false; } ; if (connWatchDog != null) { connWatchDog.stop(); } ; } public void addActionMap(Class<Object> cls, ObjectAction action) { actionMapping.put(cls, action); } /** * 繼承Runnable 重寫run方法、實(shí)現(xiàn)線程 */ class ConnWatchDog implements Runnable { @Override public void run() { try { ServerSocket ss = new ServerSocket(port, 5); while (running) { Socket s = ss.accept(); new Thread(new SocketAction(s)).start(); } } catch (IOException e) { e.printStackTrace(); MyServer.this.stop(); } } } /** * 繼承Runnable 重寫run方法、實(shí)現(xiàn)線程,通過匿名內(nèi)部類 */ class SocketAction implements Runnable { Socket s; boolean run = true; long lastReceiveTime = System.currentTimeMillis(); public SocketAction(Socket s) { this.s = s; } @Override public void run() { while (running && run) { if (System.currentTimeMillis() - lastReceiveTime > receiveTimeDelay) { overThis(); } else { try { InputStream in = s.getInputStream(); if (in.available() > 0) { ObjectInputStream ois = new ObjectInputStream(in); Object obj = ois.readObject(); lastReceiveTime = System.currentTimeMillis(); System.out.println("接收:\t" + obj); ObjectAction oa = actionMapping.get(obj.getClass()); oa = oa == null ? new DefaultObjectAction() : oa; Object out = oa.doAction(obj, MyServer.this); if (out != null) { ObjectOutputStream oos = new ObjectOutputStream(s.getOutputStream()); oos.writeObject(out); oos.flush(); } } else { Thread.sleep(10); } } catch (Exception e) { e.printStackTrace(); overThis(); } } } } private void overThis() { if (run) { run = false; } ; if (s != null) { try { s.close(); } catch (IOException e) { e.printStackTrace(); } } System.out.println("關(guān)閉:" + s.getRemoteSocketAddress()); } } public static void main(String[] args) { int port = 65432; MyServer server = new MyServer(port); server.start(); } }
先開啟服務(wù)端,在開啟客戶端
到此,相信大家對(duì)“JAVA如何實(shí)現(xiàn)心跳檢測”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。