您好,登錄后才能下訂單哦!
dubbo中AllDispatcher的作用是什么,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Dispatcher.java
@SPI(AllDispatcher.NAME) public interface Dispatcher { /** * dispatch the message to threadpool. * * @param handler * @param url * @return channel handler */ @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // The last two parameters are reserved for compatibility with the old configuration ChannelHandler dispatch(ChannelHandler handler, URL url); }
Dispatcher接口定義了dispatch方法,返回ChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllDispatcher.java
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
AllDispatcher實(shí)現(xiàn)了Dispatcher接口,其dispatch方法返回的是AllChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/all/AllChannelHandler.java
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void connected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } }
AllChannelHandler繼承了WrappedChannelHandler,其connected、disconnected、received、caught均是通過父類的getExecutorService獲取線程池,然后執(zhí)行創(chuàng)建的ChannelEventRunnable;received方法在捕獲到異常時(shí)RejectedExecutionException且message是Request,而且request是twoWay的時(shí)候會(huì)返回SERVER_THREADPOOL_EXHAUSTED_ERROR
Dispatcher接口定義了dispatch方法,返回ChannelHandler
AllChannelHandler繼承了WrappedChannelHandler,其connected、disconnected、received、caught均是通過父類的getExecutorService獲取線程池,然后執(zhí)行創(chuàng)建的ChannelEventRunnable
AllChannelHandler的received方法在捕獲到異常時(shí)RejectedExecutionException且message是Request,而且request是twoWay的時(shí)候會(huì)返回SERVER_THREADPOOL_EXHAUSTED_ERROR
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝您對(duì)億速云的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。