您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“spring Kafka中的@KafkaListener源碼分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“spring Kafka中的@KafkaListener源碼分析”吧!
從spring啟動(dòng)開始處理@KafkaListener,到start消息監(jiān)聽整體流程圖
KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { if (!this.nonAnnotatedClasses.contains(bean.getClass())) { Class<?> targetClass = AopUtils.getTargetClass(bean); // 掃描@KafkaListener注解 Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass); ...... if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(bean.getClass()); this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass()); } else { // Non-empty set of methods for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); // 遍歷掃描到的所有@KafkaListener注解并開始處理 for (KafkaListener listener : entry.getValue()) { processKafkaListener(listener, method, bean, beanName); } } this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '" + beanName + "': " + annotatedMethods); } // 處理在類上的@KafkaListener注解 if (hasClassLevelListeners) { processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName); } } return bean; }
KafkaListenerAnnotationBeanPostProcessor#processKafkaListener
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>(); endpoint.setMethod(methodToUse); processListener(endpoint, kafkaListener, bean, methodToUse, beanName); }
KafkaListenerAnnotationBeanPostProcessor#processListener
將每個(gè)kafkaListener轉(zhuǎn)變成MethodKafkaListenerEndpoint并注冊(cè)到KafkaListenerEndpointRegistrar容器,方便后續(xù)統(tǒng)一啟動(dòng)監(jiān)聽
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean, Object adminTarget, String beanName) { String beanRef = kafkaListener.beanRef(); if (StringUtils.hasText(beanRef)) { this.listenerScope.addListener(beanRef, bean); } endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(kafkaListener)); endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId())); endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener)); endpoint.setTopics(resolveTopics(kafkaListener)); endpoint.setTopicPattern(resolvePattern(kafkaListener)); endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix")); String group = kafkaListener.containerGroup(); ...... // 注冊(cè)已經(jīng)封裝好的消費(fèi)端-endpoint this.registrar.registerEndpoint(endpoint, factory); if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef); } }
KafkaListenerEndpointRegistrar#registerEndpoint
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) { ...... KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory); synchronized (this.endpointDescriptors) { // 如果到了需要立即啟動(dòng)監(jiān)聽的階段就直接注冊(cè)并監(jiān)聽(也就是創(chuàng)建消息監(jiān)聽容器并啟動(dòng)) if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true); } else { // 一般情況都先走這一步,添加至此列表,待bean后續(xù)的生命周期 統(tǒng)一注冊(cè)并啟動(dòng) this.endpointDescriptors.add(descriptor); } } } public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory, boolean startImmediately) { ...... synchronized (this.listenerContainers) { ...... // 1.創(chuàng)建消息監(jiān)聽容器 MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) { List<MessageListenerContainer> containerGroup; if (this.applicationContext.containsBean(endpoint.getGroup())) { containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class); } else { containerGroup = new ArrayList<MessageListenerContainer>(); this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); } containerGroup.add(container); } // 2.是否立即啟動(dòng)消息監(jiān)聽 if (startImmediately) { startIfNecessary(container); } } }
KafkaListenerEndpointRegistry#startIfNecessary
啟動(dòng)消息監(jiān)聽
private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { // 啟動(dòng)消息監(jiān)聽 // 到這一步之后,消息監(jiān)聽以及處理都是KafkaMessageListenerContainer的邏輯 // 到此也就打通了@KafkaListener到MessageListenerContainer消息監(jiān)聽容器的邏輯 listenerContainer.start(); } }
這一步是實(shí)例化(此處的實(shí)例化是已經(jīng)創(chuàng)建對(duì)象并完成了初始化操作)之后,緊接著的操作
KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated
public void afterSingletonsInstantiated() { this.registrar.setBeanFactory(this.beanFactory); // 對(duì)"注冊(cè)員"信息的完善 if (this.beanFactory instanceof ListableBeanFactory) { Map<String, KafkaListenerConfigurer> instances = ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class); for (KafkaListenerConfigurer configurer : instances.values()) { configurer.configureKafkaListeners(this.registrar); } } if (this.registrar.getEndpointRegistry() == null) { if (this.endpointRegistry == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name"); this.endpointRegistry = this.beanFactory.getBean( KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, KafkaListenerEndpointRegistry.class); } this.registrar.setEndpointRegistry(this.endpointRegistry); } ...... // Actually register all listeners // 整個(gè)方法這里才是關(guān)鍵 // 創(chuàng)建MessageListenerContainer并注冊(cè) this.registrar.afterPropertiesSet(); }
KafkaListenerEndpointRegistrar#afterPropertiesSet
public void afterPropertiesSet() { registerAllEndpoints(); }
KafkaListenerEndpointRegistrar#registerAllEndpoints
protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) { // 這里是真正的創(chuàng)建ListenerContainer監(jiān)聽對(duì)象并注冊(cè) this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } // 啟動(dòng)時(shí)所有消息監(jiān)聽對(duì)象都注冊(cè)之后,便將參數(shù)置為true this.startImmediately = true; // trigger immediate startup } }
到此,相信大家對(duì)“spring Kafka中的@KafkaListener源碼分析”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。