溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

JPA多數(shù)據(jù)源分布式事務(wù)的示例分析

發(fā)布時(shí)間:2022-02-24 09:34:02 來(lái)源:億速云 閱讀:222 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了JPA多數(shù)據(jù)源分布式事務(wù)的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

    問(wèn)題背景

    在解決mysql字段脫敏處理時(shí),結(jié)合sharding-jdbc的脫敏組件功能,為了sql兼容和最小化應(yīng)用改造,博主給出了一個(gè)多數(shù)據(jù)源融合的字段脫敏解決方案(只把包含脫敏字段表的操作走sharding-jdbc脫敏代理數(shù)據(jù)源)。這個(gè)方案解決了問(wèn)題的同時(shí),帶來(lái)了一個(gè)新的問(wèn)題,數(shù)據(jù)源的事務(wù)是獨(dú)立的,正如我文中所述《JPA項(xiàng)目多數(shù)據(jù)源模式整合sharding-jdbc實(shí)現(xiàn)數(shù)據(jù)脫敏》,在spring上下文中,每個(gè)數(shù)據(jù)源對(duì)應(yīng)一個(gè)獨(dú)立的事務(wù)管理器,默認(rèn)的事務(wù)管理器的數(shù)據(jù)源就用業(yè)務(wù)本身的數(shù)據(jù)源,所以需要加密的業(yè)務(wù)使用時(shí),需要指定@Transactional注解里的事務(wù)管理器名稱為脫敏對(duì)應(yīng)的事務(wù)管理器名稱。簡(jiǎn)單的業(yè)務(wù)場(chǎng)景這樣用也就沒有問(wèn)題了,但是一般的業(yè)務(wù)場(chǎng)景總有一個(gè)事務(wù)覆蓋兩個(gè)數(shù)據(jù)源的操作,這個(gè)時(shí)候單指定哪個(gè)事務(wù)管理器都不行,so,這里需要一種多數(shù)據(jù)源的事務(wù)管理器。

    XA事務(wù)方案

    XA協(xié)議采用2PC(兩階段提交)的方式來(lái)管理分布式事務(wù)。XA接口提供資源管理器與事務(wù)管理器之間進(jìn)行通信的標(biāo)準(zhǔn)接口。在JDBC的XA事務(wù)相關(guān)api抽象里,相關(guān)接口定義如下

    XADataSource,XA協(xié)議數(shù)據(jù)源

    public interface XADataSource extends CommonDataSource {
      /**
       * 嘗試建立物理數(shù)據(jù)庫(kù)連接,使用給定的用戶名和密碼。返回的連接可以在分布式事務(wù)中使用
       */
      XAConnection getXAConnection() throws SQLException;
       //省略getLogWriter等非關(guān)鍵方法
     }

    XAConnection

    public interface XAConnection extends PooledConnection {
        /**
         * 檢索一個(gè){@code XAResource}對(duì)象,事務(wù)管理器將使用該對(duì)象管理該{@code XAConnection}對(duì)象在分布式事務(wù)中的事務(wù)行為
         */
        javax.transaction.xa.XAResource getXAResource() throws SQLException;
    }

    XAResource

    public interface XAResource {
        /**
         * 提交xid指定的全局事務(wù)
         */
        void commit(Xid xid, boolean onePhase) throws XAException;
        /**
         * 結(jié)束代表事務(wù)分支執(zhí)行的工作。資源管理器從指定的事務(wù)分支中分離XA資源,并讓事務(wù)完成。
         */
        void end(Xid xid, int flags) throws XAException;
        /**
         * 通知事務(wù)管理器忽略此xid事務(wù)分支
         */
        void forget(Xid xid) throws XAException;
        /**
         * 判斷是否同一個(gè)資源管理器
         */
        boolean isSameRM(XAResource xares) throws XAException;
        /**
         * 指定xid事務(wù)準(zhǔn)備階段
         */
        int prepare(Xid xid) throws XAException;
        /**
         * 從資源管理器獲取準(zhǔn)備好的事務(wù)分支的列表。事務(wù)管理器在恢復(fù)期間調(diào)用此方法,
         * 以獲取當(dāng)前處于準(zhǔn)備狀態(tài)或初步完成狀態(tài)的事務(wù)分支的列表。
         */
        Xid[] recover(int flag) throws XAException;
        /**
         * 通知資源管理器回滾代表事務(wù)分支完成的工作。
         */
        void rollback(Xid xid) throws XAException;
        /**
         * 代表xid中指定的事務(wù)分支開始工作。
         */
        void start(Xid xid, int flags) throws XAException;
        //省略非關(guān)鍵方法
    }

    相比較普通的事務(wù)管理,JDBC的XA協(xié)議管理多了一個(gè)XAResource資源管理器,XA事務(wù)相關(guān)的行為(開啟、準(zhǔn)備、提交、回滾、結(jié)束)都由這個(gè)資源管理器來(lái)控制,這些都是框架內(nèi)部的行為,體現(xiàn)在開發(fā)層面提供的數(shù)據(jù)源也變成了XADataSource。而JTA的抽象里,定義了UserTransaction、TransactionManager。想要使用JTA事務(wù),必須先實(shí)現(xiàn)這兩個(gè)接口。所以,如果我們要使用JTA+XA控制多數(shù)據(jù)源的事務(wù),在sprign boot里以Atomikos為例,

    引入Atomikos依賴

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>

    spring boot已經(jīng)幫我們把XA事務(wù)管理器自動(dòng)裝載類定義好了,如:

    創(chuàng)建JTA事務(wù)管理器

    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
    @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
    @ConditionalOnMissingBean(PlatformTransactionManager.class)
    class AtomikosJtaConfiguration {
    	@Bean(initMethod = "init", destroyMethod = "shutdownWait")
    	@ConditionalOnMissingBean(UserTransactionService.class)
    	UserTransactionServiceImp userTransactionService(AtomikosProperties atomikosProperties,
    			JtaProperties jtaProperties) {
    		Properties properties = new Properties();
    		if (StringUtils.hasText(jtaProperties.getTransactionManagerId())) {
    			properties.setProperty("com.atomikos.icatch.tm_unique_name", jtaProperties.getTransactionManagerId());
    		}
    		properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir(jtaProperties));
    		properties.putAll(atomikosProperties.asProperties());
    		return new UserTransactionServiceImp(properties);
    	}
    	@Bean(initMethod = "init", destroyMethod = "close")
    	@ConditionalOnMissingBean(TransactionManager.class)
    	UserTransactionManager atomikosTransactionManager(UserTransactionService userTransactionService) throws Exception {
    		UserTransactionManager manager = new UserTransactionManager();
    		manager.setStartupTransactionService(false);
    		manager.setForceShutdown(true);
    		return manager;
    	}
    	@Bean
    	@ConditionalOnMissingBean(XADataSourceWrapper.class)
    	AtomikosXADataSourceWrapper xaDataSourceWrapper() {
    		return new AtomikosXADataSourceWrapper();
    	}
    	@Bean
    	JtaTransactionManager transactionManager(UserTransaction userTransaction, TransactionManager transactionManager,
    			ObjectProvidertransactionManagerCustomizers) {
    		JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(userTransaction, transactionManager);
    		transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(jtaTransactionManager));
    		return jtaTransactionManager;
    	}
    }

    顯然,想要使用XA事務(wù),除了需要提供UserTransaction、TransactionManager的實(shí)現(xiàn)。還必須要有一個(gè)XADataSource,而sharding-jdbc代理的數(shù)據(jù)源是DataSource的,我們需要將XADataSource包裝成普通的DataSource,spring已經(jīng)提供了一個(gè)AtomikosXADataSourceWrapper的XA數(shù)據(jù)源包裝器,而且在AtomikosJtaConfiguration里已經(jīng)注冊(cè)到Spring上下文中,所以我們?cè)谧远x數(shù)據(jù)源時(shí)可以直接注入包裝器實(shí)例,然后,因?yàn)槭荍PA環(huán)境,所以在創(chuàng)建EntityManagerFactory實(shí)例時(shí),需要指定JPA的事務(wù)管理類型為JTA,綜上,普通的業(yè)務(wù)默認(rèn)數(shù)據(jù)源配置如下:

    /**
     * @author: kl @kailing.pub
     * @date: 2020/5/18
     */
    @Configuration
    @EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
    public class DataSourceConfiguration{
        @Primary
        @Bean
        public DataSource dataSource(AtomikosXADataSourceWrapper wrapper, DataSourceProperties dataSourceProperties) throws Exception {
            MysqlXADataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(MysqlXADataSource.class).build();
            return wrapper.wrapDataSource(dataSource);
        }
        @Primary
        @Bean(initMethod = "afterPropertiesSet")
        public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
            return factoryBuilder.dataSource(dataSource)
                    .packages(Constants.BASE_PACKAGES)
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("default")
                    .jta(true)
                    .build();
        }
        @Bean
        @Primary
        public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
            //必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實(shí)例,否則SimpleJpaRepository中的事務(wù)不生效
            return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
        }
    }

    sharding-jdbc加密數(shù)據(jù)源和普通業(yè)務(wù)數(shù)據(jù)源其實(shí)是同一個(gè)數(shù)據(jù)源,只是走加解密邏輯的數(shù)據(jù)源需要被sharding-jdbc的加密組件代理一層,加上了加解密的處理邏輯。所以配置如下:

    /**
     * @author: kl @kailing.pub
     * @date: 2020/5/18
     */
    @Configuration
    @EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
    public class EncryptDataSourceConfiguration {
        @Bean
        public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
            return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
        }
        @Bean(initMethod = "afterPropertiesSet")
        public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
            return factoryBuilder.dataSource(dataSource)
                    .packages(Constants.BASE_PACKAGES)
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("encryptPersistenceUnit")
                    .jta(true)
                    .build();
        }
        @Bean
        public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
            //必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實(shí)例,否則SimpleJpaRepository中的事務(wù)不生效
            return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
        }
    }

    遇到問(wèn)題1、

    Connection pool exhausted - try increasing 'maxPoolSize' and/or 'borrowConnectionTimeout' on the DataSourceBean.

    解決問(wèn)題:默認(rèn)AtomikosXADataSourceWrapper包裝器初始化的數(shù)據(jù)源連接池最大為1,所以需要添加配置參數(shù)如:

    spring.jta.atomikos.datasource.max-pool-size=20

    遇到問(wèn)題2、

    XAER_INVAL: Invalid arguments (or unsupported command)

    解決問(wèn)題:這個(gè)是mysql實(shí)現(xiàn)XA的bug,僅當(dāng)您在同一事務(wù)中多次訪問(wèn)同一MySQL數(shù)據(jù)庫(kù)時(shí),才會(huì)發(fā)生此問(wèn)題,在mysql連接url加上如下參數(shù)即可,如:

    spring.datasource.url = jdbc:mysql://127.0.0.1:3306/xxx?pinGlobalTxToPhysicalConnection=true

    Mysql XA事務(wù)行為

    在這個(gè)場(chǎng)景中,雖然是多數(shù)據(jù)源,但是底層鏈接的是同一個(gè)mysql數(shù)據(jù)庫(kù),所以XA事務(wù)行為為,從第一個(gè)執(zhí)行的sql開始(并不是JTA事務(wù)begin階段),生成xid并XA START事務(wù),然后XA END。第二個(gè)數(shù)據(jù)源的sql執(zhí)行時(shí)會(huì)判斷是否同一個(gè)mysql資源,如果是同一個(gè)則用剛生成的xid重新XA START RESUME,然后XA END,最終雖然在應(yīng)用層是兩個(gè)DataSource,其實(shí)最后只會(huì)調(diào)用XA COMMIT一次。mysql驅(qū)動(dòng)實(shí)現(xiàn)的XAResource的start如下:

    public void start(Xid xid, int flags) throws XAException {
            StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
            commandBuf.append("XA START ");
            appendXid(commandBuf, xid);
            switch (flags) {
                case TMJOIN:
                    commandBuf.append(" JOIN");
                    break;
                case TMRESUME:
                    commandBuf.append(" RESUME");
                    break;
                case TMNOFLAGS:
                    // no-op
                    break;
                default:
                    throw new XAException(XAException.XAER_INVAL);
            }
            dispatchCommand(commandBuf.toString());
            this.underlyingConnection.setInGlobalTx(true);
        }

    第一次sql執(zhí)行時(shí),flags=0,走的TMNOFLAGS邏輯,第二次sql執(zhí)行時(shí),flags=134217728,走的TMRESUME,重新開啟事務(wù)的邏輯。以上是Mysql XA的真實(shí)事務(wù)邏輯,但是博主研究下來(lái)發(fā)現(xiàn),msyql xa并不支持XA START RESUME這種語(yǔ)句,而且有很多限制《Mysql XA交易限制》,所以在mysql數(shù)據(jù)庫(kù)使用XA事務(wù)時(shí),最好了解下mysql xa的缺陷

    鏈?zhǔn)绞聞?wù)方案

    鏈?zhǔn)绞聞?wù)不是我首創(chuàng)的叫法,在spring-data-common項(xiàng)目的Transaction包下,已經(jīng)有一個(gè)默認(rèn)實(shí)現(xiàn)ChainedTransactionManager,前文中《深入理解spring的@Transactional工作原理》已經(jīng)分析了Spring的事務(wù)抽象,由PlatformTransactionManager(事務(wù)管理器)、TransactionStatus(事務(wù)狀態(tài))、TransactionDefinition(事務(wù)定義)等形態(tài)組成,ChainedTransactionManager也是實(shí)現(xiàn)了PlatformTransactionManager和TransactionStatus。實(shí)現(xiàn)原理也很簡(jiǎn)單,在ChainedTransactionManager內(nèi)部維護(hù)了事務(wù)管理器的集合,通過(guò)代理編排真實(shí)的事務(wù)管理器,在事務(wù)開啟、提交、回滾時(shí),都分別操作集合里的事務(wù)。以達(dá)到對(duì)多個(gè)事務(wù)的統(tǒng)一管理。這個(gè)方案比較簡(jiǎn)陋,而且有缺陷,在提交階段,如果異常不是發(fā)生在第一個(gè)數(shù)據(jù)源,那么會(huì)存在之前的提交不會(huì)回滾,所以在使用ChainedTransactionManager時(shí),盡量把出問(wèn)題可能性比較大的事務(wù)管理器放鏈的后面(開啟事務(wù)、提交事務(wù)順序相反)。這里只是拋出了一種新的多數(shù)據(jù)源事務(wù)管理的思路,能用XA盡量用XA管理。

    普通的業(yè)務(wù)默認(rèn)數(shù)據(jù)源配置如下:

    /**
     * @author: kl @kailing.pub
     * @date: 2020/5/18
     */
    @Configuration
    @EnableConfigurationProperties({JpaProperties.class, DataSourceProperties.class})
    public class DataSourceConfiguration{
        @Primary
        @Bean
        public DataSource dataSource(DataSourceProperties dataSourceProperties){
           return dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build();
        }
        @Primary
        @Bean(initMethod = "afterPropertiesSet")
        public LocalContainerEntityManagerFactoryBean entityManagerFactory(JpaProperties jpaProperties, DataSource dataSource, EntityManagerFactoryBuilder factoryBuilder) {
            return factoryBuilder.dataSource(dataSource)
                    .packages(Constants.BASE_PACKAGES)
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("default")
                    .build();
        }
        @Bean
        @Primary
        public EntityManager entityManager(EntityManagerFactory entityManagerFactory){
            //必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實(shí)例,否則SimpleJpaRepository中的事務(wù)不生效
            return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
        }
        @Primary
        @Bean
        public PlatformTransactionManager transactionManager(EntityManagerFactory entityManagerFactory){
            JpaTransactionManager txManager = new JpaTransactionManager();
            txManager.setEntityManagerFactory(entityManagerFactory);
            return txManager;
        }
    }

    sharding-jdbc加密數(shù)據(jù)源配置如下:

    /**
     * @author: kl @kailing.pub
     * @date: 2020/5/18
     */
    @Configuration
    @EnableConfigurationProperties({JpaProperties.class,SpringBootEncryptRuleConfigurationProperties.class, SpringBootPropertiesConfigurationProperties.class})
    public class EncryptDataSourceConfiguration {
        @Bean
        public DataSource encryptDataSource(DataSource dataSource,SpringBootPropertiesConfigurationProperties props,SpringBootEncryptRuleConfigurationProperties encryptRule) throws SQLException {
            return EncryptDataSourceFactory.createDataSource(dataSource, new EncryptRuleConfigurationYamlSwapper().swap(encryptRule), props.getProps());
        }
        @Bean(initMethod = "afterPropertiesSet")
        public LocalContainerEntityManagerFactoryBean encryptEntityManagerFactory(@Qualifier("encryptDataSource") DataSource dataSource,JpaProperties jpaProperties, EntityManagerFactoryBuilder factoryBuilder) throws SQLException {
            return factoryBuilder.dataSource(dataSource)
                    .packages(Constants.BASE_PACKAGES)
                    .properties(jpaProperties.getProperties())
                    .persistenceUnit("encryptPersistenceUnit")
                    .build();
        }
        @Bean
        public EntityManager encryptEntityManager(@Qualifier("encryptEntityManagerFactory") EntityManagerFactory entityManagerFactory){
            //必須使用SharedEntityManagerCreator創(chuàng)建SharedEntityManager實(shí)例,否則SimpleJpaRepository中的事務(wù)不生效
            return SharedEntityManagerCreator.createSharedEntityManager(entityManagerFactory);
        }
        @Bean
        public PlatformTransactionManager chainedTransactionManager(PlatformTransactionManager transactionManager) throws SQLException {
            JpaTransactionManager encryptTransactionManager = new JpaTransactionManager();
            encryptTransactionManager.setEntityManagerFactory(encryptEntityManagerFactory());
            //使用鏈?zhǔn)绞聞?wù)管理器包裝真正的transactionManager、txManager事務(wù)
            ChainedTransactionManager chainedTransactionManager = new ChainedTransactionManager(encryptTransactionManager,transactionManager);
            return chainedTransactionManager;
        }
    }

    使用這種方案,在涉及到多數(shù)據(jù)源的業(yè)務(wù)時(shí),需要指定使用哪個(gè)事務(wù)管理器,如:

    @PersistenceContext(unitName = "encryptPersistenceUnit")
        private EntityManager entityManager;
        @PersistenceContext
        private EntityManager manager;
        @Transactional(transactionManager = "chainedTransactionManager")
        public AccountModel  save(AccountDTO dto){
            AccountModel accountModel = AccountMapper.INSTANCE.dtoTo(dto);
            entityManager.persist(accountModel);
            entityManager.flush();
            AccountModel accountMode2 = AccountMapper.INSTANCE.dtoTo(dto);
            manager.persist(accountMode2);
            manager.flush();
            return accountModel;
        }

    感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“JPA多數(shù)據(jù)源分布式事務(wù)的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    jpa
    AI