您好,登錄后才能下訂單哦!
rocketmq中ListenerContainerConfiguration的作用是什么,相信很多沒有經(jīng)驗的人對此束手無策,為此本文總結(jié)了問題出現(xiàn)的原因和解決方法,通過這篇文章希望你能解決這個問題。
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@Configuration public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton { private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class); private ConfigurableApplicationContext applicationContext; private AtomicLong counter = new AtomicLong(0); private StandardEnvironment environment; private RocketMQProperties rocketMQProperties; private ObjectMapper objectMapper; public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper, StandardEnvironment environment, RocketMQProperties rocketMQProperties) { this.objectMapper = rocketMQMessageObjectMapper; this.environment = environment; this.rocketMQProperties = rocketMQProperties; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @Override public void afterSingletonsInstantiated() { Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext; genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; } private void validate(RocketMQMessageListener annotation) { if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException( "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } } }
ListenerContainerConfiguration實現(xiàn)了ApplicationContextAware、SmartInitializingSingleton接口
其setApplicationContext方法保存了applicationContext;其afterSingletonsInstantiated方法會獲取標(biāo)注了RocketMQMessageListener注解的bean,然后挨個執(zhí)行registerContainer
registerContainer方法首先判斷該bean是否是RocketMQListener的實現(xiàn)類,不是則拋出IllegalStateException;接著獲取RocketMQMessageListener注解的信息,判斷是否設(shè)置了不支持的屬性;之后通過createRocketMQListenerContainer創(chuàng)建DefaultRocketMQListenerContainer并注冊到applicationContext,然后對于沒有running的container執(zhí)行start方法
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; /** * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * * * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion. */ String consumerGroup(); /** * Topic name. */ String topic(); /** * Control how to selector message. * * @see SelectorType */ SelectorType selectorType() default SelectorType.TAG; /** * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */ String selectorExpression() default "*"; /** * Control consume mode, you can choice receive message concurrently or orderly. */ ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; /** * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */ MessageModel messageModel() default MessageModel.CLUSTERING; /** * Max consumer thread number. */ int consumeThreadMax() default 64; /** * Max consumer timeout, default 30s. */ long consumeTimeout() default 30000L; /** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */ String secretKey() default SECRET_KEY_PLACEHOLDER; /** * Switch flag instance for message trace. */ boolean enableMsgTrace() default true; /** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; /** * The property of "name-server". */ String nameServer() default NAME_SERVER_PLACEHOLDER; /** * The property of "access-channel". */ String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; }
RocketMQMessageListener注解定義了consumerGroup、topic、selectorType、selectorExpression、consumeMode、messageModel、consumeThreadMax、consumeTimeout、accessKey、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel屬性
ListenerContainerConfiguration實現(xiàn)了ApplicationContextAware、SmartInitializingSingleton接口
其setApplicationContext方法保存了applicationContext;其afterSingletonsInstantiated方法會獲取標(biāo)注了RocketMQMessageListener注解的bean,然后挨個執(zhí)行registerContainer
registerContainer方法首先判斷該bean是否是RocketMQListener的實現(xiàn)類,不是則拋出IllegalStateException;接著獲取RocketMQMessageListener注解的信息,判斷是否設(shè)置了不支持的屬性;之后通過createRocketMQListenerContainer創(chuàng)建DefaultRocketMQListenerContainer并注冊到applicationContext,然后對于沒有running的container執(zhí)行start方法
看完上述內(nèi)容,你們掌握rocketmq中ListenerContainerConfiguration的作用是什么的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。