您好,登錄后才能下訂單哦!
小編給大家分享一下Spring-Integration執(zhí)行過程的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
Spring-Integration基于Spring,在應(yīng)用程序中啟用了輕量級消息傳遞,并支持通過聲明式適配器與外部系統(tǒng)集成。這一段官網(wǎng)的介紹,概況了整個Integration的用途。個人感覺消息傳遞是真正的重點。
如上圖所示,典型的生產(chǎn)者-消費者模式,中間通過一個特定的通道進(jìn)行數(shù)據(jù)傳輸,說到這,是不是隱隱感覺到queue的存在。確實事實上這個所謂的通道默認(rèn)就是用的 blockingqueue。
Spring-Integration網(wǎng)上的資料是真少,再加上源碼分析的是更少。關(guān)于Spring-Integration的基本介紹直接去官網(wǎng)上看更加的直觀,這邊就不累述了。
今天主要是看個簡單的hello word進(jìn)來分析下整個執(zhí)行過程。
先看下代碼:
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns="http://www.springframework.org/schema/integration" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd"> <annotation-config/> <channel id="oc" > <queue/> </channel> <beans:bean id="Beans" class="com.example.demo.Beans"/> </beans:beans>
@Configuration public class Beans { @ServiceActivator(inputChannel = "ic", outputChannel = "oc") public String sayHello(String name) { return "Hello " + name; } }
public class HelloWorldDemo { @Test public void testDemo() throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("/demo.xml", HelloWorldDemo.class); DirectChannel inputChannel = context.getBean("ic", DirectChannel.class); PollableChannel outputChannel = context.getBean("oc", PollableChannel.class); inputChannel.send(new GenericMessage<String>("World")); System.out.println("==> HelloWorldDemo: " + outputChannel.receive(0).getPayload()); context.close(); } } out: ==> HelloWorldDemo: Hello World
上面的代碼演示了調(diào)用方法的入站通道適配器和標(biāo)準(zhǔn)的出站通道適配器, 它們之間是一個帶注解的ServiceActivator。關(guān)于這個ServiceActivator就是一個消息端點。
消息端點的主要作用是以非侵入性方式將應(yīng)用程序代碼連接到消息傳遞框架。換句話說,理想情況下,應(yīng)用程序代碼應(yīng)該不知道消息對象或消息管道。這類似于 MVC 范式中controller 的作用。正如controller 處理 HTTP 請求一樣,消息端點處理消息。以及controller 映射到 URL 模式一樣,消息端點映射到消息通道。這兩種情況的目標(biāo)是相同的。
ServiceActivator是用于將服務(wù)實例連接到消息傳遞系統(tǒng)的通用端點。必須配置輸入消息通道,如果要調(diào)用的服務(wù)方法能夠返回值,還可以提供輸出消息通道。
具體流程如下圖:
上面的代碼比較簡單,但是或許會發(fā)現(xiàn)我們只定義了輸出通道oc,輸入通道ic竟然沒有定義也能正常應(yīng)用,是不是很奇怪?帶著疑問我們先看下ServiceActivator的源碼:
注釋上寫的很清楚,如果輸入通道不存在,將在應(yīng)用程序上下文中注冊具有此名稱的DirectChannel 。具體在哪定義,我們后面會看到,現(xiàn)在不急,先一步步來看他的執(zhí)行過程。
我們?nèi)植檎襍erviceActivator,看他是哪邊進(jìn)行處理的,最后發(fā)現(xiàn)了MessagingAnnotationPostProcessor類,用來處理方法級消息注解的BeanPostProcessor實現(xiàn)。
@Override public void afterPropertiesSet() { Assert.notNull(this.beanFactory, "BeanFactory must not be null"); ((BeanDefinitionRegistry) this.beanFactory).registerBeanDefinition( IntegrationContextUtils.DISPOSABLES_BEAN_NAME, BeanDefinitionBuilder.genericBeanDefinition(Disposables.class, Disposables::new) .getRawBeanDefinition()); this.postProcessors.put(Filter.class, new FilterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Transformer.class, new TransformerAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(ServiceActivator.class, new ServiceActivatorAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Splitter.class, new SplitterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(InboundChannelAdapter.class, new InboundChannelAdapterAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(BridgeFrom.class, new BridgeFromAnnotationPostProcessor(this.beanFactory)); this.postProcessors.put(BridgeTo.class, new BridgeToAnnotationPostProcessor(this.beanFactory)); Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> customPostProcessors = setupCustomPostProcessors(); if (!CollectionUtils.isEmpty(customPostProcessors)) { this.postProcessors.putAll(customPostProcessors); } }
在afterPropertiesSet方法中,我們看到定義了一個后處理器postProcessors,里面注冊了相關(guān)的注解處理類。包含各種消息端點處理,除了上面寫的ServiceActivator,還有過濾器,路由,轉(zhuǎn)換器等各種不同的端點方法。
接著往向下看,既然實現(xiàn)了BeanPostProcessor
,那必然要用到postProcessAfterInitialization
方法實現(xiàn),這里的流程大概就是遍歷出包含有@ServiceActivator的bean方法,用來做后續(xù)處理。我們直接看重點的代碼。
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
在AbstractMethodAnnotationPostProcessor
中有個共通方法postProcess用來生成對應(yīng)的端點信息。具體代碼:
@Override public Object postProcess(Object bean, String beanName, Method method, List<Annotation> annotations) { Object sourceHandler = null; if (beanAnnotationAware() && AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { if (!this.beanFactory.containsBeanDefinition(resolveTargetBeanName(method))) { this.logger.debug("Skipping endpoint creation; perhaps due to some '@Conditional' annotation."); return null; } else { sourceHandler = resolveTargetBeanFromMethodWithBeanAnnotation(method); } } //生成對應(yīng)的MessageHandler,用來執(zhí)行對應(yīng)的注解的方法 MessageHandler handler = createHandler(bean, method, annotations); if (!(handler instanceof ReactiveMessageHandlerAdapter)) { orderable(method, handler); producerOrRouter(annotations, handler); if (!handler.equals(sourceHandler)) { handler = registerHandlerBean(beanName, method, handler); } handler = annotated(method, handler); handler = adviceChain(beanName, annotations, handler); } //將MessageHandler實現(xiàn)連接到消息端點,生成對應(yīng)的endpoint。 AbstractEndpoint endpoint = createEndpoint(handler, method, annotations); if (endpoint != null) { return endpoint; } else { return handler; } }
這里面主要是兩件事:
根據(jù)模板模式中不同的createHandler抽象方法實現(xiàn),生成對應(yīng)的MessageHandler。譬如說我們這邊的ServiceActivatorAnnotationPostProcessor
將MessageHandler實現(xiàn)連接到消息端點,生成對應(yīng)的endpoint。
1.createHandler
@Override protected MessageHandler createHandler(Object bean, Method method, List<Annotation> annotations) { AbstractReplyProducingMessageHandler serviceActivator; if (AnnotatedElementUtils.isAnnotated(method, Bean.class.getName())) { ... else { serviceActivator = new ServiceActivatingHandler(bean, method); } String requiresReply = MessagingAnnotationUtils.resolveAttribute(annotations, "requiresReply", String.class); if (StringUtils.hasText(requiresReply)) { serviceActivator.setRequiresReply(resolveAttributeToBoolean(requiresReply)); } String isAsync = MessagingAnnotationUtils.resolveAttribute(annotations, "async", String.class); if (StringUtils.hasText(isAsync)) { serviceActivator.setAsync(resolveAttributeToBoolean(isAsync)); } //是否設(shè)置了輸出通道 setOutputChannelIfPresent(annotations, serviceActivator); return serviceActivator; }
createHandler
的代碼比較簡單,就是根據(jù)注解中的幾個屬性還有對應(yīng)的方法參數(shù),生成ServiceActivatingHandler
。追溯下去ServiceActivatingHandler
中最后會生成一個委托對象MessagingMethodInvokerHelper
用來以反射的方式來執(zhí)行目標(biāo)方法。
2.createEndpoint
createEndpoint
字面上都能知道是生成消息端點,事實上也是,把生成的handler和對應(yīng)的管道進(jìn)行關(guān)聯(lián)。具體看下代碼體會:
protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarnings("unused") Method method, List<Annotation> annotations) { AbstractEndpoint endpoint = null; //取得注解中inputChannelName String inputChannelName = MessagingAnnotationUtils.resolveAttribute(annotations, getInputChannelAttribute(), String.class); if (StringUtils.hasText(inputChannelName)) { MessageChannel inputChannel; try { //從beanFactory中取得對應(yīng)的通道bean inputChannel = this.channelResolver.resolveDestination(inputChannelName); } catch (DestinationResolutionException e) { //取不到,則自動注冊一個類型為DirectChannel的inputChannel if (e.getCause() instanceof NoSuchBeanDefinitionException) { inputChannel = new DirectChannel(); this.beanFactory.registerSingleton(inputChannelName, inputChannel); inputChannel = (MessageChannel) this.beanFactory.initializeBean(inputChannel, inputChannelName); if (this.disposables != null) { this.disposables.add((DisposableBean) inputChannel); } } else { throw e; } } Assert.notNull(inputChannel, () -> "failed to resolve inputChannel '" + inputChannelName + "'"); //生成endpoint endpoint = doCreateEndpoint(handler, inputChannel, annotations); } return endpoint; }
上面的代碼中,我們就能清楚的看到為什么我們在demo中沒有注冊輸入通道也能正常應(yīng)用的原因了,從而回答之前的疑問。
protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel, List<Annotation> annotations) { .... else if (inputChannel instanceof SubscribableChannel) { //生成SubscribableChannel類型對應(yīng)的執(zhí)行端點 return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler); } else if (inputChannel instanceof PollableChannel) { return pollingConsumer(inputChannel, handler, pollers); } else { throw new IllegalArgumentException("Unsupported 'inputChannel' type: '" + inputChannel.getClass().getName() + "'. " + "Must be one of 'SubscribableChannel', 'PollableChannel' or 'ReactiveStreamsSubscribableChannel'"); } }
通道類型一共有兩種,一種是發(fā)布訂閱,一種是可輪詢的,我們是默認(rèn)是走的第一種,因為DirectChannel默認(rèn)就是個SubscribableChannel。所以最終我們生成了對應(yīng)的信息端點類EventDrivenConsumer。
我們先看下EventDrivenConsumer整體結(jié)構(gòu):
EventDrivenConsumer上面有一個抽象類AbstractEndpoint
,最上面實現(xiàn)了Lifecycle
接口,所以生命周期跟著容器走,我們直接跳到star方法看:
@Override protected void doStart() { this.logComponentSubscriptionEvent(true); //把handler和inputChannel進(jìn)行綁定 this.inputChannel.subscribe(this.handler); if (this.handler instanceof Lifecycle) { ((Lifecycle) this.handler).start(); } }
@Override public synchronized boolean addHandler(MessageHandler handler) { Assert.notNull(handler, "handler must not be null"); Assert.isTrue(this.handlers.size() < this.maxSubscribers, "Maximum subscribers exceeded"); boolean added = this.handlers.add(handler); if (this.handlers.size() == 1) { this.theOneHandler = handler; } else { this.theOneHandler = null; } return added; }
上面的代碼主要就是把handler注冊到inputChannel中,這樣只要inputChannel通道一收到信息,就會通知他注冊的handlers進(jìn)行處理。代碼中比較清楚的記錄了一切的操作,就不多解釋了。
執(zhí)行完上面一系列的注冊,已經(jīng)把這一些的通道打通了,剩下的就是真正的發(fā)送操作了。下面分析下inputChannel.send(new GenericMessage<String>("World"));
看看send操作:
/** * 在此頻道上發(fā)送消息。 如果通道已滿,則此方法將阻塞,直到發(fā)生超時或發(fā)送線程中斷。 如果指定的超時時間為 0,則該方法將立即返回。 如果小于零,它將無限期阻塞(請參閱send(Message) )。 * 參數(shù): * messageArg – 要發(fā)送的消息 * timeout - 以毫秒為單位的超時時間 * 返回: * true如果消息發(fā)送成功, false如果消息無法在規(guī)定時間內(nèi)發(fā)送或發(fā)送線程被中斷 */ @Override public boolean send(Message<?> messageArg, long timeout) { ... try { //message是否需要轉(zhuǎn)換 message = convertPayloadIfNecessary(message); //發(fā)送前攔截器 if (interceptorList.getSize() > 0) { interceptorStack = new ArrayDeque<>(); message = interceptorList.preSend(message, this, interceptorStack); if (message == null) { return false; } } if (this.metricsCaptor != null) { sample = this.metricsCaptor.start(); } //發(fā)送操作 sent = doSend(message, timeout); if (sample != null) { sample.stop(sendTimer(sent)); } metricsProcessed = true; if (debugEnabled) { logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message); } //發(fā)送后攔截器 if (interceptorStack != null) { interceptorList.postSend(message, this, sent); interceptorList.afterSendCompletion(message, this, sent, null, interceptorStack); } return sent; } catch (Exception ex) { ... } }
真正的send操作跟下去,會發(fā)現(xiàn)層次極深,礙于篇幅,我們直接跟到重點代碼:
@Override protected final void handleMessageInternal(Message<?> message) { Object result; if (this.advisedRequestHandler == null) { //反射執(zhí)行對應(yīng)的端點方法 result = handleRequestMessage(message); } else { result = doInvokeAdvisedRequestHandler(message); } if (result != null) { //往outputChannel發(fā)送執(zhí)行結(jié)果 sendOutputs(result, message); } ... }
handleRequestMessage的操作就是用之前我們handler中的委托類MessagingMethodInvokerHelper
去反射運行對應(yīng)的端點方法,然后把執(zhí)行結(jié)果發(fā)送outputChannel。最后我們直接定位到具體的發(fā)送操作:
@Override protected boolean doSend(Message<?> message, long timeout) { Assert.notNull(message, "'message' must not be null"); try { if (this.queue instanceof BlockingQueue) { BlockingQueue<Message<?>> blockingQueue = (BlockingQueue<Message<?>>) this.queue; if (timeout > 0) { return blockingQueue.offer(message, timeout, TimeUnit.MILLISECONDS); } if (timeout == 0) { return blockingQueue.offer(message); } blockingQueue.put(message); return true; } else { try { return this.queue.offer(message); } finally { this.queueSemaphore.release(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } }
看到這,我們就明白了數(shù)據(jù)的去向,存儲在隊列里了,生產(chǎn)者產(chǎn)生的數(shù)據(jù)就已經(jīng)生成了,所以發(fā)送的操作基本上就告一段落了。
數(shù)據(jù)已經(jīng)生成,后面就是看如何消費操作了,下面分析下 outputChannel.receive(0).getPayload()
操作:
/** * 從該通道接收第一條可用消息。 如果通道不包含任何消息,則此方法將阻塞,直到分配的超時時間過去。 如果指定的超時時間為 0,則該方法將立即返回。 如果小于零,它將無限期阻塞(參見receive() )。 * 參數(shù): * timeout - 以毫秒為單位的超時時間 * 返回: * 如果在分配的時間內(nèi)沒有可用的消息或接收線程被中斷,則為第一個可用消息或null 。 */ @Override // NOSONAR complexity @Nullable public Message<?> receive(long timeout) { ... try { //接受前攔截器操作 if (interceptorList.getSize() > 0) { interceptorStack = new ArrayDeque<>(); //一旦調(diào)用接收并在實際檢索消息之前調(diào)用 if (!interceptorList.preReceive(this, interceptorStack)) { return null; } } //接收操作 Message<?> message = doReceive(timeout); ... //在檢索到 Message 之后但在將其返回給調(diào)用者之前立即調(diào)用。 必要時可以修改消息 if (interceptorStack != null && message != null) { message = interceptorList.postReceive(message, this); } //在接收完成后調(diào)用,而不管已引發(fā)的任何異常,從而允許適當(dāng)?shù)馁Y源清理 interceptorList.afterReceiveCompletion(message, this, null, interceptorStack); return message; } catch (RuntimeException ex) { ... } }
最后的doReceive操作,其實大家都心知肚明了,就是從上面的隊列中直接讀取數(shù)據(jù),代碼比較簡單,就不注釋了:
@Override @Nullable protected Message<?> doReceive(long timeout) { try { if (timeout > 0) { if (this.queue instanceof BlockingQueue) { return ((BlockingQueue<Message<?>>) this.queue).poll(timeout, TimeUnit.MILLISECONDS); } else { return pollNonBlockingQueue(timeout); } } if (timeout == 0) { return this.queue.poll(); } if (this.queue instanceof BlockingQueue) { return ((BlockingQueue<Message<?>>) this.queue).take(); } else { Message<?> message = this.queue.poll(); while (message == null) { this.queueSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS); // NOSONAR ok to ignore result message = this.queue.poll(); } return message; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } }
以上是“Spring-Integration執(zhí)行過程的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。