溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

@RabbitListener起作用的原理是什么

發(fā)布時(shí)間:2023-03-21 10:51:53 來(lái)源:億速云 閱讀:146 作者:iii 欄目:開(kāi)發(fā)技術(shù)

這篇文章主要講解了“@RabbitListener起作用的原理是什么”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“@RabbitListener起作用的原理是什么”吧!

一、前言

在spring中,定義rabbitMq的消費(fèi)者可以相當(dāng)方便,只需要在消息處理類或者類方法加上@RabbitListener注解,指定隊(duì)列名稱即可。

如下代碼:

@Component
public class RabbitMqListener1 {
    @RabbitListener(queues = "queue1")
    public void consumer1(Message message) {

    }

    @RabbitListener(queues = "queue2")
    public void consumer2(String messsageBody) {

    }
}


@Component
@RabbitListener(queues = "queue3")
public class RabbitMqListener2 {
    @RabbitHandler(isDefault=true)
    public void consumer3() {

    }
}

注意!?。∪绻鸃RabbitListener加在類上面,需要有一個(gè)默認(rèn)的處理方法@RabbitHandler(isDefault=true),默認(rèn)是false。

不設(shè)置一個(gè)true,消費(fèi)mq消息的時(shí)候會(huì)出現(xiàn)“Listener method ‘no match’ threw exception”異常。

原因在RabbitListenerAnnotationBeanPostProcessor.processMultiMethodListeners方法,有興趣的可以看下。

可以看到代碼相當(dāng)?shù)暮?jiǎn)單。但是?。?!為什么加上這個(gè)注解,就能作為一個(gè)consumer接受mq的消息呢?為啥處理mq消息的方法,入?yún)⒖梢阅敲措S意?

有經(jīng)驗(yàn)的程序員,可能會(huì)有這樣的設(shè)想:

1、單純看這些listener的代碼,只是定義了由spring管理的bean,要能監(jiān)聽(tīng)rabbitMq的消息,肯定需要有另外一個(gè)類,這個(gè)類會(huì)掃描所有加了@RabbitListener的bean,進(jìn)行加工。

2、看這些listener的代碼,可以發(fā)現(xiàn)處理mq消息的,都是具體的某個(gè)方法。那加工的過(guò)程,應(yīng)該就是利用反射拿到對(duì)象、方法和@RabbitListener中的queue屬性,然后建立一個(gè)綁定關(guān)系(對(duì)象+方法)——>(queue的consumer)。queue的consumer在接收到mq消息后,找到綁定的“對(duì)象+方法”,再通過(guò)反射的方式,調(diào)用真正的處理方法。

3、mq消息的處理方法,可以那么隨意,應(yīng)該是queue的consumer在調(diào)用真正處理方法之前,需要根據(jù)處理方法的參數(shù)類型,做一次數(shù)據(jù)轉(zhuǎn)換。

接下來(lái),就去看看源碼,看一下設(shè)想是不是正確的~~

二、源碼分析

1、誰(shuí)來(lái)掃描@RabbitListener注解的bean

在springBoot使用rabbit,一般是在@Configuration類上加上@EnableRabbit注解來(lái)開(kāi)啟rabbit功能。那我們就去看看@EnableRabbit注解的源碼,看這個(gè)注解的作用

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

可以看到,這個(gè)注解的作用,是導(dǎo)入RabbitBootstrapConfiguration配置類

@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
}

RabbitBootstrapConfiguration 配置類的作用,就是定義了RabbitListenerAnnotationBeanPostProcessor 和RabbitListenerEndpointRegistry 兩個(gè)bean。

看到RabbitListenerAnnotationBeanPostProcessor 這個(gè)類名,就可以猜到,該類的實(shí)例bean就是用來(lái)掃描加了@RabbitListener 的類,并做一些加工。

(“RabbitListenerAnnotationBean”——針對(duì)添加了@RabbitListener注解的bean; “PostProcessor”——后置加工)

2、怎么建立(對(duì)象+方法)——>(queue的consumer)的映射關(guān)系

分析一下RabbitListenerAnnotationBeanPostProcessor類的源碼

