您好,登錄后才能下訂單哦!
本篇內(nèi)容介紹了“如何解決Java Socket通信技術(shù)收發(fā)線程互斥的問(wèn)題”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
Java Socket通信技術(shù)在很長(zhǎng)的時(shí)間里都在使用,在不少的程序員眼中都有很多高的評(píng)價(jià)。那么下面我們就看看如何才能掌握這門復(fù)雜的編程語(yǔ)言,希望大家在今后的Java Socket通信技術(shù)使用中有所收獲。
下面就是Java Socket通信技術(shù)在解決收發(fā)線程互斥的代碼介紹。
package com.bill99.svr;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
/**
*<p>title: socket通信包裝類</p>
*<p>Description: </p>
*<p>CopyRight: CopyRight (c) 2009</p>
*<p>Company: 99bill.com</p>
*<p>Create date: 2009-10-14</P>
*author sunnylocus<A href="mailto:sunnylocus@163.com">
</A> * v0.10 2009-10-14 初類
* v0.11 2009-11-12 對(duì)命令收發(fā)邏輯及收發(fā)線程互斥機(jī)制進(jìn)行了優(yōu)化,
處理命令速度由原來(lái)8~16個(gè)/秒提高到25~32個(gè)/秒*/ public class SocketConnection {
private volatile Socket socket;
private int timeout = 1000*10; //超時(shí)時(shí)間,初始值10秒
private boolean isLaunchHeartcheck = false;//是否已啟動(dòng)心跳檢測(cè)
private boolean isNetworkConnect = false; //網(wǎng)絡(luò)是否已連接
private static String host = "";
private static int port;
static InputStream inStream = null;
static OutputStream outStream = null;
private static Logger log =Logger.getLogger
(SocketConnection.class);private static SocketConnection socketConnection = null;
private static java.util.Timer heartTimer=null;
//private final Map<String, Object> recMsgMap= Collections.
synchronizedMap(new HashMap<String, Object>());private final ConcurrentHashMap<String, Object> recMsgMap
= new ConcurrentHashMap<String, Object>();private static Thread receiveThread = null;
private final ReentrantLock lock = new ReentrantLock();
private SocketConnection(){
Properties conf = new Properties();
try {
conf.load(SocketConnection.class.getResourceAsStream
("test.conf"));this.timeout = Integer.valueOf(conf.getProperty("timeout"));
init(conf.getProperty("ip"),Integer.valueOf
(conf.getProperty("port")));} catch(IOException e) {
log.fatal("socket初始化異常!",e);
throw new RuntimeException("socket初始化異常,請(qǐng)檢查配置參數(shù)");
}
}
/**
* 單態(tài)模式
*/
public static SocketConnection getInstance() {
if(socketConnection==null) {
synchronized(SocketConnection.class) {
if(socketConnection==null) {
socketConnection = new SocketConnection();
return socketConnection;
}
}
}
return socketConnection;
}
private void init(String host,int port) throws IOException {
InetSocketAddress addr = new InetSocketAddress(host,port);
socket = new Socket();
synchronized (this) {
log.info("【準(zhǔn)備與"+addr+"建立連接】");
socket.connect(addr, timeout);
log.info("【與"+addr+"連接已建立】");
inStream = socket.getInputStream();
outStream = socket.getOutputStream();
socket.setTcpNoDelay(true);//數(shù)據(jù)不作緩沖,立即發(fā)送
socket.setSoLinger(true, 0);//socket關(guān)閉時(shí),立即釋放資源
socket.setKeepAlive(true);
socket.setTrafficClass(0x04|0x10);//高可靠性和最小延遲傳輸
isNetworkConnect=true;
receiveThread = new Thread(new ReceiveWorker());
receiveThread.start();
SocketConnection.host=host;
SocketConnection.port=port;
if(!isLaunchHeartcheck)
launchHeartcheck();
}
}
/**
* 心跳包檢測(cè)
*/
private void launchHeartcheck() {
if(socket == null)
throw new IllegalStateException("socket is not
established!");heartTimer = new Timer();
isLaunchHeartcheck = true;
heartTimer.schedule(new TimerTask() {
public void run() {
String msgStreamNo = StreamNoGenerator.getStreamNo("kq");
int mstType =9999;//999-心跳包請(qǐng)求
SimpleDateFormat dateformate = new SimpleDateFormat
("yyyyMMddHHmmss");String msgDateTime = dateformate.format(new Date());
int msgLength =38;//消息頭長(zhǎng)度
String commandstr = "00" +msgLength + mstType + msgStreamNo;
log.info("心跳檢測(cè)包 -> IVR "+commandstr);
int reconnCounter = 1;
while(true) {
String responseMsg =null;
try {
responseMsg = readReqMsg(commandstr);
} catch (IOException e) {
log.error("IO流異常",e);
reconnCounter ++;
}
if(responseMsg!=null) {
log.info("心跳響應(yīng)包 <- IVR "+responseMsg);
reconnCounter = 1;
break;
} else {
reconnCounter ++;
}
if(reconnCounter >3) {//重連次數(shù)已達(dá)三次,判定網(wǎng)絡(luò)連接中斷,
重新建立連接。連接未被建立時(shí)不釋放鎖reConnectToCTCC(); break;
}
}
}
},1000 * 60*1,1000*60*2);
}
/**
* 重連與目標(biāo)IP建立重連
*/
private void reConnectToCTCC() {
new Thread(new Runnable(){
public void run(){
log.info("重新建立與"+host+":"+port+"的連接");
//清理工作,中斷計(jì)時(shí)器,中斷接收線程,恢復(fù)初始變量
heartTimer.cancel();
isLaunchHeartcheck=false;
isNetworkConnect = false;
receiveThread.interrupt();
try {
socket.close();
} catch (IOException e1) {log.error("重連時(shí),關(guān)閉socket連
接發(fā)生IO流異常",e1);}//----------------
synchronized(this){
for(; ;){
try {
Thread.currentThread();
Thread.sleep(1000 * 1);
init(host,port);
this.notifyAll();
break ;
} catch (IOException e) {
log.error("重新建立連接未成功",e);
} catch (InterruptedException e){
log.error("重連線程中斷",e);
}
}
}
}
}).start();
}
/**
* 發(fā)送命令并接受響應(yīng)
* @param requestMsg
* @return
* @throws SocketTimeoutException
* @throws IOException
*/
public String readReqMsg(String requestMsg) throws IOException {
if(requestMsg ==null) {
return null;
}
if(!isNetworkConnect) {
synchronized(this){
try {
this.wait(1000*5); //等待5秒,如果網(wǎng)絡(luò)還沒(méi)有恢復(fù),拋出IO流異常
if(!isNetworkConnect) {
throw new IOException("網(wǎng)絡(luò)連接中斷!");
}
} catch (InterruptedException e) {
log.error("發(fā)送線程中斷",e);
}
}
}
String msgNo = requestMsg.substring(8, 8 + 24);//讀取流水號(hào)
outStream = socket.getOutputStream();
outStream.write(requestMsg.getBytes());
outStream.flush();
Condition msglock = lock.newCondition(); //消息鎖
//注冊(cè)等待接收消息
recMsgMap.put(msgNo, msglock);
try {
lock.lock();
msglock.await(timeout,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("發(fā)送線程中斷",e);
} finally {
lock.unlock();
}
Object respMsg = recMsgMap.remove(msgNo); //響應(yīng)信息
if(respMsg!=null &&(respMsg != msglock)) {
//已經(jīng)接收到消息,注銷等待,成功返回消息
return (String) respMsg;
} else {
log.error(msgNo+" 超時(shí),未收到響應(yīng)消息");
throw new SocketTimeoutException(msgNo+" 超時(shí),未收到響應(yīng)消息");
}
}
public void finalize() {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
//消息接收線程
private class ReceiveWorker implements Runnable {
String intStr= null;
public void run() {
while(!Thread.interrupted()){
try {
byte[] headBytes = new byte[4];
if(inStream.read(headBytes)==-1){
log.warn("讀到流未尾,對(duì)方已關(guān)閉流!");
reConnectToCTCC();//讀到流未尾,對(duì)方已關(guān)閉流
return;
}
byte[] tmp =new byte[4];
tmp = headBytes;
String tempStr = new String(tmp).trim();
if(tempStr==null || tempStr.equals("")) {
log.error("received message is null");
continue;
}
intStr = new String(tmp);
int totalLength =Integer.parseInt(intStr);
//----------------
byte[] msgBytes = new byte[totalLength-4];
inStream.read(msgBytes);
String resultMsg = new String(headBytes)+ new
String(msgBytes);//抽出消息ID
String msgNo = resultMsg.substring(8, 8 + 24);
Condition msglock =(Condition) recMsgMap.get(msgNo);
if(msglock ==null) {
log.warn(msgNo+"序號(hào)可能已被注銷!響應(yīng)消息丟棄");
recMsgMap.remove(msgNo);
continue;
}
recMsgMap.put(msgNo, resultMsg);
try{
lock.lock();
msglock.signalAll();
}finally {
lock.unlock();
}
}catch(SocketException e){
log.error("服務(wù)端關(guān)閉socket",e);
reConnectToCTCC();
} catch(IOException e) {
log.error("接收線程讀取響應(yīng)數(shù)據(jù)時(shí)發(fā)生IO流異常",e);
} catch(NumberFormatException e){
log.error("收到?jīng)]良心包,String轉(zhuǎn)int異常,異常字符:"+intStr);
}
}
}
}
}
“如何解決Java Socket通信技術(shù)收發(fā)線程互斥的問(wèn)題”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注億速云網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。