您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“redis流數(shù)據(jù)推送多用戶的方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“redis流數(shù)據(jù)推送多用戶的方法是什么”吧!
1 當(dāng)用戶幾百 幾千個(gè)時(shí) 如何推送?取締線程池 采用單線程異步同步推送。
2 現(xiàn)在的邏輯:
每次項(xiàng)目重新啟動(dòng): 初始化channel 、服務(wù)端斷開連接-重新連接。當(dāng)有服務(wù)連接不上的時(shí)候定時(shí)器連接。
這樣會(huì)產(chǎn)生一個(gè)問題:異步連接的任務(wù)特別多 導(dǎo)致服務(wù)奔潰
/** * 關(guān)閉連接時(shí) */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress(); int port = ipSocket.getPort(); String host = ipSocket.getHostString(); String serverUrl = host + ":" + port; String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl); Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000); if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) { redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + ""); log.info("服務(wù)端斷開連接=====" + host + port + "20分鐘之后 重新連接"); final EventLoop eventLoop = ctx.channel().eventLoop(); Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop); eventLoop.schedule( () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)), 20, TimeUnit.MINUTES); } else { log.warn(serverUrl + "距離上次斷開連接不足20分鐘==" + (cur - Integer.parseInt(prev)) + "s"); } super.channelInactive(ctx);}
在系統(tǒng)啟動(dòng)初始化 連接10次:
3 如何保證發(fā)送數(shù)據(jù)的完整性 TCP 粘包問題
服務(wù)端添加按換行符分隔的解碼器;
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
//此方法每次客戶端連接都會(huì)調(diào)用,是為通道初始化的方法
//獲得通道channel中的管道鏈(執(zhí)行鏈、handler鏈)
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringHandler());
log.info("success to initHandler!");
}
});
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) { try { boolean result = sendMsg(future, pushDataStr); this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString()); if (!result) { log.error("this channel push failed {}", callbackUrl.getUrl()); returnVal = false; } } catch (Exception e) { log.error("push exception", e); returnVal = false; }}return returnVal;
當(dāng)推送成功 會(huì)記錄次用戶 此次的數(shù)據(jù)游標(biāo)數(shù)據(jù)到redis.
當(dāng)推送服務(wù)掛斷之后,會(huì)進(jìn)行任務(wù)的初始化 此時(shí)會(huì)從redis中讀取每個(gè)客戶上次讀取的位置offest 提交到任務(wù)線程池
這個(gè)問題同樣解決了服務(wù)重啟之后,依然可以從上次讀取結(jié)束的位置接著讀取。讀取任務(wù)的開始游標(biāo)位置 :是上次服務(wù)成功處理后的游標(biāo)。
BaseReceiver 接口 handler 方法 每次接收到數(shù)據(jù) 返回當(dāng)前游標(biāo)值。當(dāng)進(jìn)行業(yè)務(wù)處理成功之后 返回true .會(huì)自動(dòng)進(jìn)行后續(xù)數(shù)據(jù)讀取。當(dāng)客戶那邊的接收數(shù)據(jù)服務(wù)端掛了之后 首先會(huì)進(jìn)行自動(dòng)重連操作,此時(shí)讀取datahub數(shù)據(jù)的線程依然在返回?cái)?shù)據(jù) 但是不能推送成功,所以游標(biāo)值 不會(huì)后移。
到此,相信大家對“redis流數(shù)據(jù)推送多用戶的方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。