您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Java Dubbo協(xié)議下的服務(wù)端線程如何使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Java Dubbo協(xié)議下的服務(wù)端線程如何使用”吧!
在了解服務(wù)端線程模型之前,先了解一下Dubbo對Channel上的操作抽象,Dubbo將Channel上的操作成了5中行為,分別是:建立連接、斷開連接、發(fā)送消息、接收消息、異常捕獲,Channel上的操作的接口為org.apache.dubbo.remoting.ChannelHandler
,該接口是SPI
的,用戶可以自己擴展,接口代碼如下:
該接口抽象的五種Channel上的行為解釋如下:
建立連接:connected,主要是的職責是在channel記錄read、write的時間,以及處理建立連接后的回調(diào)邏輯,比如dubbo支持在斷開后自定義回調(diào)的hook(onconnect),即在該操作中執(zhí)行。
斷開連接:disconnected,主要是的職責是在channel移除read、write的時間,以及處理端開連接后的回調(diào)邏輯,比如dubbo支持在斷開后自定義回調(diào)的hook(ondisconnect),即在該操作中執(zhí)行。
發(fā)送消息:sent,包括發(fā)送請求和發(fā)送響應(yīng)。記錄write的時間。
接收消息:received,包括接收請求和接收響應(yīng)。記錄read的時間。
異常捕獲:caught,用于處理在channel上發(fā)生的各類異常。
Dubbo框架的線程模型與以上這五種行為息息相關(guān),Dubbo協(xié)議Provider端線程模型提供了五種實現(xiàn),雖說都是五種但是別把二者混淆,線程模型的頂級接口是org.apache.dubbo.remoting.Dispatcher
,該接口也是SPI的,提供的五種實現(xiàn)分別是AllDispatcher
、DirectDispatcher
、MessageOnlyDispatcher
、ExecutionDispatcher
、ConnectionOrderedDispatcher
,默認的使用的是AllDispatcher
。
org.apache.dubbo.remoting.ChannelHandler
作為Channel上的行為的頂級接口對應(yīng)Dubbo協(xié)議Provider端的5種線程模型同樣也提供了對應(yīng)的5種實現(xiàn),分別是AllChannelHandler
、DirectChannelHandler
、MessageOnlyChannelHandler
、ExecutionChannelHandler
、ConnectionOrderedChannelHandler
,這里Channel上行為的具體實現(xiàn)不展開討論。
Channel上行為和線程模型之間使用策略可以參考org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers
的源代碼,這里不做詳細的介紹,下面的各個章節(jié)只針對5種線程模型做簡單的介紹。
IO線程上的操作:
接口響應(yīng)序列化
sent操作
Dubbo線程池上的操作:
received、connected、disconnected、caught都是在Dubbo線程池上執(zhí)行
服務(wù)端反序列化操作的Dubbo線程池上執(zhí)行
AllDispatcher
代碼如下,AllDispatcher
的dispatch
方法實例化了AllChannelHandler
,AllChannelHandler
實現(xiàn)了received、connected、disconnected、caught操作在dubbo線程池中,代碼如下:
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void connected(Channel channel) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } }
該線程模型Channel上的所有行為均在IO線程中執(zhí)行,并沒有在Dubbo線程池中執(zhí)行
DirectDispatcher
與AllDispatcher
相似,實例化了DirectChannelHandler
,DirectChannelHandler
只實現(xiàn)了received行為,但是received中獲取的線程池如果是ThreadlessExecutor
才會提交task,否則也是在ChannelHandler中執(zhí)行received行為,ThreadlessExecutor
和普通線程池最大的區(qū)別是不會管理任何線程,這里不展開討論。
public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new DirectChannelHandler(handler, url); } }
public class DirectChannelHandler extends WrappedChannelHandler { public DirectChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); if (executor instanceof ThreadlessExecutor) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } else { handler.received(channel, message); } } }
在IO線程中執(zhí)行的操作有:
sent、connected、disconnected、caught操作在IO線程上執(zhí)行。
序列化響應(yīng)在IO線程上執(zhí)行。
在Dubbo線程中執(zhí)行的操作有:
received都是在Dubbo線程上執(zhí)行的。
反序列化請求的行為在Dubbo中做的。
同樣的,我們可以直接看ExecutionChannelHandler
源碼,邏輯是當message的類型是Request
時received行為在Dubbo線程池執(zhí)行。感興趣的可以自己看源碼,這里不做介紹。
Message Only Dispatcher所有的received行為和反序列化都是在dubbo線程池中執(zhí)行的
public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } }
該線程模型與AllDispatcher
類似,sent操作和相應(yīng)的序列化是在IO線程上執(zhí)行;connected、disconnected、received、caught操作在dubbo線程池上執(zhí)行,他們的區(qū)別是在connected、disconnected行為上ConnectionOrderedDispatcher
做了線程池隔離,并且在Dubbo connected thread pool中提供了鏈接限制、告警燈能力,我們直接看ConnectionOrderedChannelHandler
源碼,代碼如下:
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queueWarningLimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queueWarningLimit = url.getParameter(CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } @Override public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getSharedExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queueWarningLimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queueWarningLimit)); } } }
到此,相信大家對“Java Dubbo協(xié)議下的服務(wù)端線程如何使用”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。