// 實(shí)現(xiàn)了BeanPostProcessor、Ordered、BeanFactoryAware、BeanClassLoaderAware、EnvironmentAware和SmartInitializingSingleton 6個(gè)接口
public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {
        
    .......
    
    // 完成初始化bean之后,調(diào)用該方法
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);

        TypeMetadata metadata = this.typeCache.get(targetClass);
        if (metadata == null) {
            metadata = buildMetadata(targetClass);
            this.typeCache.putIfAbsent(targetClass, metadata);
        }

        for (ListenerMethod lm : metadata.listenerMethods) {
            for (RabbitListener rabbitListener : lm.annotations) {
                processAmqpListener(rabbitListener, lm.method, bean, beanName);
            }
        }
        if (metadata.handlerMethods.length > 0) {
            processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
        }
        return bean;
    }

    // 根據(jù)Class,獲取元數(shù)據(jù)
    private TypeMetadata buildMetadata(Class<?> targetClass) {
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<ListenerMethod> methods = new ArrayList<ListenerMethod>();
        final List<Method> multiMethods = new ArrayList<Method>();
        ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {

            @Override
            public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
                Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
                if (listenerAnnotations.size() > 0) {
                    methods.add(new ListenerMethod(method,
                            listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
                }
                if (hasClassLevelListeners) {
                    RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                    if (rabbitHandler != null) {
                        multiMethods.add(method);
                    }
                }

            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return TypeMetadata.EMPTY;
        }
        return new TypeMetadata(
                methods.toArray(new ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
    }

    // 檢查一下是否使用jdk代理,使用jdk代理方式必須實(shí)現(xiàn)了接口
    // new一個(gè)MethodRabbitListenerEndpoint對(duì)象,交由processListener方法進(jìn)行處理
    protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
    }

// 前面大半代碼都是對(duì)MethodRabbitListenerEndpoint對(duì)象的屬性設(shè)置:處理消息的bean、消息處理方法的工廠類、監(jiān)聽(tīng)的隊(duì)列名。。。。
// 通過(guò)beanFactory獲取RabbitListenerContainerFactory類的bean
// 調(diào)用RabbitListenerEndpointRegistar的registerEndpoint方法注冊(cè)mq消息消費(fèi)端點(diǎn)
protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
        endpoint.setQueueNames(resolveQueues(rabbitListener));
        String group = rabbitListener.group();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }

        endpoint.setExclusive(rabbitListener.exclusive());
        String priority = resolve(rabbitListener.priority());
        if (StringUtils.hasText(priority)) {
            try {
                endpoint.setPriority(Integer.valueOf(priority));
            }
            catch (NumberFormatException ex) {
                throw new BeanInitializationException("Invalid priority value for " +
                        rabbitListener + " (must be an integer)", ex);
            }
        }

        String rabbitAdmin = resolve(rabbitListener.admin());
        if (StringUtils.hasText(rabbitAdmin)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
            try {
                endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                        rabbitAdmin + "' was found in the application context", ex);
            }
        }


        RabbitListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "] for bean " + beanName + ", no " +
                        RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                        containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }
    ........

}

這個(gè)類的代碼比較長(zhǎng),只貼部分比較主要的部分,其他的,可以自己查看源碼進(jìn)行了解。

RabbitListenerAnnotationBeanPostProcessor實(shí)現(xiàn)了BeanPostProcessor(bean初始化后的后置處理)、Ordered(后置處理的排序)、BeanFactoryAware(注入BeanFactory)、BeanClassLoaderAware(注入BeanClassLoader)、EnvironmentAware(注入spring環(huán)境)和SmartInitializingSingleton(單例bean初始化后的回調(diào)) 6個(gè)接口。

我們需要關(guān)注的是BeanPostProcessor接口定義的方法,看postProcessAfterInitialization方法的代碼,大致流程為:

1、通過(guò)AopUtils得到bean代理的對(duì)象的class

2、判斷緩存中是否有該class的類型元數(shù)據(jù),如果沒(méi)有則調(diào)用buildMetadata方法生成類型元數(shù)據(jù)并放入緩存

3、遍歷加了@RabbitListener注解的方法,調(diào)用processAmqpListener方法進(jìn)行處理

4、調(diào)用processMultiMethodListeners方法對(duì)加了@RabbitHandler的方法進(jìn)行處理

關(guān)于buildMetadata方法:

代碼不復(fù)雜,就是利用反射,拿到class中,添加了@RabbitListener和@RabbitHandler注解的方法。另外,從代碼中也可以看出,@RabbitHandler注解要生效,必須在class上增加@RabbitListener注解

關(guān)于processAmqpListener方法:

沒(méi)有什么實(shí)際內(nèi)容,就干兩個(gè)事情:

1、檢查一下是否使用jdk代理,使用jdk代理方式必須實(shí)現(xiàn)了接口

2、new一個(gè)MethodRabbitListenerEndpoint對(duì)象,交由processListener方法進(jìn)行處理

關(guān)于processListener方法:

1、前面大半代碼都是對(duì)MethodRabbitListenerEndpoint對(duì)象的屬性設(shè)置:處理消息的bean、消息處理方法的工廠類、監(jiān)聽(tīng)的隊(duì)列名。。。。

其中要關(guān)注一下setMessageHandlerMethodFactory方法,查看MessageHandlerMethodFactory接口的源碼

public interface MessageHandlerMethodFactory {
    InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method);

