您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Dubbo服務(wù)導(dǎo)出到本地的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“Dubbo服務(wù)導(dǎo)出到本地的方法”吧!
context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); // 刪除了一些步驟 public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { try { // 1. 里面的核心代碼就是初始化了applicationEventMulticaster,用于后面發(fā)布事件使用 // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); initApplicationEventMulticaster(); // 2. 初始化非延遲加載的bean,這里就會初始化dubbo配置的一些bean,包括ServiceBean,用于服務(wù)導(dǎo)出 finishBeanFactoryInitialization(beanFactory); // 3. 發(fā)布容器刷新事件,這里面是服務(wù)導(dǎo)出的入口 finishRefresh(); } } }
// 步驟2分析 // 這里Spring容器會初始化非延遲加載的bean,包括<dubbo:service/>表示的bean // <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean時會創(chuàng)建ServiceBean對象,由于ServiceBean實現(xiàn)了 // ApplicationContextAware接口,所以Spring容器會先調(diào)用setApplicationContext給其注入Spring容器 class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 給SpringExtensionFactory注入Spring容器 SpringExtensionFactory.addApplicationContext(applicationContext); if (applicationContext != null) { SPRING_CONTEXT = applicationContext; try { // method是addListener方法,調(diào)用該方法用于給applicationEventMulticaster // 添加listener method.invoke(applicationContext, new Object[]{this}); supportedApplicationListener = true; } } } }
// 步驟3分析,發(fā)布相關(guān)事件,這里會發(fā)布容器刷新事件 finishRefresh(); protected void finishRefresh() { initLifecycleProcessor(); getLifecycleProcessor().onRefresh(); // 1). 發(fā)布容器刷新事件,ServiceBean監(jiān)聽的就是該事件 // ServiceBean implements ApplicationListener<ContextRefreshedEvent> publishEvent(new ContextRefreshedEvent(this)); LiveBeansView.registerApplicationContext(this); } // 2). 步驟1會走到這里,這里會獲取之前的applicationEventMulticaster,用于發(fā)布事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); // 3). 走到這里,listener就是之前調(diào)用 method.invoke(applicationContext, new Object[]{this}); // 加進(jìn)去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件 doInvokeListener(listener, event); // 4) 走到這里,最終調(diào)用ServiceBean實現(xiàn)的onApplicationEvent方法 listener.onApplicationEvent(event);
這樣,就走到了Dubbo暴露服務(wù)的入口
的方法。這也是Dubbo官方文檔中提及的入口方法,參考 服務(wù)導(dǎo)出
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服務(wù)沒有被暴露并且服務(wù)沒有被取消暴露,則打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導(dǎo)出服務(wù) export(); } }
接下來研究一下 Dubbo 導(dǎo)出服務(wù)的過程。Dubbo 服務(wù)導(dǎo)出過程始于Spring容器發(fā)布刷新事件,Dubbo在接收到事件后,會立即執(zhí)行服務(wù)導(dǎo)出邏輯。整個邏輯大致可分為三個部分,第一部分是前置工作,主要用于檢查參數(shù),組裝 URL。第二部分是導(dǎo)出服務(wù),包含導(dǎo)出服務(wù)到本地 (JVM),和導(dǎo)出服務(wù)到遠(yuǎn)程兩個過程。第三部分是向注冊中心注冊服務(wù),用于服務(wù)發(fā)現(xiàn)。下面將會對這三個部分代碼進(jìn)行詳細(xì)的分析。
服務(wù)導(dǎo)出的入口方法是ServiceBean的onApplicationEvent
。onApplicationEvent 是一個事件響應(yīng)方法,該方法會在收到Spring上下文刷新事件后執(zhí)行服務(wù)導(dǎo)出操作。方法代碼如下
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服務(wù)沒有被暴露并且服務(wù)沒有被取消暴露,則打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導(dǎo)出服務(wù) export(); } }
這個方法首先會根據(jù)條件決定是否導(dǎo)出服務(wù),比如有些服務(wù)設(shè)置了延時導(dǎo)出,那么此時就不應(yīng)該在此處導(dǎo)出。還有一些服務(wù)已經(jīng)被導(dǎo)出了,或者當(dāng)前服務(wù)被取消導(dǎo)出了,此時也不能再次導(dǎo)出相關(guān)服務(wù)。注意這里的 isDelay 方法,這個方法字面意思是“是否延遲導(dǎo)出服務(wù)”,返回 true 表示延遲導(dǎo)出,false 表示不延遲導(dǎo)出。但是該方法真實意思卻并非如此,當(dāng)方法返回 true 時,表示無需延遲導(dǎo)出。返回 false 時,表示需要延遲導(dǎo)出。與字面意思恰恰相反,這個需要大家注意一下。 前置工作主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導(dǎo)出服務(wù)之前,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補充缺省配置。配置檢查完成后,接下來需要根據(jù)這些配置組裝 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作為配置載體,所有的拓展點都是通過 URL 獲取配置。這一點,官方文檔中有所說明。下面的export方法會走到doExport()
方法。
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
以下是配置檢查的相關(guān)分析,代碼比較多,需要大家耐心看一下。下面對配置檢查的邏輯進(jìn)行簡單的總結(jié),如下:
檢測 dubbo:service 標(biāo)簽的 interface 屬性合法性,不合法則拋出異常
檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否為空,若為空,則嘗試從其他配置類對象中獲取相應(yīng)的實例。
檢測并處理泛化服務(wù)和普通服務(wù)類
檢測本地存根配置,并進(jìn)行相應(yīng)的處理
對 ApplicationConfig、RegistryConfig 等配置類進(jìn)行檢測,為空則嘗試創(chuàng)建,若無法創(chuàng)建則拋出異常配置檢查并非本文重點,因此這里不打算對 doExport 方法所調(diào)用的方法進(jìn)行分析(doExportUrls 方法除外)。在這些方法中,除了appendProperties方法稍微復(fù)雜一些,其他方法邏輯不是很復(fù)雜。因此,大家可自行分析。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { // 拋異常 } checkDefault(); if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 檢測ref是否為泛化服務(wù)類型 if (ref instanceof GenericService) { // 設(shè)置interfaceClass為GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 設(shè)置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 獲得接口類型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 對interfaceClass,以及<dubbo:method>標(biāo)簽中的必要字段進(jìn)行檢查 checkInterfaceAndMethods(interfaceClass, methods); // 對ref合法性進(jìn)行檢測 checkRef(); generic = Boolean.FALSE.toString(); } // stub local一樣都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } } checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); // 本地存根、mock合法性校驗 checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 核心代碼,暴露服務(wù)、注冊邏輯就在其中 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
Dubbo 允許我們使用不同的協(xié)議導(dǎo)出服務(wù),也允許我們向多個注冊中心注冊服務(wù)。Dubbo在doExportUrls方法中對多協(xié)議,多注冊中心進(jìn)行了支持。相關(guān)代碼如下
/** * 多協(xié)議多注冊中心暴露服務(wù)進(jìn)行支持 */ private void doExportUrls() { // 加載注冊中心鏈接 List<URL> registryURLs = loadRegistries(true); // 遍歷protocols,并在每個協(xié)議下暴露 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代碼首先是通過loadRegistries加載注冊中心鏈接,然后再遍歷ProtocolConfig集合導(dǎo)出每個服務(wù)。并在導(dǎo)出服務(wù)的過程中,將服務(wù)注冊到注冊中心。我們先來看一下loadRegistries方法的邏輯。先可以打開看下該方法可以得到什么。
<!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use multicast registry center to export service --> <dubbo:registry address="zookeeper://10.101.99.127:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <dubbo:provider server="netty4"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // 如果registries為空,直接返回空集合 if (registries != null && !registries.isEmpty()) { // 遍歷注冊中心配置集合registries for (RegistryConfig config : registries) { // 獲得地址 String address = config.getAddress(); // 若地址為空,則設(shè)置為0.0.0.0 if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 如果地址為N/A,則跳過 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加ApplicationConfig中的字段信息到map中 appendParameters(map, application); // 添加RegistryConfig字段信息到map中 appendParameters(map, config); // 添加path、協(xié)議版本 map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 如果map中沒有protocol,則默認(rèn)為使用dubbo協(xié)議 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析得到URL列表,address可能包含多個注冊中心ip,因此解析得到的是一個URL列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍歷URL 列表 for (URL url : urls) { // 將URL協(xié)議頭設(shè)置為registry url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 這里將協(xié)議設(shè)置為了registry,這也是后面調(diào)用的是RegistryProtocol的export()方法原因 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 通過判斷條件,決定是否添加url到registryList中,條件如下: // 如果是服務(wù)提供者,并且是注冊中心服務(wù)或者是消費者端,并且是訂閱服務(wù),則加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
ProtocolConfig主要封裝了<dubbo:protocol name="dubbo" port="20880"/>標(biāo)簽的信息,意思是使用Dubbo協(xié)議暴露服務(wù)。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 獲取協(xié)議名 String name = protocolConfig.getName(); // 如果為空,則是默認(rèn)的dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 設(shè)置服務(wù)提供者側(cè) map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 這段代碼其實完成了子節(jié)點配置信息對父節(jié)點的覆蓋 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // 如果method的配置列表不為空 if (methods != null && !methods.isEmpty()) { // 遍歷method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig對象的字段信息到map中,鍵=方法名.屬性名 // 比如存儲<dubbo:method name="sayHello" retries="2">對應(yīng)的MethodConfig, // 鍵=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 如果retryValue為false,則不重試,設(shè)置值為0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } }
if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 暴露到遠(yuǎn)程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // 后面分析 } } this.urls.add(url); }
前置工作做完,接下來就可以進(jìn)行服務(wù)導(dǎo)出了。服務(wù)導(dǎo)出分為導(dǎo)出到本地(JVM)和導(dǎo)出到遠(yuǎn)程。
// 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } private void exportLocal(URL url) { // 如果協(xié)議不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分別把協(xié)議改為injvm,設(shè)置host和port URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST).setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 通過代理工程創(chuàng)建invoker // 再調(diào)用export方法進(jìn)行暴露服務(wù),生成Exporter // 這里的protocol是生成的拓展代理對象,具體可看https://segmentfault.com/a/1190000020384210 // 它是在運行時才根據(jù)URL中的protocol參數(shù)去決定運行哪個Protocol實例的export方法,這里由于前面 // setProtocol(Constants.LOCAL_PROTOCOL),所以調(diào)用的是InjvmProtocol的export方法 Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); } }
下面兩個是url和local的具體值,因為Dubbo采用自適應(yīng)拓展機(jī)制,exportLocal(URL url)中用到的protocol是自適應(yīng)拓展,protocol的export方法會用到URL中protocol參數(shù)從而決定具體生成protocol的哪個實例,所以URL的protocol值可以關(guān)注下。
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
下面分析下面這句代碼。它是核心方法,分為兩步。
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local) -> 返回invoker 2) protocol.export(invoker)
// 步驟1)分析 // proxyFactory也是自適應(yīng)拓展代理帶,它默認(rèn)使用JavassistProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 這里調(diào)用的就是JavassistProxyFactory的getInvoker方法 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 創(chuàng)建Wrapper對象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創(chuàng)建匿名Invoker類對象,并實現(xiàn)doInvoke方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable { // 調(diào)用Wrapper的invokeMethod方法,invokeMethod最終會調(diào)用目標(biāo)方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
在 Dubbo 中,Invoker是一個非常重要的模型
。在服務(wù)提供端,以及服務(wù)引用端均會出現(xiàn)Invoker。Dubbo 官方文檔中對Invoker進(jìn)行了說明,這里引用一下。Invoker是實體域,它是Dubbo的核心模型,其它模型都向它靠擾,或轉(zhuǎn)換成它,它代表一個可執(zhí)行體,可向它發(fā)起invoke調(diào)用
,它有可能是一個本地的實現(xiàn),也可能是一個遠(yuǎn)程的實現(xiàn),也可能一個集群實現(xiàn)。這里面getInvoker方法創(chuàng)建了一個匿名Invoker對象,我理解是通過invoke實行遠(yuǎn)程調(diào)用時,會走wrapper.invokeMethod方法,而wrapper實際上是一個代理類,調(diào)用wrapper.invokeMethod最終會走proxy,也就是DemoService的sayHello方法。Wrapper創(chuàng)建比較復(fù)雜,可以參考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析。
// 步驟2分析,調(diào)用的是InjvmProtocol的export方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 該方法只是創(chuàng)建了一個,因為暴露到本地,所以在同一個jvm中,所以不需要其他操作 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
到此,相信大家對“Dubbo服務(wù)導(dǎo)出到本地的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。