溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

在android上如何使用WebSocket做消息通信功能

發(fā)布時(shí)間:2020-07-15 11:07:56 來(lái)源:億速云 閱讀:346 作者:清晨 欄目:開(kāi)發(fā)技術(shù)

小編給大家分享一下在android上如何使用WebSocket做消息通信功能,希望大家閱讀完這篇文章后大所收獲,下面讓我們一起去探討吧!

前言

消息推送功能可以說(shuō)移動(dòng)APP不可缺少的功能之一,一般簡(jiǎn)單的推送我們可以使用第三方推送的SDK,比如極光推送、信鴿推送等,但是對(duì)于消息聊天這種及時(shí)性有要求的或者三方推送不滿足業(yè)務(wù)需求的,我們就需要使用WebSocket實(shí)現(xiàn)消息推送功能。

基本流程

WebSocket是什么,這里就不做介紹了,我們這里使用的開(kāi)源框架是https://github.com/TakahikoKawasaki/nv-websocket-client

基于開(kāi)源協(xié)議我們封裝實(shí)現(xiàn)WebSocket的連接、注冊(cè)、心跳、消息分發(fā)、超時(shí)任務(wù)功能,基本流程如下:

在android上如何使用WebSocket做消息通信功能

連接功能

首先我們新建一個(gè)項(xiàng)目,在build.grade中添加配置

compile 'com.neovisionaries:nv-websocket-client:2.2'

新建websocket管理類(lèi)WsManger

public class WsManager {
 
 private volatile static WsManager wsManger;
 
 private WsManager() {
 }
 
 public static WsManager getWsManger() {
 if (wsManger == null) {
 synchronized (WsManager.class) {
 if (wsManger == null) {
  wsManger = new WsManager();
 }
 }
 }
 return wsManger;
 }
 
 
}

接下來(lái)添加連接方法,我們將webSocket的狀態(tài)分為三種,新建WsStatue枚舉類(lèi)對(duì)應(yīng)起來(lái)

public enum WsStatus {
 
 /**
 * 連接成功
 */
 CONNECT_SUCCESS,
 /**
 * 連接失敗
 */
 CONNECT_FAIL,
 /**
 * 正在連接
 */
 CONNECTING;
}

連接方法如下所示:

/**
 * 連接方法 這里要判斷是否登錄 此處省略
 */
public void connect() {
 //WEB_SOCKET_API 是連接的url地址,
 // CONNECT_TIMEOUT是連接的超時(shí)時(shí)間 這里是 5秒
 try {
 ws = new WebSocketFactory().createSocket(WEB_SOCKET_API, CONNECT_TIMEOUT)
 //設(shè)置幀隊(duì)列最大值為5
 .setFrameQueueSize(5)
 //設(shè)置不允許服務(wù)端關(guān)閉連接卻未發(fā)送關(guān)閉幀
 .setMissingCloseFrameAllowed(false)
 //添加回調(diào)監(jiān)聽(tīng)
 .addListener(new WsListener())
 //異步連接
 .connectAsynchronously();
 } catch (IOException e) {
 e.printStackTrace();
 }
 setStatus(WsStatus.CONNECTING);
}

調(diào)用連接方法后 我們來(lái)看連接的回調(diào) 也就是WsListener

/**
 * websocket回調(diào)事件
 */
private class WsListener extends WebSocketAdapter {
 
 
 @Override
 public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
 Log.d(TAG, "onConnected: 連接成功");
 }
 
 @Override
 public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
 Log.d(TAG, "onConnectError: 連接失敗");
 }
 
 @Override
 public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
  WebSocketFrame clientCloseFrame,
  boolean closedByServer) throws Exception {
 Log.d(TAG, "onDisconnected: 斷開(kāi)連接");
 
 }
 
 @Override
 public void onTextMessage(WebSocket websocket, String text) throws Exception {
 Log.d(TAG, "onTextMessage: 收到消息:" + text);
 }
}

下面我們調(diào)用連接方法

WsManager.getWsManger().connect();

運(yùn)行項(xiàng)目我們可以看到如下打?。?/p>

 在android上如何使用WebSocket做消息通信功能