從入?yún)⒑头祷刂悼梢钥闯鰜?lái),這個(gè)工廠的作用就是將spring的bean對(duì)象和方法包裝成一個(gè)InvocableHandlerMethod對(duì)象,也就是我們上面提到的(對(duì)象+方法)。

2、通過(guò)beanFactory獲取RabbitListenerContainerFactory類的bean。

3、調(diào)用RabbitListenerEndpointRegistar的registerEndpoint方法注冊(cè)mq消息消費(fèi)端點(diǎn)。

繼續(xù)往下追,看一下RabbitListenerEndpointRegistar的代碼:

public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    // 將整個(gè)endpointDescriptors數(shù)組進(jìn)行注冊(cè)
    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }
    
    // 解析得到RabbitListenerContainerFactory
    // 如果AmqpListenerEndpointDescriptor 的containerFactory屬性不為空,直接返回containerFactory
    // 如果為空,嘗試從beanFactory獲取
    private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
        if (descriptor.containerFactory != null) {
            return descriptor.containerFactory;
        }
        else if (this.containerFactory != null) {
            return this.containerFactory;
        }
        else if (this.containerFactoryBeanName != null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            this.containerFactory = this.beanFactory.getBean(
                    this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
            return this.containerFactory;  // Consider changing this if live change of the factory is required
        }
        else {
            throw new IllegalStateException("Could not resolve the " +
                    RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
                    descriptor.endpoint + "] no factory was given and no default is set.");
        }
    }
    
    // new一個(gè)AmqpListenerEndpointDescriptor對(duì)象
    // 如果立即啟動(dòng),則調(diào)用RabbitListenerEndpointRegistry注冊(cè)器來(lái)注冊(cè)消息監(jiān)聽(tīng)
    // 如果不是立即啟動(dòng),則添加到endpointDescriptors列表中,后面通過(guò)registerAllEndpoints方法統(tǒng)一啟動(dòng)
    public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must be set");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        // Factory may be null, we defer the resolution right before actually creating the container
        AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
        synchronized (this.endpointDescriptors) {
            if (this.startImmediately) { // Register and start immediately
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                        resolveContainerFactory(descriptor), true);
            }
            else {
                this.endpointDescriptors.add(descriptor);
            }
        }
    }
}

從上面的代碼可以看出,我們關(guān)心的內(nèi)容,應(yīng)該是在RabbitListenerEndpointRegistry類的registerListenerContainer方法?。?/p>

public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {
        // 檢查是否被注冊(cè)過(guò),注冊(cè)過(guò)就不能注冊(cè)第二次
        // 調(diào)用createListenerContainer創(chuàng)建消息監(jiān)聽(tīng)
        // 關(guān)于分組消費(fèi)的,我們不關(guān)心
        // 是否立即啟動(dòng),是的話,同步調(diào)用startIfNecessary方法
        public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
                                          boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }

    // 其實(shí)就是調(diào)用了RabbitListenerContainerFactory的createListenerContainer生成了一個(gè)MessageListenerContainer對(duì)象
    protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
            RabbitListenerContainerFactory<?> factory) {

        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }
}

createListenerContainer方法調(diào)用了RabbitListenerContainerFactory接口的createListenerContainer方法創(chuàng)建一個(gè)MessageListenerContainer對(duì)象。

在這里,如果是通過(guò)RabbitAutoConfiguration自動(dòng)配置的,那么RabbitListenerContainerFactory接口的具體實(shí)現(xiàn)類是SimpleRabbitListenerContainerFactory,MessageListenerContainer接口的具體實(shí)現(xiàn)類是SimpleMessageListenerContainer。有興趣的話,可以去看下rabbitMq自動(dòng)配置的幾個(gè)類。

RabbitListenerContainerFactory接口的createListenerContainer方法是由AbstractRabbitListenerContainerFactory抽象類實(shí)現(xiàn),代碼如下:

    @Override
    public C createListenerContainer(RabbitListenerEndpoint endpoint) {
        C instance = createContainerInstance();

        if (this.connectionFactory != null) {
            instance.setConnectionFactory(this.connectionFactory);
        }
        if (this.errorHandler != null) {
            instance.setErrorHandler(this.errorHandler);
        }
        if (this.messageConverter != null) {
            instance.setMessageConverter(this.messageConverter);
        }
        if (this.acknowledgeMode != null) {
            instance.setAcknowledgeMode(this.acknowledgeMode);
        }
        if (this.channelTransacted != null) {
            instance.setChannelTransacted(this.channelTransacted);
        }
        if (this.autoStartup != null) {
            instance.setAutoStartup(this.autoStartup);
        }
        if (this.phase != null) {
            instance.setPhase(this.phase);
        }
        instance.setListenerId(endpoint.getId());
        // 最重要的一行?。?!
        endpoint.setupListenerContainer(instance);
        initializeContainer(instance);

        return instance;
    }

