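Source Code Analysis of @KafkaListener in Spring Kafka

Published: 2023-02-25 11:15:21  Source: 億速云 (Yisu Cloud)  Views: 191  Author: iii  Category: Development

This article walks through the Spring Kafka source code behind @KafkaListener: how the annotation is discovered while beans are being initialized, how each listener is turned into an endpoint, and how the message listener containers are finally created and started.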

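    I. Overall Flow

    The flow chart below covers the whole path, from Spring starting to process @KafkaListener during startup to the message listeners being started.

    [Figure: overall flow chart of @KafkaListener processing]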

    II. Source Code Walkthrough

    1. postProcessAfterInitialization

    KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization

    	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
    		    Class<?> targetClass = AopUtils.getTargetClass(bean);
    		    
    			// Scan for @KafkaListener annotations
    			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
    			
    			......
    			
    			if (annotatedMethods.isEmpty()) {
    				this.nonAnnotatedClasses.add(bean.getClass());
    				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
    			}
    			else {
    				// Non-empty set of methods
    				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
    					Method method = entry.getKey();
    					// Process every @KafkaListener annotation found on this method
    					for (KafkaListener listener : entry.getValue()) {
    						processKafkaListener(listener, method, bean, beanName);
    					}
    				}
    				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
    							+ beanName + "': " + annotatedMethods);
    			}
    			// Handle @KafkaListener annotations declared at class level
    			if (hasClassLevelListeners) {
    				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
    			}
    		}
    		return bean;
    	}
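
The scanning step above can be illustrated without Spring. The following is a minimal sketch, not the Spring Kafka implementation: `DemoListener`, `OrderHandler`, `PlainBean` and `ListenerScanner` are hypothetical names, and plain reflection stands in for Spring's annotation utilities. It shows the two ideas visible in the excerpt: annotated methods are collected per bean, and classes without any listener method are cached so they are skipped next time.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @KafkaListener; this is not the Spring Kafka annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoListener {
    String topic();
}

// A bean with one listener method and one ordinary method.
class OrderHandler {
    @DemoListener(topic = "orders")
    public void onOrder(String payload) { }

    public void helper() { }
}

// A bean without any listener method.
class PlainBean {
    public void doWork() { }
}

class ListenerScanner {
    // Classes already known to have no listener methods; they are skipped on later calls.
    private final Set<Class<?>> nonAnnotatedClasses = ConcurrentHashMap.newKeySet();
    // "topic -> method" descriptions, standing in for registered endpoints.
    final List<String> registered = new ArrayList<>();

    Object postProcess(Object bean) {
        Class<?> type = bean.getClass();
        if (nonAnnotatedClasses.contains(type)) {
            return bean;
        }
        boolean found = false;
        for (Method method : type.getDeclaredMethods()) {
            DemoListener listener = method.getAnnotation(DemoListener.class);
            if (listener != null) {
                found = true;
                registered.add(listener.topic() + " -> " + method.getName());
            }
        }
        if (!found) {
            nonAnnotatedClasses.add(type);
        }
        // Like a BeanPostProcessor, always hand the bean back unchanged.
        return bean;
    }

    boolean isKnownNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }
}
```

Calling `postProcess` with an `OrderHandler` records one entry for `onOrder`, while a `PlainBean` only ends up in the skip cache.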

    1.1 processKafkaListener

    KafkaListenerAnnotationBeanPostProcessor#processKafkaListener

    	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    		Method methodToUse = checkProxy(method, bean);
    		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    		endpoint.setMethod(methodToUse);
    		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    	}

    1.2 processListener

    KafkaListenerAnnotationBeanPostProcessor#processListener

    Each @KafkaListener is converted into a MethodKafkaListenerEndpoint and registered with the KafkaListenerEndpointRegistrar, so that all listeners can be started together later.

    	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
    			Object bean, Object adminTarget, String beanName) {
    
    		String beanRef = kafkaListener.beanRef();
    		if (StringUtils.hasText(beanRef)) {
    			this.listenerScope.addListener(beanRef, bean);
    		}
    		endpoint.setBean(bean);
    		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    		endpoint.setId(getEndpointId(kafkaListener));
    		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    		endpoint.setTopics(resolveTopics(kafkaListener));
    		endpoint.setTopicPattern(resolvePattern(kafkaListener));
    		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    		String group = kafkaListener.containerGroup();
    
            ......
          
    		// Register the fully populated consumer endpoint
    		this.registrar.registerEndpoint(endpoint, factory);
    		
    		if (StringUtils.hasText(beanRef)) {
    			this.listenerScope.removeListener(beanRef);
    		}
    	}

    1.3 registerEndpoint

    KafkaListenerEndpointRegistrar#registerEndpoint

    	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    		
    	    ......
    		
    		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    		synchronized (this.endpointDescriptors) {
    			// If startup has reached the immediate-start phase, register and start right away (that is, create the listener container and start it)
    			if (this.startImmediately) { // Register and start immediately
    				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
    						resolveContainerFactory(descriptor), true);
    			}
    			else {
    				// The usual path during startup: queue the descriptor so that all endpoints are registered and started together later in the bean lifecycle
    				this.endpointDescriptors.add(descriptor);
    			}
    		}
    	}
    
    	// KafkaListenerEndpointRegistry#registerListenerContainer (note: the registry, not the registrar)
    	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
    			boolean startImmediately) {
    
            ......
            
    		synchronized (this.listenerContainers) {
    		
    			......
    			
    			// 1. Create the message listener container
    			MessageListenerContainer container = createListenerContainer(endpoint, factory);
    			this.listenerContainers.put(id, container);
    			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
    				List<MessageListenerContainer> containerGroup;
    				if (this.applicationContext.containsBean(endpoint.getGroup())) {
    					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
    				}
    				else {
    					containerGroup = new ArrayList<MessageListenerContainer>();
    					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
    				}
    				containerGroup.add(container);
    			}
                
    			// 2. Start the listener immediately if requested
    			if (startImmediately) {
    				startIfNecessary(container);
    			}
    		}
    	}
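
The container-group handling in `registerListenerContainer` can be pictured with a small model. This is a sketch under simplifying assumptions, not the registry's code: `GroupingRegistrySketch` is a hypothetical class, containers are represented by strings, and a plain map stands in for the `List` beans that the real code registers as singletons in the application context.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the registry; a container is represented by a string.
class GroupingRegistrySketch {
    // Stands in for listenerContainers (endpoint id -> container).
    private final Map<String, String> containersById = new LinkedHashMap<>();
    // Stands in for the per-group List beans held by the application context.
    private final Map<String, List<String>> groupBeans = new LinkedHashMap<>();

    void registerListenerContainer(String id, String group) {
        synchronized (containersById) {
            // 1. Create the container and index it by endpoint id.
            String container = "container-" + id;
            containersById.put(id, container);
            // Approximation of StringUtils.hasText: non-null and not blank.
            if (group != null && !group.trim().isEmpty()) {
                // Reuse the group's list if it exists, otherwise create it on first use.
                groupBeans.computeIfAbsent(group, g -> new ArrayList<>()).add(container);
            }
        }
    }

    List<String> containersInGroup(String group) {
        return groupBeans.getOrDefault(group, new ArrayList<>());
    }
}
```

Registering endpoints `a` and `b` with the group `billing` leaves both containers in the same list, which is what lets a whole group be looked up and managed by its name.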

    1.4 startIfNecessary

    KafkaListenerEndpointRegistry#startIfNecessary
    Starts the message listener.

    	private void startIfNecessary(MessageListenerContainer listenerContainer) {
    		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    			// Start the message listener.
    			// From this point on, polling and message handling are the responsibility of KafkaMessageListenerContainer.
    			// This completes the path from @KafkaListener to the MessageListenerContainer.
    			listenerContainer.start();
    		}
    	}
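
The start condition is easy to misread, so here is a minimal model of it. `SketchContainer` and `AutoStartRegistrySketch` are hypothetical classes written for this illustration; only the boolean condition is taken from the excerpt above.

```java
// Hypothetical minimal container; not Spring Kafka's MessageListenerContainer.
class SketchContainer {
    private final boolean autoStartup;
    private boolean running;

    SketchContainer(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    boolean isAutoStartup() {
        return autoStartup;
    }

    boolean isRunning() {
        return running;
    }

    void start() {
        running = true;
    }
}

// Hypothetical registry that only models the start decision.
class AutoStartRegistrySketch {
    private boolean contextRefreshed;

    void markContextRefreshed() {
        contextRefreshed = true;
    }

    // Same condition as in the excerpt: start if the context has already been
    // refreshed, or if the container itself asks for automatic startup.
    void startIfNecessary(SketchContainer container) {
        if (contextRefreshed || container.isAutoStartup()) {
            container.start();
        }
    }
}
```

Under this condition, a container with `autoStartup` set to false stays stopped before the context refresh, but a container passed to `startIfNecessary` after the refresh is started regardless of its own flag.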

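    2. afterSingletonsInstantiated

    This callback runs right after the singletons have been instantiated. "Instantiated" here means that the bean objects have been created and their initialization has completed, so every @KafkaListener endpoint collected in step 1 is already queued in the registrar.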

    KafkaListenerAnnotationBeanPostProcessor#afterSingletonsInstantiated

    	public void afterSingletonsInstantiated() {
    		this.registrar.setBeanFactory(this.beanFactory);
    
    		// Let every KafkaListenerConfigurer bean customize the registrar
    		if (this.beanFactory instanceof ListableBeanFactory) {
    			Map<String, KafkaListenerConfigurer> instances =
    					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
    			for (KafkaListenerConfigurer configurer : instances.values()) {
    				configurer.configureKafkaListeners(this.registrar);
    			}
    		}
    
    		if (this.registrar.getEndpointRegistry() == null) {
    			if (this.endpointRegistry == null) {
    				Assert.state(this.beanFactory != null,
    						"BeanFactory must be set to find endpoint registry by bean name");
    				this.endpointRegistry = this.beanFactory.getBean(
    						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
    						KafkaListenerEndpointRegistry.class);
    			}
    			this.registrar.setEndpointRegistry(this.endpointRegistry);
    		}
    
    		......
    
    		// Actually register all listeners
    		// This is the key step of the whole method:
    		// create the MessageListenerContainers and register them
    		this.registrar.afterPropertiesSet();
    	}
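
The ordering inside `afterSingletonsInstantiated` matters: the configurers run first, and only then are the endpoints turned into containers. A minimal sketch of that ordering follows. All three types are hypothetical stand-ins (`RegistrarCustomizerSketch` for `KafkaListenerConfigurer`, `CallbackRegistrarSketch` for the registrar), and endpoints are reduced to plain ids.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaListenerConfigurer.
interface RegistrarCustomizerSketch {
    void configure(CallbackRegistrarSketch registrar);
}

// Hypothetical stand-in for KafkaListenerEndpointRegistrar; endpoints are plain ids.
class CallbackRegistrarSketch {
    private final List<String> pendingEndpoints = new ArrayList<>();
    final List<String> registeredContainers = new ArrayList<>();

    void registerEndpoint(String id) {
        pendingEndpoints.add(id);
    }

    // Plays the role of afterPropertiesSet(): turn every queued endpoint into a container.
    void afterPropertiesSet() {
        for (String id : pendingEndpoints) {
            registeredContainers.add("container-" + id);
        }
    }
}

class SingletonCallbackSketch {
    static void afterSingletonsInstantiated(CallbackRegistrarSketch registrar,
            List<RegistrarCustomizerSketch> customizers) {
        // Step 1: give every customizer a chance to adjust the registrar.
        for (RegistrarCustomizerSketch customizer : customizers) {
            customizer.configure(registrar);
        }
        // Step 2: only now create and register the containers.
        registrar.afterPropertiesSet();
    }
}
```

Because the customizers run before registration, an endpoint that a customizer adds programmatically is registered in the same pass as the endpoints that came from annotations.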

    2.1 afterPropertiesSet

    KafkaListenerEndpointRegistrar#afterPropertiesSet

    	public void afterPropertiesSet() {
    		registerAllEndpoints();
    	}

    2.2 registerAllEndpoints

    KafkaListenerEndpointRegistrar#registerAllEndpoints

    	protected void registerAllEndpoints() {
    		synchronized (this.endpointDescriptors) {
    			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
    				// This is where the listener containers are actually created and registered
    				this.endpointRegistry.registerListenerContainer(
    						descriptor.endpoint, resolveContainerFactory(descriptor));
    			}
    			// Once every listener present at startup has been registered, set the flag to true
    			this.startImmediately = true;  // trigger immediate startup
    		}
    	}
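
Taken together, `registerEndpoint` and `registerAllEndpoints` form a "queue first, then switch to immediate mode" pattern. The sketch below models only that pattern; `DeferredRegistrarSketch` is a hypothetical class and the strings are placeholders for real containers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the registrar's two phases; endpoints are plain ids.
class DeferredRegistrarSketch {
    private final List<String> endpointDescriptors = new ArrayList<>();
    final List<String> containers = new ArrayList<>();
    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (endpointDescriptors) {
            if (startImmediately) {
                // Late registration: create the container and start it right away.
                containers.add(endpointId + ":created+started");
            }
            else {
                // Startup phase: just remember the endpoint for later.
                endpointDescriptors.add(endpointId);
            }
        }
    }

    void registerAllEndpoints() {
        synchronized (endpointDescriptors) {
            for (String endpointId : endpointDescriptors) {
                // Containers are created here; starting them is left to a later step.
                containers.add(endpointId + ":created");
            }
            // From now on, new endpoints take the immediate path.
            startImmediately = true;
        }
    }
}
```

In this model, endpoints registered before `registerAllEndpoints` are only created in the batch, while one registered afterwards is created and started at once, matching the two branches shown in section 1.3.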