此處我們要做的處理是,如果收到連接失敗或者斷開(kāi)連接的回調(diào) 需要重新連接,我們重新調(diào)用一次連接方法即可,并且如果超過(guò)三次重連失敗,我們?cè)跇I(yè)務(wù)中可以通過(guò)調(diào)用接口來(lái)獲取數(shù)據(jù),避免數(shù)據(jù)丟失,此處細(xì)節(jié)省略。

協(xié)議封裝

此處協(xié)議如下所示:

{
 "action":"",
 "requestChild":{
 "clientType":"",
 "id":""
 }
}

心跳、發(fā)送請(qǐng)求都屬于客戶端主動(dòng)發(fā)送請(qǐng)求,對(duì)于請(qǐng)求結(jié)果我們分為成功和失敗以及超時(shí),發(fā)送超時(shí)我們是收不到服務(wù)器任何回復(fù)的,所以我們需要在發(fā)送之后將發(fā)送放在超時(shí)任務(wù)隊(duì)列中,如果請(qǐng)求成功將任務(wù)從超時(shí)隊(duì)列中移除,超時(shí)從超時(shí)隊(duì)列中獲取任務(wù)重新請(qǐng)求。

超時(shí)任務(wù)隊(duì)列中回調(diào)有成功、失敗、超時(shí)。

我們按照上述協(xié)議,新增對(duì)應(yīng)實(shí)體類(lèi),采用Builder設(shè)計(jì)模式

public class Request {
 
 /**
 * 行為
 */
 private String action;
 
 /**
 * 請(qǐng)求體
 */
 private RequestChild req;
 
 
 /**
 * 請(qǐng)求次數(shù)
 */
 private transient int reqCount;
 
 /**
 * 超時(shí)的時(shí)間
 */
 private transient int timeOut;
 
 
 public Request() {
 }
 
 
 public Request(String action, int reqCount, int timeOut, RequestChild req) {
 this.action = action;
 this.req = req;
 this.reqCount = reqCount;
 this.timeOut = timeOut;
 }
 
 
 public static class Builder {
 //action 請(qǐng)求類(lèi)型
 private String action;
 //請(qǐng)求子類(lèi)數(shù)據(jù) 按照具體業(yè)務(wù)劃分
 private RequestChild req;
 //請(qǐng)求次數(shù) 便于重試
 private int reqCount;
 //超時(shí)時(shí)間
 private int timeOut;
 
 public Builder action(String action) {
 this.action = action;
 return this;
 }
 
 
 public Builder req(RequestChild req) {
 this.req = req;
 return this;
 }
 
 
 public Builder reqCount(int reqCount) {
 this.reqCount = reqCount;
 return this;
 }
 
 public Builder timeOut(int timeOut) {
 this.timeOut = timeOut;
 return this;
 }
 
 public Request build() {
 return new Request(action, reqCount, timeOut, req);
 }
 
 }
} 

public class RequestChild {
 
 /**
 * 設(shè)備類(lèi)型
 */
 private String clientType;
 
 
 /**
 * 用于用戶注冊(cè)的id
 */
 private String id;
 
 public RequestChild(String clientType, String id) {
 this.clientType = clientType;
 this.id = id;
 }
 
 public RequestChild() {
 }
 
 
 public static class Builder {
 private String clientType;
 private String id;
 
 public RequestChild.Builder setClientType(String clientType) {
 this.clientType = clientType;
 return this;
 }
 
 
 public RequestChild.Builder setId(String id) {
 this.id = id;
 return this;
 }
 
 
 public RequestChild build() {
 return new RequestChild(clientType, id);
 }
 
 }
 
 
}

我們添加一個(gè)發(fā)送請(qǐng)求的方法如下:

/**
 * 發(fā)送請(qǐng)求
 *
 * @param request 請(qǐng)求體
 * @param reqCount 請(qǐng)求次數(shù)
 * @param requestListern 請(qǐng)求回調(diào)
 */
private void senRequest(Request request, final int reqCount, final RequestListern requestListern) {
 if (!isNetConnect()) {
 requestListern.requestFailed("網(wǎng)絡(luò)未連接");
 return;
 }
 
}

請(qǐng)求回調(diào)如下所示

public interface RequestListern {
 
 /**
 * 請(qǐng)求成功
 */
 void requestSuccess();
 
