您好,登錄后才能下訂單哦!
一.導(dǎo)入Netty依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
二.搭建websocket服務(wù)器
@Component public class WebSocketServer { /** * 主線程池 */ private EventLoopGroup bossGroup; /** * 工作線程池 */ private EventLoopGroup workerGroup; /** * 服務(wù)器 */ private ServerBootstrap server; /** * 回調(diào) */ private ChannelFuture future; public void start() { future = server.bind(9001); System.out.println("netty server - 啟動(dòng)成功"); } public WebSocketServer() { bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new WebsocketInitializer()); } }
三.初始化Websocket
public class WebsocketInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // ------------------ // 用于支持Http協(xié)議 // ------------------ // websocket基于http協(xié)議,需要有http的編解碼器 pipeline.addLast(new HttpServerCodec()); // 對(duì)寫大數(shù)據(jù)流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 添加對(duì)HTTP請(qǐng)求和響應(yīng)的聚合器:只要使用Netty進(jìn)行Http編程都需要使用 //設(shè)置單次請(qǐng)求的文件的大小 pipeline.addLast(new HttpObjectAggregator(1024 * 64)); //webSocket 服務(wù)器處理的協(xié)議,用于指定給客戶端連接訪問的路由 :/ws pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 添加Netty空閑超時(shí)檢查的支持 // 1. 讀空閑超時(shí)(超過一定的時(shí)間會(huì)發(fā)送對(duì)應(yīng)的事件消息) // 2. 寫空閑超時(shí) // 3. 讀寫空閑超時(shí) pipeline.addLast(new IdleStateHandler(4, 8, 12)); //添加心跳處理 pipeline.addLast(new HearBeatHandler()); // 添加自定義的handler pipeline.addLast(new ChatHandler()); } }
四.創(chuàng)建Netty監(jiān)聽器
@Component public class NettyListener implements ApplicationListener<ContextRefreshedEvent> { @Resource private WebSocketServer websocketServer; @Override public void onApplicationEvent(ContextRefreshedEvent event) { if(event.getApplicationContext().getParent() == null) { try { websocketServer.start(); } catch (Exception e) { e.printStackTrace(); } } } }
五.建立消息通道
public class UserChannelMap { /** * 用戶保存用戶id與通道的Map對(duì)象 */ // private static Map<String, Channel> userChannelMap; /* static { userChannelMap = new HashMap<String, Channel>(); }*/ /** * 定義一個(gè)channel組,管理所有的channel * GlobalEventExecutor.INSTANCE 是全局的事件執(zhí)行器,是一個(gè)單例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 存放用戶與Chanel的對(duì)應(yīng)信息,用于給指定用戶發(fā)送消息 */ private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>(); private UserChannelMap(){} /** * 添加用戶id與channel的關(guān)聯(lián) * @param userNum * @param channel */ public static void put(String userNum, Channel channel) { userChannelMap.put(userNum, channel); } /** * 根據(jù)用戶id移除用戶id與channel的關(guān)聯(lián) * @param userNum */ public static void remove(String userNum) { userChannelMap.remove(userNum); } /** * 根據(jù)通道id移除用戶與channel的關(guān)聯(lián) * @param channelId 通道的id */ public static void removeByChannelId(String channelId) { if(!StringUtils.isNotBlank(channelId)) { return; } for (String s : userChannelMap.keySet()) { Channel channel = userChannelMap.get(s); if(channelId.equals(channel.id().asLongText())) { System.out.println("客戶端連接斷開,取消用戶" + s + "與通道" + channelId + "的關(guān)聯(lián)"); userChannelMap.remove(s); UserService userService = SpringUtil.getBean(UserService.class); userService.logout(s); break; } } } /** * 打印所有的用戶與通道的關(guān)聯(lián)數(shù)據(jù) */ public static void print() { for (String s : userChannelMap.keySet()) { System.out.println("用戶id:" + s + " 通道:" + userChannelMap.get(s).id()); } } /** * 根據(jù)好友id獲取對(duì)應(yīng)的通道 * @param receiverNum 接收人編號(hào) * @return Netty通道 */ public static Channel get(String receiverNum) { return userChannelMap.get(receiverNum); } /** * 獲取channel組 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 獲取用戶channel map * @return */ public static ConcurrentHashMap<String,Channel> getUserChannelMap(){ return userChannelMap; } }
六.自定義消息類型
public class Message { /** * 消息類型 */ private Integer type; /** * 聊天消息 */ private String message; /** * 擴(kuò)展消息字段 */ private Object ext; public Integer getType() { return type; } public void setType(Integer type) { this.type = type; } public MarketChatRecord getChatRecord() { return marketChatRecord; } public void setChatRecord(MarketChatRecord chatRecord) { this.marketChatRecord = chatRecord; } public Object getExt() { return ext; } public void setExt(Object ext) { this.ext = ext; } @Override public String toString() { return "Message{" + "type=" + type + ", marketChatRecord=" + marketChatRecord + ", ext=" + ext + '}'; } }
七.創(chuàng)建處理消息的handler
public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class); /** * 用來保存所有的客戶端連接 */ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *當(dāng)Channel中有新的事件消息會(huì)自動(dòng)調(diào)用 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 當(dāng)接收到數(shù)據(jù)后會(huì)自動(dòng)調(diào)用 // 獲取客戶端發(fā)送過來的文本消息 Gson gson = new Gson(); log.info("服務(wù)器收到消息:{}",msg.text()); System.out.println("接收到消息數(shù)據(jù)為:" + msg.text()); Message message = gson.fromJson(msg.text(), Message.class); //根據(jù)業(yè)務(wù)要求進(jìn)行消息處理 switch (message.getType()) { // 處理客戶端連接的消息 case 0: // 建立用戶與通道的關(guān)聯(lián) // 處理客戶端發(fā)送好友消息 break; case 1: // 處理客戶端的簽收消息 break; case 2: // 將消息記錄設(shè)置為已讀 break; case 3: // 接收心跳消息 break; default: break; } } // 當(dāng)有新的客戶端連接服務(wù)器之后,會(huì)自動(dòng)調(diào)用這個(gè)方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被調(diào)用"+ctx.channel().id().asLongText()); // 添加到channelGroup 通道組 UserChannelMap.getChannelGroup().add(ctx.channel()); // clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("{異常:}"+cause.getMessage()); // 刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info("handlerRemoved 被調(diào)用"+ctx.channel().id().asLongText()); //刪除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); } }
八.處理心跳
public class HearBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent)evt; if(idleStateEvent.state() == IdleState.READER_IDLE) { System.out.println("讀空閑事件觸發(fā)..."); } else if(idleStateEvent.state() == IdleState.WRITER_IDLE) { System.out.println("寫空閑事件觸發(fā)..."); } else if(idleStateEvent.state() == IdleState.ALL_IDLE) { System.out.println("---------------"); System.out.println("讀寫空閑事件觸發(fā)"); System.out.println("關(guān)閉通道資源"); ctx.channel().close(); } } } }
搭建完成后調(diào)用測(cè)試
1.頁(yè)面訪問http://localhost:9001/ws
2.端口號(hào)9001和訪問路徑ws都是我們?cè)谏线吪渲玫?,然后傳入我們自定義的消息message類型。
3.大概流程:消息發(fā)送 :用戶1先連接通道,然后發(fā)送消息給用戶2,用戶2若是在線直接可以發(fā)送給用戶,若沒在線可以將消息暫存在redis或者通道里,用戶2鏈接通道的話,兩者可以直接通訊。
消息推送 :用戶1連接通道,根據(jù)通道id查詢要推送的人是否在線,或者推送給所有人,這里我只推送給指定的人。
到此這篇關(guān)于SpringBoot+Netty+WebSocket實(shí)現(xiàn)消息發(fā)送的示例代碼的文章就介紹到這了,更多相關(guān)SpringBoot Netty WebSocket消息發(fā)送內(nèi)容請(qǐng)搜索億速云以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持億速云!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。