乍一看,都是對(duì)MessageListenerContainer實(shí)例的初始化,實(shí)際上有一行,相當(dāng)重要“ endpoint.setupListenerContainer(instance); ”,這一行最終是走到

AbstractRabbitListenerEndpoint.setupListenerContainer
public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEndpoint, BeanFactoryAware {
    ......
    
    // 設(shè)置MessageListenerContainer,最重要的就是設(shè)置監(jiān)聽(tīng)的隊(duì)列名稱?。?!
    @Override
    public void setupListenerContainer(MessageListenerContainer listenerContainer) {
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;

        boolean queuesEmpty = getQueues().isEmpty();
        boolean queueNamesEmpty = getQueueNames().isEmpty();
        if (!queuesEmpty && !queueNamesEmpty) {
            throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
        }
        if (queuesEmpty) {
            Collection<String> names = getQueueNames();
            container.setQueueNames(names.toArray(new String[names.size()]));
        }
        else {
            Collection<Queue> instances = getQueues();
            container.setQueues(instances.toArray(new Queue[instances.size()]));
        }

        container.setExclusive(isExclusive());
        if (getPriority() != null) {
            Map<String, Object> args = new HashMap<String, Object>();
            args.put("x-priority", getPriority());
            container.setConsumerArguments(args);
        }

        if (getAdmin() != null) {
            container.setRabbitAdmin(getAdmin());
        }
        setupMessageListener(listenerContainer);
    }
    
    // 創(chuàng)建MessageListener
    protected abstract MessageListener createMessageListener(MessageListenerContainer container);

    // 創(chuàng)建MessageListener,設(shè)置到MessageListenerContainer 里
    private void setupMessageListener(MessageListenerContainer container) {
        MessageListener messageListener = createMessageListener(container);
        Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
        container.setupMessageListener(messageListener);
    }
    ......
}

用@RabbitLinstener注解的方法,使用的endpoint是MethodRabbitListenerEndpoint繼承自AbstractRabbitListenerEndpoint,所以看看AbstractRabbitListenerEndpoint的createMessageListener方法

public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint {
    ......
    @Override
    protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        String replyToAddress = getDefaultReplyToAddress();
        if (replyToAddress != null) {
            messageListener.setResponseAddress(replyToAddress);
        }
        MessageConverter messageConverter = container.getMessageConverter();
        if (messageConverter != null) {
            messageListener.setMessageConverter(messageConverter);
        }
        if (getBeanResolver() != null) {
            messageListener.setBeanResolver(getBeanResolver());
        }
        return messageListener;
    }

    protected MessagingMessageListenerAdapter createMessageListenerInstance() {
        return new MessagingMessageListenerAdapter(this.bean, this.method);
    }
    
    ......
}

從上面代碼可以看出,createMessageListener方法返回了一個(gè)MessagingMessageListenerAdapter實(shí)例,MessagingMessageListenerAdapter實(shí)現(xiàn)了MessageListener接口

到這里,我們就能得出一些結(jié)論:

1、有@RabbitListener注解的方法,會(huì)生成MethodRabbitListenerEndpoint對(duì)象

2、通過(guò)MethodRabbitListenerEndpoint對(duì)象和SimpleRabbitListenerContainerFactory工廠bean,生成SimpleMessageListenerContainer對(duì)象

3、SimpleMessageListenerContainer對(duì)象保存了要監(jiān)聽(tīng)的隊(duì)列名,創(chuàng)建了用于處理消息的MessagingMessageListenerAdapter實(shí)例

4、MessagingMessageListenerAdapter持有@RabbitListener注解的對(duì)象和方法,起到一個(gè)適配器的作用

SimpleMessageListenerContainer是相當(dāng)重要的一個(gè)類,,包裝了整個(gè)mq消息消費(fèi)需要的信息:

1、保存了要監(jiān)聽(tīng)的隊(duì)列名,啟動(dòng)的時(shí)候,根據(jù)隊(duì)列名創(chuàng)建從服務(wù)器拉取消息的consumer&mdash;&mdash;BlockingQueueConsumer

2、創(chuàng)建了一個(gè)MessagingMessageListenerAdapter對(duì)象,當(dāng)consumer從服務(wù)器拿到消息后,由MessagingMessageListenerAdapter進(jìn)行處理

3、誰(shuí)來(lái)做數(shù)據(jù)轉(zhuǎn)換?

是MessagingMessageListenerAdapter,有興趣的,可以看看MessagingMessageListenerAdapter轉(zhuǎn)換參數(shù)的源碼。

感謝各位的閱讀,以上就是“@RabbitListener起作用的原理是什么”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)@RabbitListener起作用的原理是什么這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI