This article takes a look at RocketMQTransactionAnnotationProcessor.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
public class RocketMQTransactionAnnotationProcessor
    implements BeanPostProcessor, Ordered, ApplicationContextAware {

    private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);

    private ApplicationContext applicationContext;
    private final Set<Class<?>> nonProcessedClasses =
        Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));

    private TransactionHandlerRegistry transactionHandlerRegistry;

    public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {
        this.transactionHandlerRegistry = transactionHandlerRegistry;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (!this.nonProcessedClasses.contains(bean.getClass())) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class);
            this.nonProcessedClasses.add(bean.getClass());
            if (listener == null) { // for quick search
                log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass());
            } else {
                try {
                    processTransactionListenerAnnotation(listener, bean);
                } catch (MQClientException e) {
                    log.error("Failed to process annotation " + listener, e);
                    throw new BeanCreationException("Failed to process annotation " + listener, e);
                }
            }
        }

        return bean;
    }

    private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)
        throws MQClientException {
        if (transactionHandlerRegistry == null) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must work with RocketMQTemplate", null);
        }
        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {
            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +
                "the class must implement interface RocketMQLocalTransactionListener", null);
        }
        TransactionHandler transactionHandler = new TransactionHandler();
        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());
        transactionHandler.setName(listener.txProducerGroup());
        transactionHandler.setBeanName(bean.getClass().getName());
        transactionHandler.setListener((RocketMQLocalTransactionListener) bean);
        transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),
            listener.keepAliveTime(), listener.blockingQueueSize());

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
            listener.accessKey(), listener.secretKey());

        if (Objects.nonNull(rpcHook)) {
            transactionHandler.setRpcHook(rpcHook);
        } else {
            log.debug("Access-key or secret-key not configure in " + listener + ".");
        }

        transactionHandlerRegistry.registerTransactionHandler(transactionHandler);
    }

    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE;
    }
}
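RocketMQTransactionAnnotationProcessor implements the BeanPostProcessor, Ordered, and ApplicationContextAware interfaces.
Its postProcessAfterInitialization method looks up the @RocketMQTransactionListener annotation on the target class of every bean whose class it has not inspected yet; when the annotation is present, it calls processTransactionListenerAnnotation.
processTransactionListenerAnnotation first checks that a TransactionHandlerRegistry is available and that the bean implements RocketMQLocalTransactionListener, then builds a TransactionHandler and registers it through transactionHandlerRegistry.registerTransactionHandler.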
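The scan-then-register flow described above can be sketched without Spring or RocketMQ on the classpath. Everything in this block is a hypothetical stand-in: `TxListener` plays the role of @RocketMQTransactionListener, `LocalTxListener` the role of RocketMQLocalTransactionListener, and `TxAnnotationScanner` the role of the processor. It also simplifies the lookup by using plain `Class.getAnnotation` instead of `AopUtils.getTargetClass` plus `AnnotationUtils.findAnnotation`, so it ignores proxies and meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @RocketMQTransactionListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TxListener {
    String txProducerGroup() default "default-tx-group";
}

// Hypothetical stand-in for RocketMQLocalTransactionListener.
interface LocalTxListener {
    String executeLocalTransaction(String msg);
}

// Simplified processor: inspect each bean class once, then register annotated listeners.
class TxAnnotationScanner {
    private final Set<Class<?>> seenClasses =
            Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64));
    private final Map<String, LocalTxListener> registered = new LinkedHashMap<>();

    Object postProcessAfterInitialization(Object bean) {
        Class<?> type = bean.getClass();
        // Set.add returns false when the class was already inspected.
        if (seenClasses.add(type)) {
            TxListener annotation = type.getAnnotation(TxListener.class);
            if (annotation != null) {
                if (!(bean instanceof LocalTxListener)) {
                    throw new IllegalStateException(
                            "Annotated class must implement LocalTxListener: " + type.getName());
                }
                registered.put(annotation.txProducerGroup(), (LocalTxListener) bean);
            }
        }
        return bean; // the bean itself is never wrapped or replaced
    }

    Map<String, LocalTxListener> registered() {
        return registered;
    }
}

// Correct usage: annotated and implements the listener interface.
@TxListener(txProducerGroup = "order-tx-group")
class OrderTxListener implements LocalTxListener {
    @Override
    public String executeLocalTransaction(String msg) {
        return "COMMIT";
    }
}

// Incorrect usage: annotated but missing the listener interface.
@TxListener
class BrokenListener {
}
```

Passing an `OrderTxListener` to the scanner registers it under `order-tx-group`, an unannotated bean passes through untouched, and a `BrokenListener` is rejected, which corresponds to the "Bad usage" exception in the listing.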
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java
class TransactionHandler {
    private String name;
    private String beanName;
    private RocketMQLocalTransactionListener bean;
    private BeanFactory beanFactory;
    private ThreadPoolExecutor checkExecutor;
    private RPCHook rpcHook;

    public String getBeanName() {
        return beanName;
    }

    public void setBeanName(String beanName) {
        this.beanName = beanName;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public RPCHook getRpcHook() {
        return rpcHook;
    }

    public void setRpcHook(RPCHook rpcHook) {
        this.rpcHook = rpcHook;
    }

    public BeanFactory getBeanFactory() {
        return beanFactory;
    }

    public void setBeanFactory(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    public void setListener(RocketMQLocalTransactionListener listener) {
        this.bean = listener;
    }

    public RocketMQLocalTransactionListener getListener() {
        return this.bean;
    }

    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {
        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
            keepAliveTime, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(blockingQueueSize));
    }

    public ThreadPoolExecutor getCheckExecutor() {
        return checkExecutor;
    }
}
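TransactionHandler holds the properties name, beanName, bean, beanFactory, checkExecutor, and rpcHook. Its setCheckExecutor method builds a ThreadPoolExecutor backed by a bounded LinkedBlockingDeque and interprets keepAliveTime in milliseconds.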
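As a small illustration of how the four setCheckExecutor arguments map onto the JDK executor, the sketch below builds the same kind of pool in isolation. `CheckExecutorSketch` and the argument values in the demo are hypothetical and chosen for illustration only; they are not taken from the annotation's defaults.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CheckExecutorSketch {
    // Same argument order as setCheckExecutor in the listing above.
    static ThreadPoolExecutor build(int corePoolSize, int maxPoolSize,
                                    long keepAliveTimeMillis, int blockingQueueSize) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                keepAliveTimeMillis, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(blockingQueueSize)); // bounded work queue
    }

    public static void main(String[] args) {
        // Illustrative values only.
        ThreadPoolExecutor executor = build(1, 2, 60_000L, 10);
        try {
            System.out.println("core threads:   " + executor.getCorePoolSize());
            System.out.println("max threads:    " + executor.getMaximumPoolSize());
            System.out.println("keep-alive (s): " + executor.getKeepAliveTime(TimeUnit.SECONDS));
            System.out.println("queue capacity: " + executor.getQueue().remainingCapacity());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the queue is bounded, ThreadPoolExecutor starts threads beyond corePoolSize only once the queue is full, and rejects tasks once both the queue and maxPoolSize are exhausted, so blockingQueueSize is worth sizing together with the pool limits.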
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
public class TransactionHandlerRegistry implements DisposableBean {
    private RocketMQTemplate rocketMQTemplate;

    private final Set<String> listenerContainers = new ConcurrentSet<>();

    public TransactionHandlerRegistry(RocketMQTemplate template) {
        this.rocketMQTemplate = template;
    }

    @Override
    public void destroy() throws Exception {
        listenerContainers.clear();
    }

    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {
        if (listenerContainers.contains(handler.getName())) {
            throw new MQClientException(-1,
                String
                    .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),
                        handler.getBeanName()));
        }
        listenerContainers.add(handler.getName());

        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());
    }
}
TransactionHandlerRegistry implements the DisposableBean interface; its destroy method simply clears listenerContainers. registerTransactionHandler throws an MQClientException if the handler's name is already present in listenerContainers; otherwise it adds the name and calls rocketMQTemplate.createAndStartTransactionMQProducer to create and start a TransactionMQProducer.
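The duplicate-name guard can be tried out in isolation with a minimal sketch. `TxHandlerRegistrySketch` is a hypothetical stand-in for TransactionHandlerRegistry: a counter replaces the call to createAndStartTransactionMQProducer, and an IllegalStateException replaces MQClientException. It also uses the return value of `Set.add` as a single check-and-insert step, which is a variation on the separate contains and add calls in the listing, not what the library does.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class TxHandlerRegistrySketch {
    // Names of transaction producer groups that already have a handler.
    private final Set<String> names = ConcurrentHashMap.newKeySet();
    private int producersStarted = 0;

    void register(String name, String beanName) {
        // add() returns false when the name is already registered.
        if (!names.add(name)) {
            throw new IllegalStateException(String.format(
                    "The transaction name [%s] has been defined in TransactionListener [%s]",
                    name, beanName));
        }
        producersStarted++; // stand-in for creating and starting a producer
    }

    int producersStarted() {
        return producersStarted;
    }

    // Mirrors destroy(): only the name set is cleared.
    void destroy() {
        names.clear();
    }
}
```

Registering two listeners with the same txProducerGroup therefore fails on the second one, which is why each annotated listener needs its own group name.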