您好,登錄后才能下訂單哦!
這篇文章將為大家詳細(xì)講解有關(guān)netty無縫切換rabbitmq和activem及qrocketmq實(shí)現(xiàn)聊天室單聊、群聊功能,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
netty的pipeline處理鏈上的handler:需要IdleStateHandler心跳檢測(cè)channel是否有效,以及處理登錄認(rèn)證的UserAuthHandler和消息處理MessageHandler
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(defLoopGroup, //編碼解碼器 new HttpServerCodec(), //將多個(gè)消息轉(zhuǎn)換成單一的消息對(duì)象 new HttpObjectAggregator(65536), //支持異步發(fā)送大的碼流,一般用于發(fā)送文件流 new ChunkedWriteHandler(), //檢測(cè)鏈路是否讀空閑,配合心跳handler檢測(cè)channel是否正常 new IdleStateHandler(60, 0, 0), //處理握手和認(rèn)證 new UserAuthHandler(), //處理消息的發(fā)送 new MessageHandler() ); }
對(duì)于所有連進(jìn)來的channel,我們需要保存起來,往后的群發(fā)消息需要依靠這些channel
public static void addChannel(Channel channel) { String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel); System.out.println("addChannel:" + remoteAddr); if (!channel.isActive()) { logger.error("channel is not active, address: {}", remoteAddr); } UserInfo userInfo = new UserInfo(); userInfo.setAddr(remoteAddr); userInfo.setChannel(channel); userInfo.setTime(System.currentTimeMillis()); userInfos.put(channel, userInfo); }
登錄后,channel就變成有效的channel,無效的channel之后將會(huì)丟棄
public static boolean saveUser(Channel channel, String nick, String password) { UserInfo userInfo = userInfos.get(channel); if (userInfo == null) { return false; } if (!channel.isActive()) { logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick); return false; } // 驗(yàn)證用戶名和密碼 if (nick == null || password == null) { return false; } LambdaQueryWrapper<account> lambdaQueryWrapper = new LambdaQueryWrapper<>(); lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password); Account account = accountMapperStatic.selectOne(lambdaQueryWrapper); if (account == null) { return false; } // 增加一個(gè)認(rèn)證用戶 userCount.incrementAndGet(); userInfo.setNick(nick); userInfo.setAuth(true); userInfo.setId(account.getId()); userInfo.setUsername(account.getUsername()); userInfo.setGroupNumber(account.getGroupNumber()); userInfo.setTime(System.currentTimeMillis()); // 注冊(cè)該用戶推送消息的通道 offlineInfoTransmitStatic.registerPull(channel); return true; }
當(dāng)channel關(guān)閉時(shí),就不再接收消息。unregisterPull就是注銷信息消費(fèi)者,客戶端不再接取聊天消息。此外,從下方有一個(gè)加寫鎖的操作,就是為了避免channel還在發(fā)送消息時(shí),這邊突然關(guān)閉channel,這樣會(huì)導(dǎo)致報(bào)錯(cuò)。
public static void removeChannel(Channel channel) { try { logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel)); //加上讀寫鎖保證移除channel時(shí),避免channel關(guān)閉時(shí),還有別的線程對(duì)其操作,造成錯(cuò)誤 rwLock.writeLock().lock(); channel.close(); UserInfo userInfo = userInfos.get(channel); if (userInfo != null) { if (userInfo.isAuth()) { offlineInfoTransmitStatic.unregisterPull(channel); // 減去一個(gè)認(rèn)證用戶 userCount.decrementAndGet(); } userInfos.remove(channel); } } finally { rwLock.writeLock().unlock(); } }
為了無縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息這4種狀態(tài),定義如下4個(gè)接口。依次是發(fā)送單聊消息、群聊消息、客戶端啟動(dòng)接收消息、客戶端下線不接收消息。
public interface OfflineInfoTransmit { void pushP2P(Integer userId, String message); void pushGroup(String groupNumber, String message); void registerPull(Channel channel); void unregisterPull(Channel channel); }
其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息,它的處理流程如下:
單聊的模型參考線程池的模型,如果用戶在線,直接通過channel發(fā)送給用戶。如果用戶離線,則發(fā)往中間件存儲(chǔ),下次用戶上線時(shí)直接從中間件拉取消息。這樣做對(duì)比所有消息的發(fā)送都通過中間件來轉(zhuǎn)的好處是提升了性能
群聊則是完全通過中間件來轉(zhuǎn)發(fā)消息,消息發(fā)送中間件,客戶端從中間件接取消息。如果仍像單聊那樣操作,在線用戶直接通過channel發(fā)送,操作過于繁瑣,要判斷這個(gè)群組的哪些用戶是否在線
如果用戶在線就注冊(cè)消費(fèi)者,從中間件接取消息。否則,就斷開消費(fèi)者,消息保留在中間件中,以便客戶端下次上線時(shí)拉取。這樣就實(shí)現(xiàn)了離線消息的接收。
不管使用哪種中間件或使用不使用中間件,它的處理流程都遵循上面的3個(gè)要求,就能無縫切換上方的4種方法來存儲(chǔ)和轉(zhuǎn)發(fā)消息。需要哪種方法開啟相應(yīng)注解即可。
項(xiàng)目地址:https://github.com/shuangyueliao/netty-chat
關(guān)于netty無縫切換rabbitmq和activem及qrocketmq實(shí)現(xiàn)聊天室單聊、群聊功能就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。