 /**
 * 請(qǐng)求失敗
 *
 * @param message 請(qǐng)求失敗消息提示
 */
 void requestFailed(String message);
}

接著我們要把請(qǐng)求放在超時(shí)隊(duì)列中,新建超時(shí)任務(wù)類(lèi),對(duì)應(yīng)的分別是請(qǐng)求參數(shù)、請(qǐng)求回調(diào)、任務(wù)調(diào)度

public class TimeOutTask {
 
 
 /**
 * 請(qǐng)求主體
 */
 private Request request;
 
 /**
 * 通用返回
 */
 private RequestCallBack requestCallBack;
 
 /**
 * r任務(wù)
 */
 private ScheduledFuture scheduledFuture;
 
 
 public TimeOutTask(Request request,
  RequestCallBack requestCallBack,
  ScheduledFuture scheduledFuture) {
 this.request = request;
 this.requestCallBack = requestCallBack;
 this.scheduledFuture = scheduledFuture;
 }
 
 public ScheduledFuture getScheduledFuture() {
 return scheduledFuture;
 }
 
 public void setScheduledFuture(ScheduledFuture scheduledFuture) {
 this.scheduledFuture = scheduledFuture;
 }
 
 public Request getRequest() {
 return request;
 }
 
 public void setRequest(Request request) {
 this.request = request;
 }
 
 public RequestCallBack getRequestCallBack() {
 return requestCallBack;
 }
 
 public void setRequestCallBack(RequestCallBack requestCallBack) {
 this.requestCallBack = requestCallBack;
 }
 
}

RequestCallBack是超時(shí)任務(wù)的回調(diào),只是比請(qǐng)求回調(diào)多了個(gè)超時(shí),因?yàn)槌瑫r(shí)的處理機(jī)制是一樣的,所以這里我們沒(méi)必要將超時(shí)回調(diào)到請(qǐng)求中

public interface RequestCallBack {
 
 /**
 * 請(qǐng)求成功
 */
 void requestSuccess();
 
 /**
 * 請(qǐng)求失敗
 *
 * @param request 請(qǐng)求體
 * @param message 請(qǐng)求失敗的消息
 */
 void requestFailed(String message, Request request);
 
 /**
 * 請(qǐng)求超時(shí)
 *
 * @param request 請(qǐng)求體
 */
 void timeOut(Request request);
}
/**
 * 添加超時(shí)任務(wù)
 */
private ScheduledFuture enqueueTimeout(final Request request, final long timeout) {
 Log.d(TAG, " " + "enqueueTimeout: 添加超時(shí)任務(wù)類(lèi)型為:" + request.getAction());
 return executor.schedule(new Runnable() {
 @Override
 public void run() {
 TimeOutTask timeoutTask = callbacks.remove(request.getAction());
 if (timeoutTask != null) {
 timeoutTask.getRequestCallBack().timeOut(timeoutTask.getRequest());
 }
 }
 }, timeout, TimeUnit.MILLISECONDS);
}

超時(shí)任務(wù)的方法是通過(guò)任務(wù)調(diào)度定時(shí)調(diào)用,請(qǐng)求成功后我們會(huì)把超時(shí)任務(wù)移除,當(dāng)?shù)搅顺瑫r(shí)時(shí)間時(shí),任務(wù)還存在就說(shuō)明任務(wù)超時(shí)了。

每次的任務(wù)我們以action為鍵值存在hashMap中

private Map<String, CallbackWrapper> callbacks = new HashMap<>();

將任務(wù)放入超時(shí)任務(wù)代碼如下所示:

final ScheduledFuture timeoutTask = enqueueTimeout(request, request.getTimeOut());
 
final RequestCallBack requestCallBack = new RequestCallBack() {
 @Override
 public void requestSuccess() {
 requestListern.requestSuccess();
 }
 
 @Override
 public void requestFailed(String message, Request request) {
 requestListern.requestFailed(message);
 }
 
 @Override
 public void timeOut(Request request) {
 timeOutHanlder(request);
 }
};
callbacks.put(request.getAction(),
 new CallbackWrapper(request, requestCallBack, timeoutTask));

一般而言,任務(wù)超時(shí)都是由于連接原因?qū)е?,所以我們這里可以嘗試重試一次,如果還是超時(shí),通過(guò) timeOutHanlder(request);方法 進(jìn)行重新連接,重連代碼和連接代碼一樣,這里就省略了,做好這步操作,我們就可以發(fā)送消息了。

