溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

redis流數(shù)據(jù)推送多用戶的方法是什么

發(fā)布時(shí)間:2021-12-18 17:29:27 來源:億速云 閱讀:190 作者:iii 欄目:大數(shù)據(jù)

本篇內(nèi)容主要講解“redis流數(shù)據(jù)推送多用戶的方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“redis流數(shù)據(jù)推送多用戶的方法是什么”吧!

1 當(dāng)用戶幾百 幾千個(gè)時(shí) 如何推送?取締線程池  采用單線程異步同步推送。

redis流數(shù)據(jù)推送多用戶的方法是什么

2 現(xiàn)在的邏輯:

每次項(xiàng)目重新啟動(dòng): 初始化channel 、服務(wù)端斷開連接-重新連接。當(dāng)有服務(wù)連接不上的時(shí)候定時(shí)器連接。

這樣會(huì)產(chǎn)生一個(gè)問題:異步連接的任務(wù)特別多 導(dǎo)致服務(wù)奔潰 

/** * 關(guān)閉連接時(shí) */@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {  InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();  int port = ipSocket.getPort();  String host = ipSocket.getHostString();  String serverUrl = host + ":" + port;  String prev = redisTemplate.opsForValue().get(SOCKET_CONNECT_PREFIX + serverUrl);  Integer cur = Math.toIntExact(System.currentTimeMillis() / 1000);  if (null == prev || cur - Integer.parseInt(prev) > 20 * 60) {    redisTemplate.opsForValue().set(SOCKET_CONNECT_PREFIX + serverUrl, cur + "");    log.info("服務(wù)端斷開連接=====" + host + port + "20分鐘之后 重新連接");    final EventLoop eventLoop = ctx.channel().eventLoop();    Bootstrap bootstrap = defaultProcessHandler.getBootstrap(eventLoop);    eventLoop.schedule(        () -> defaultProcessHandler.doConnect(bootstrap, host + ":" + port, new AtomicInteger(0)),        20,        TimeUnit.MINUTES);  } else {    log.warn(serverUrl + "距離上次斷開連接不足20分鐘==" + (cur - Integer.parseInt(prev)) + "s");  }  super.channelInactive(ctx);}

在系統(tǒng)啟動(dòng)初始化 連接10次:

redis流數(shù)據(jù)推送多用戶的方法是什么

3 如何保證發(fā)送數(shù)據(jù)的完整性  TCP 粘包問題

服務(wù)端添加按換行符分隔的解碼器;

serverBootstrap.childHandler(new ChannelInitializer<Channel>() {  @Override  protected void initChannel(Channel channel) {    //此方法每次客戶端連接都會(huì)調(diào)用,是為通道初始化的方法
   //獲得通道channel中的管道鏈(執(zhí)行鏈、handler鏈)    ChannelPipeline pipeline = channel.pipeline();    pipeline.addLast(new LineBasedFrameDecoder(Short.MAX_VALUE * 10));    pipeline.addLast(new StringDecoder());    pipeline.addLast(new StringHandler());
   log.info("success to initHandler!");  }});

 

4 關(guān)于推送服務(wù)斷開之后 重新連接上來 接著上次發(fā)送的位置繼續(xù)推送。
ChannelFuture future = channels.get(callbackUrl.getUrl());if (null != future) {  try {    boolean result = sendMsg(future, pushDataStr);    this.redisTemplate.opsForValue().set(callbackUrl.getUrl(), offset.toString());    if (!result) {      log.error("this channel push failed {}", callbackUrl.getUrl());      returnVal = false;    }  } catch (Exception e) {    log.error("push exception", e);    returnVal = false;  }}return returnVal;

當(dāng)推送成功  會(huì)記錄次用戶 此次的數(shù)據(jù)游標(biāo)數(shù)據(jù)到redis.

當(dāng)推送服務(wù)掛斷之后,會(huì)進(jìn)行任務(wù)的初始化 此時(shí)會(huì)從redis中讀取每個(gè)客戶上次讀取的位置offest 提交到任務(wù)線程池

這個(gè)問題同樣解決了服務(wù)重啟之后,依然可以從上次讀取結(jié)束的位置接著讀取。讀取任務(wù)的開始游標(biāo)位置 :是上次服務(wù)成功處理后的游標(biāo)。

redis流數(shù)據(jù)推送多用戶的方法是什么

redis流數(shù)據(jù)推送多用戶的方法是什么

5 當(dāng)接收數(shù)據(jù)的服務(wù)端 重新斷開之后,如何保證接著上次讀取的位置?

BaseReceiver 接口  handler 方法  每次接收到數(shù)據(jù) 返回當(dāng)前游標(biāo)值。當(dāng)進(jìn)行業(yè)務(wù)處理成功之后  返回true .會(huì)自動(dòng)進(jìn)行后續(xù)數(shù)據(jù)讀取。當(dāng)客戶那邊的接收數(shù)據(jù)服務(wù)端掛了之后 首先會(huì)進(jìn)行自動(dòng)重連操作,此時(shí)讀取datahub數(shù)據(jù)的線程依然在返回?cái)?shù)據(jù) 但是不能推送成功,所以游標(biāo)值 不會(huì)后移。

到此,相信大家對“redis流數(shù)據(jù)推送多用戶的方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI