您好,登錄后才能下訂單哦!
netty的pipeline處理鏈上的handler:需要IdleStateHandler心跳檢測(cè)channel是否可以,以及處理登錄認(rèn)證的UserAuthHandler和消息處理MessageHandler
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(defLoopGroup,
//編碼解碼器
new HttpServerCodec(),
//將多個(gè)消息轉(zhuǎn)換成單一的消息對(duì)象
new HttpObjectAggregator(65536),
//支持異步發(fā)送大的碼流,一般用于發(fā)送文件流
new ChunkedWriteHandler(),
//檢測(cè)鏈路是否讀空閑,配合心跳handler檢測(cè)channel是否正常
new IdleStateHandler(60, 0, 0),
//處理握手和認(rèn)證
new UserAuthHandler(),
//處理消息的發(fā)送
new MessageHandler()
);
}
對(duì)于所有連進(jìn)來(lái)的channel,我們需要保存起來(lái),往后的群發(fā)消息需要依靠這些channel
public static void addChannel(Channel channel) {
String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);
System.out.println("addChannel:" + remoteAddr);
if (!channel.isActive()) {
logger.error("channel is not active, address: {}", remoteAddr);
}
UserInfo userInfo = new UserInfo();
userInfo.setAddr(remoteAddr);
userInfo.setChannel(channel);
userInfo.setTime(System.currentTimeMillis());
userInfos.put(channel, userInfo);
}
登錄后,channel就變成有效的channel,無(wú)效的channel之后將會(huì)丟棄
public static boolean saveUser(Channel channel, String nick, String password) {
UserInfo userInfo = userInfos.get(channel);
if (userInfo == null) {
return false;
}
if (!channel.isActive()) {
logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);
return false;
}
if (nick == null || password == null) {
return false;
}
LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);
Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);
if (account == null) {
return false;
}
// 增加一個(gè)認(rèn)證用戶(hù)
userCount.incrementAndGet();
userInfo.setNick(nick);
userInfo.setAuth(true);
userInfo.setId(account.getId());
userInfo.setUsername(account.getUsername());
userInfo.setGroupNumber(account.getGroupNumber());
userInfo.setTime(System.currentTimeMillis());
// 注冊(cè)該用戶(hù)推送消息的通道
offlineInfoTransmitStatic.registerPull(channel);
return true;
}
當(dāng)channel關(guān)閉時(shí),就不再接收消息。unregisterPull就是注銷(xiāo)信息消費(fèi)者,客戶(hù)端不再接取聊天消息。此外,從下方有一個(gè)加寫(xiě)鎖的操作,就是為了避免channel還在發(fā)送消息時(shí),這邊突然關(guān)閉channel,這樣會(huì)導(dǎo)致報(bào)錯(cuò)。
public static void removeChannel(Channel channel) {
try {
logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));
//加上讀寫(xiě)鎖保證移除channel時(shí),避免channel關(guān)閉時(shí),還有別的線程對(duì)其操作,造成錯(cuò)誤
rwLock.writeLock().lock();
channel.close();
UserInfo userInfo = userInfos.get(channel);
if (userInfo != null) {
if (userInfo.isAuth()) {
offlineInfoTransmitStatic.unregisterPull(channel);
// 減去一個(gè)認(rèn)證用戶(hù)
userCount.decrementAndGet();
}
userInfos.remove(channel);
}
} finally {
rwLock.writeLock().unlock();
}
}
為了無(wú)縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息這4種狀態(tài),定義如下4個(gè)接口。依次是發(fā)送單聊消息、群聊消息、客戶(hù)端啟動(dòng)接收消息、客戶(hù)端下線不接收消息。
public interface OfflineInfoTransmit {
void pushP2P(Integer userId, String message);
void pushGroup(String groupNumber, String message);
void registerPull(Channel channel);
void unregisterPull(Channel channel);
}
其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來(lái)存儲(chǔ)和轉(zhuǎn)發(fā)聊天消息,它的處理流程如下:
代碼地址:
https://github.com/shuangyueliao/netty-chat
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。