/**
 * 超時(shí)任務(wù)
 */
private void timeOutHanlder(Request requset) {
 setStatus(WsStatus.CONNECT_FAIL);
 //這里假裝有重連
 Log.d(TAG, "timeOutHanlder: 請(qǐng)求超時(shí) 準(zhǔn)備重連");
}

到這里我們的流程基本可以走通了。

心跳

首先我們要了解下心跳的作用是什么,心跳是在連接成功后,通過(guò)固定的間隔時(shí)間向服務(wù)器發(fā)送詢問(wèn),當(dāng)前是否還在線,有很多人說(shuō)心跳失敗我們就重連,成功就繼續(xù)心跳,但是這里要注意的是,我們一般是收不到心跳失敗回調(diào)的,心跳也是向服務(wù)器發(fā)送數(shù)據(jù),所以我們要將所有的主動(dòng)請(qǐng)求都放在超時(shí)任務(wù)隊(duì)列中,

所以對(duì)websocket來(lái)說(shuō) 請(qǐng)求結(jié)果有三種:成功、失敗、超時(shí),對(duì)于用戶 只有成功、失敗即可。

至于心跳、注冊(cè)等請(qǐng)求發(fā)送的數(shù)據(jù)是什么,這就得看我們與服務(wù)端定的協(xié)議是什么樣了,通常來(lái)說(shuō) 分為action 和 requestBody,協(xié)議格式我們?cè)俚诙揭呀?jīng)封裝好了,這里我們以心跳任務(wù)為例驗(yàn)證上面的封裝。

/**
 * 心跳
 */
void keepAlive() {
 
 Request request = new Request.Builder()
 .reqCount(0)
 .timeOut(REQUEST_TIMEOUT)
 .action(ACTION_KEEPALIVE).build();
 
 WsManager.getWsManger().senRequest(request, request.getReqCount() + 1, new RequestListern() {
 @Override
 public void requestSuccess() {
 Log.d(TAG, "requestSuccess: 心跳發(fā)送成功了");
 }
 
 @Override
 public void requestFailed(String message) {
 }
 });
}

我們每間隔10s中開(kāi)啟一次心跳任務(wù)

/**
 * 開(kāi)始心跳
 */
public void startKeepAlive() {
 mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}
/**
 * 心跳任務(wù)
 */
private Runnable mKeepAliveTask = new Runnable() {
 
 @Override
 public void run() {
 keepAlive();
 mHandler.removeCallbacks(mKeepAliveTask);
 mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
 }
};

為了便于操作演示,在主頁(yè)面上加個(gè)按鈕 ,點(diǎn)擊按鈕調(diào)用startKeepAlive方法,運(yùn)行如下所示: 

在android上如何使用WebSocket做消息通信功能

我們可以看到心跳返回的statue是300 不成功,5秒之后走到了請(qǐng)求超時(shí)的方法中,所以如果狀態(tài)返回成功的話,我們需要回調(diào)給調(diào)用者

/**
 * 處理 任務(wù)回調(diào)
 *
 * @param action 請(qǐng)求類(lèi)型
 */
void disPatchCallbackWarp(String action, boolean isSuccess) {
 CallbackWrapper callBackWarp = callbacks.remove(action);
 if (callBackWarp == null) {
 Logger.d(TAG+" "+ "disPatchCallbackWarp: 任務(wù)隊(duì)列為空");
 } else {
 callBackWarp.getScheduledFuture().cancel(true);
 if (isSuccess) {
 callBackWarp.getRequestCallBack().requestSuccess();
 } else {
 callBackWarp.getRequestCallBack().requestFailed("", new Request());
 }
 
 }
}

這樣調(diào)用者才知道成功或失敗。

發(fā)送其他消息與心跳一樣,只是請(qǐng)求參數(shù)不同而已,修改Request參數(shù)即可。這樣我們根據(jù)協(xié)議和業(yè)務(wù)就實(shí)現(xiàn)一個(gè)比較規(guī)范的webSocket消息推送流程了。

看完了這篇文章,相信你對(duì)在android上如何使用WebSocket做消息通信功能有了一定的了解,想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI