溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

fescar分布式事務(wù)實現(xiàn)原理實例分析

發(fā)布時間:2022-03-01 09:10:56 來源:億速云 閱讀:165 作者:iii 欄目:開發(fā)技術(shù)

這篇文章主要介紹了fescar分布式事務(wù)實現(xiàn)原理實例分析的相關(guān)知識,內(nèi)容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇fescar分布式事務(wù)實現(xiàn)原理實例分析文章都會有所收獲,下面我們一起來看看吧。

    項目說明

    本博文所述代碼為fescar的0.1.2-SNAPSHOT版本,根據(jù)fescar后期的迭代計劃,其項目結(jié)構(gòu)和模塊實現(xiàn)都可能有很大的改變,特此說明。

    fescar的TXC模型

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    上圖為fescar官方針對TXC模型制作的示意圖。不得不說大廠的圖制作的真的不錯,結(jié)合示意圖我們可以看到TXC實現(xiàn)的全貌。TXC的實現(xiàn)通過三個組件來完成。也就是上圖的三個深黃色部分,其作用如下,:

    • TM:全局事務(wù)管理器,在標注開啟fescar分布式事務(wù)的服務(wù)端開啟,并將全局事務(wù)發(fā)送到TC事務(wù)控制端管理

    • TC:事務(wù)控制中心,控制全局事務(wù)的提交或者回滾。這個組件需要獨立部署維護,目前只支持單機版本,后續(xù)迭代計劃會有集群版本

    • RM:資源管理器,主要負責(zé)分支事務(wù)的上報,本地事務(wù)的管理

    一段話簡述其實現(xiàn)過程:服務(wù)起始方發(fā)起全局事務(wù)并注冊到TC。在調(diào)用協(xié)同服務(wù)時,協(xié)同服務(wù)的事務(wù)分支事務(wù)會先完成階段一的事務(wù)提交或回滾,并生成事務(wù)回滾的undo_log日志,同時注冊當前協(xié)同服務(wù)到TC并上報其事務(wù)狀態(tài),歸并到同一個業(yè)務(wù)的全局事務(wù)中。此時若沒有問題繼續(xù)下一個協(xié)同服務(wù)的調(diào)用,期間任何協(xié)同服務(wù)的分支事務(wù)回滾,都會通知到TC,TC在通知全局事務(wù)包含的所有已完成一階段提交的分支事務(wù)回滾。如果所有分支事務(wù)都正常,最后回到全局事務(wù)發(fā)起方時,也會通知到TC,TC在通知全局事務(wù)包含的所有分支刪除回滾日志。在這個過程中為了解決寫隔離和度隔離的問題會涉及到TC管理的全局鎖。

    本博文的目標是深入代碼細節(jié),探究其基本思路是如何實現(xiàn)的。首先會從項目的結(jié)構(gòu)來簡述每個模塊的作用,繼而結(jié)合官方自帶的examples實例來探究整個分布式事務(wù)的實現(xiàn)過程。

    項目結(jié)構(gòu)解析

    項目拉下來,用IDE打開后的目錄結(jié)構(gòu)如下,下面先大致的看下每個模塊的實現(xiàn)

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    common :公共組件,提供常用輔助類,靜態(tài)變量、擴展機制類加載器、以及定義全局的異常等

    config : 配置加載解析模塊,提供了配置的基礎(chǔ)接口,目前只有文件配置實現(xiàn),后續(xù)會有nacos等配置中心的實現(xiàn)

    core : 核心模塊主要封裝了TM、RM和TC通訊用RPC相關(guān)內(nèi)容

    distrbution :這個模塊目前是空的,distrbution是高性能的隊列,后期應(yīng)該應(yīng)用于事務(wù)日志的落地

    dubbo :dubbo模塊主要適配dubbo通訊框架,使用dubbo的filter機制來傳統(tǒng)全局事務(wù)的信息到分支

    examples :簡單的演示實例模塊,等下從這個模塊入手探索

    rm-datasource :資源管理模塊,比較核心的一個模塊,個人認為這個模塊命名為core要更合理一點。代理了JDBC的一些類,用來解析sql生成回滾日志、協(xié)調(diào)管理本地事務(wù)

    server : TC組件所在,主要協(xié)調(diào)管理全局事務(wù),負責(zé)全局事務(wù)的提交或者回滾,同時管理維護全局鎖。

    spring :和spring集成的模塊,主要是aop邏輯,是整個分布式事務(wù)的入口,研究fescar的突破口

    tm : 全局事務(wù)事務(wù)管理模塊,管理全局事務(wù)的邊界,全局事務(wù)開啟回滾點都在這個模塊控制

    通過【examples】模塊的實例看下效果

    第一步、

    先啟動TC也就是【Server】模塊,main方法直接啟動就好,默認服務(wù)端口8091

    第二步、

    回到examples模塊,將訂單,業(yè)務(wù),賬戶、倉庫四個服務(wù)的配置文件配置好,主要是mysql數(shù)據(jù)源和zookeeper連接地址,這里要注意下,默認dubbo的zk注冊中心依賴沒有,啟動的時候回拋找不到class的異常,需要添加如下的依賴:

    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
        <exclusions>
            <exclusion>
                <artifactId>slf4j-log4j12</artifactId>
                <groupId>org.slf4j</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    第三步、

    在BusinessServiceImpl中的模擬拋異常的地方打個斷點,依次啟動OrderServiceImpl、StorageServiceImpl、AccountServiceImpl、BusinessServiceImpl四個服務(wù)、等進斷點后,查看數(shù)據(jù)庫account_tbl表,金額已減去400元,變成了599元。然后放開斷點、BusinessServiceImpl模塊模擬的異常觸發(fā),全局事務(wù)回滾,account_tbl表的金額就又回滾到999元了

    如上,我們已經(jīng)體驗到fescar事務(wù)的控制能力了,下面我們具體看下它是怎么控制的。

    fescar事務(wù)過程分析

    首先分析配置文件

    這個是一個鐵律,任何一個技術(shù)或框架要集成,配置文件肯定是一個突破口。從上面的例子我們了解到,實例模塊的配置文件中配置了一個全局事務(wù)掃描器實例,如:

    <bean class="com.alibaba.fescar.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-app"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>

    這個實例在項目啟動時會掃描所有實例,具體實現(xiàn)見【spring】模塊。并將標注了@GlobalTransactional注解的方法織入GlobalTransactionalInterceptor的invoke方法邏輯。同時應(yīng)用啟動時,會初始化TM(TmRpcClient)和RM(RmRpcClient)的實例,這個時候,服務(wù)已經(jīng)和TC事務(wù)控制中心勾搭上了。在往下看就涉及到TM模塊的事務(wù)模板類TransactionalTemplate。

    【TM】模塊啟動全局事務(wù)

    全局事務(wù)的開啟,提交、回滾都被封裝在TransactionlTemplate中完成了,代碼如:

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
        // 1. get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 2. begin transaction
        try {
            tx.begin(business.timeout(), business.name());
        } catch (TransactionException txe) {
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
        }
        Object rs = null;
        try {
            // Do Your Business
            rs = business.execute();
        } catch (Throwable ex) {
            // 3. any business exception, rollback.
            try {
                tx.rollback();
                // 3.1 Successfully rolled back
                throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
            } catch (TransactionException txe) {
                // 3.2 Failed to rollback
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                    TransactionalExecutor.Code.RollbackFailure, ex);
            }
        }
        // 4. everything is fine, commit.
        try {
            tx.commit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.CommitFailure);
        }
        return rs;
    }

    更詳細的實現(xiàn)在【TM】模塊中被分成了兩個Class實現(xiàn),如下:

    DefaultGlobalTransaction :全局事務(wù)具體的開啟,提交、回滾動作

    DefaultTransactionManager :負責(zé)使用TmRpcClient向TC控制中心發(fā)送指令,如開啟全局事務(wù)(GlobalBeginRequest)、提交(GlobalCommitRequest)、回滾(GlobalRollbackRequest)、查詢狀態(tài)(GlobalStatusRequest)等。

    以上是TM模塊核心內(nèi)容點,TM模塊完成全局事務(wù)開啟后,接下來就開始看看全局事務(wù)iD,xid是如何傳遞、RM組件是如何介入的

    【DUBBO】全局事務(wù)XID的傳遞

    首先是xid的傳遞,目前已經(jīng)實現(xiàn)了dubbo框架實現(xiàn)的微服務(wù)架構(gòu)下的傳遞,其他的像spring cloud和motan等的想要實現(xiàn)也很容易,通過一般RPC通訊框架都有的filter機制,將xid從全局事務(wù)的發(fā)起節(jié)點傳遞到服務(wù)協(xié)從節(jié)點,從節(jié)點接收到后綁定到當前線程上線文環(huán)境中,用于在分支事務(wù)執(zhí)行sql時判斷是否加入全局事務(wù)。fescar的實現(xiàn)見【dubbo】模塊如下:

    @Activate(group = { Constants.PROVIDER, Constants.CONSUMER }, order = 100)
    public class TransactionPropagationFilter implements Filter {
        private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationFilter.class);
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            String xid = RootContext.getXID();
            String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("xid in RootContext[" + xid + "] xid in RpcContext[" + rpcXid + "]");
            }
            boolean bind = false;
            if (xid != null) {
                RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid);
            } else {
                if (rpcXid != null) {
                    RootContext.bind(rpcXid);
                    bind = true;
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("bind[" + rpcXid + "] to RootContext");
                    }
                }
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                if (bind) {
                    String unbindXid = RootContext.unbind();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("unbind[" + unbindXid + "] from RootContext");
                    }
                    if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                        LOGGER.warn("xid in change during RPC from " + rpcXid + " to " + unbindXid);
                        if (unbindXid != null) {
                            RootContext.bind(unbindXid);
                            LOGGER.warn("bind [" + unbindXid + "] back to RootContext");
                        }
                    }
                }
            }
        }
    }

    上面代碼rpcXid不為空時,就加入到了RootContext的ContextCore中,這里稍微深入講下。ContextCore是一個可擴展實現(xiàn)的接口,目前默認的實現(xiàn)是ThreadLocalContextCore,基于ThreadLocal來保存維護當前的xid。這里fescar提供了可擴展的機制,實現(xiàn)在【common】模塊中,通過一個自定義的類加載器EnhancedServiceLoader加載需要擴展的服務(wù)類,這樣只需要在擴展類加上@LoadLevel注解。標記order屬性聲明高優(yōu)先級別,就可以達到擴展實現(xiàn)的目的。

    【RM】模塊本地資源管理的介入

    fescar針對本地事務(wù)相關(guān)的接口,通過代理機制都實現(xiàn)了一遍代理類,如數(shù)據(jù)源(DataSourceProxy)、ConnectionProxy、StatementProxy等。這個在配置文件中也可以看出來,也就是說,我們要使用fescar分布式事務(wù),一定要配置fescar提供的代理數(shù)據(jù)源。如:

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    配置好代理數(shù)據(jù)源后,從DataSourceProxy出發(fā),本地針對數(shù)據(jù)庫的所有操作過程我們就可以隨意控制了。從上面xid傳遞,已經(jīng)知道了xid被保存在RootContext中了,那么請看下面的代碼,就非常清楚了:

    首先看StatementProxy的一段代碼

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    在看ExecuteTemplate中的代碼

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    和【TM】模塊中的事務(wù)管理模板類TransactionlTemplate類似,這里非常關(guān)鍵的邏輯代理也被封裝在了ExecuteTemplate模板類中。因重寫了Statement有了StatementProxy實現(xiàn),在執(zhí)行原JDBC的executeUpdate方法時,會調(diào)用到ExecuteTemplate的execute邏輯。在sql真正執(zhí)行前,會判斷RootCOntext當前上下文中是否包含xid,也就是判斷當前是否是全局分布式事務(wù)。如果不是,就直接使用本地事務(wù),如果是,這里RM就會增加一些分布式事務(wù)相關(guān)的邏輯了。這里根據(jù)sql的不同的類型,fescar封裝了五個不同的執(zhí)行器來處理,分別是UpdateExecutor、DeleteExecutor、InsertExecutor、SelectForUpdateExecutor、PlainExecutor,結(jié)構(gòu)如下圖:

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    PLAINEXECUTOR:

    原生的JDBC接口實現(xiàn),未做任何處理,提供給全局事務(wù)中的普通的select查詢使用

    UPDATEEXECUTOR、DELETEEXECUTOR、INSERTEXECUTOR:

    三個DML增刪改執(zhí)行器實現(xiàn),主要在sql執(zhí)行的前后對sql語句進行了解析,實現(xiàn)了如下兩個抽象接口方法:

    protected abstract TableRecords beforeImage() throws SQLException;
    protected abstract TableRecords afterImage(TableRecords beforeImage) throws SQLException;

    在這個過程中通過解析sql生成了提供回滾操作的undo_log日志,日志目前是保存在msyql中的,和業(yè)務(wù)sql操作共用同一個事務(wù)。表的結(jié)構(gòu)如下:

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    rollback_info保存的undo_log詳細信息,是longblob類型的,結(jié)構(gòu)如下:

    {
        "branchId":3958194,
        "sqlUndoLogs":[
            {
                "afterImage":{
                    "rows":[
                        {
                            "fields":[
                                {
                                    "keyType":"PrimaryKey",
                                    "name":"ID",
                                    "type":4,
                                    "value":10                             },
                                {
                                    "keyType":"NULL",
                                    "name":"COUNT",
                                    "type":4,
                                    "value":98                             }
                            ]
                        }
                    ],
                    "tableName":"storage_tbl"             },
                "beforeImage":{
                    "rows":[
                        {
                            "fields":[
                                {
                                    "keyType":"PrimaryKey",
                                    "name":"ID",
                                    "type":4,
                                    "value":10                             },
                                {
                                    "keyType":"NULL",
                                    "name":"COUNT",
                                    "type":4,
                                    "value":100                             }
                            ]
                        }
                    ],
                    "tableName":"storage_tbl"             },
                "sqlType":"UPDATE",
                "tableName":"storage_tbl"         }
        ],
        "xid":"192.168.7.77:8091:3958193" }

    這里貼的是一個update的操作,undo_log記錄的非常的詳細,通過全局事務(wù)xid關(guān)聯(lián)branchid,記錄數(shù)據(jù)操作的表名,操作字段名,以及sql執(zhí)行前后的記錄數(shù),如這個記錄,表名=storage_tbl,sql執(zhí)行前ID=10,count=100,sql執(zhí)行后id=10,count=98。如果整個全局事務(wù)失敗,需要回滾的時候就可以生成:

    update storage_tbl set count = 100 where id = 10;

    這樣的回滾sql語句執(zhí)行了。

    SELECTFORUPDATEEXECUTOR:

    fescar的AT模式在本地事務(wù)之上默認支持讀未提交的隔離級別,但是通過SelectForUpdateExecutor執(zhí)行器,可以支持讀已提交的隔離級別。代碼如:

    @Override
    public Object doExecute(Object... args) throws Throwable {
        SQLSelectRecognizer recognizer = (SQLSelectRecognizer) sqlRecognizer;
        Connection conn = statementProxy.getConnection();
        ResultSet rs = null;
        Savepoint sp = null;
        LockRetryController lockRetryController = new LockRetryController();
        boolean originalAutoCommit = conn.getAutoCommit();
        StringBuffer selectSQLAppender = new StringBuffer("SELECT ");
        selectSQLAppender.append(getTableMeta().getPkName());
        selectSQLAppender.append(" FROM " + getTableMeta().getTableName());
        String whereCondition = null;
        ArrayList<Object> paramAppender = new ArrayList<>();
        if (statementProxy instanceof ParametersHolder) {
            whereCondition = recognizer.getWhereCondition((ParametersHolder) statementProxy, paramAppender);
        } else {
            whereCondition = recognizer.getWhereCondition();
        }
        if (!StringUtils.isEmpty(whereCondition)) {
            selectSQLAppender.append(" WHERE " + whereCondition);
        }
        selectSQLAppender.append(" FOR UPDATE");
        String selectPKSQL = selectSQLAppender.toString();
        try {
            if (originalAutoCommit) {
                conn.setAutoCommit(false);
            }
            sp = conn.setSavepoint();
            rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
            while (true) {
                // Try to get global lock of those rows selected
                Statement stPK = null;
                PreparedStatement pstPK = null;
                ResultSet rsPK = null;
                try {
                    if (paramAppender.isEmpty()) {
                        stPK = statementProxy.getConnection().createStatement();
                        rsPK = stPK.executeQuery(selectPKSQL);
                    } else {
                        pstPK = statementProxy.getConnection().prepareStatement(selectPKSQL);
                        for (int i = 0; i < paramAppender.size(); i++) {
                            pstPK.setObject(i + 1, paramAppender.get(i));
                        }
                        rsPK = pstPK.executeQuery();
                    }
                    TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
                    statementProxy.getConnectionProxy().checkLock(selectPKRows);
                    break;
                } catch (LockConflictException lce) {
                    conn.rollback(sp);
                    lockRetryController.sleep(lce);
    
                } finally {
                    if (rsPK != null) {
                        rsPK.close();
                    }
                    if (stPK != null) {
                        stPK.close();
                    }
                    if (pstPK != null) {
                        pstPK.close();
                    }
                }
            }
        } finally {
            if (sp != null) {
                conn.releaseSavepoint(sp);
            }
            if (originalAutoCommit) {
                conn.setAutoCommit(true);
            }
        }
        return rs;
    }

    關(guān)鍵代碼見:

    TableRecords selectPKRows = TableRecords.buildRecords(getTableMeta(), rsPK);
    statementProxy.getConnectionProxy().checkLock(selectPKRows);

    通過selectPKRows表操作記錄拿到lockKeys,然后到TC控制器端查詢是否被全局鎖定了,如果被鎖定了,就重新嘗試,直到鎖釋放返回查詢結(jié)果。

    分支事務(wù)的注冊和上報

    在本地事務(wù)提交前,fescar會注冊和上報分支事務(wù)相關(guān)的信息,見ConnectionProxy類的commit部分代碼:

    @Override
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                register();
            } catch (TransactionException e) {
                recognizeLockKeyConflictException(e);
            }
            try {
                if (context.hasUndoLog()) { 
                    UndoLogManager.flushUndoLogs(this);
                }
                targetConnection.commit();
            } catch (Throwable ex) {
                report(false);
                if (ex instanceof SQLException) {
                    throw (SQLException) ex;
                } else {
                    throw new SQLException(ex);
                }
            }
            report(true);
            context.reset();
        } else {
            targetConnection.commit();
        }
    }

    從這段代碼我們可以看到,首先是判斷是了是否是全局事務(wù),如果不是,就直接提交了,如果是,就先向TC控制器注冊分支事務(wù),為了寫隔離,在TC端會涉及到全局鎖的獲取。然后保存了用于回滾操作的undo_log日志,繼而真正提交本地事務(wù),最后向TC控制器上報事務(wù)狀態(tài)。此時,階段一的本地事務(wù)已完成了。

    【SERVER】模塊協(xié)調(diào)全局

    關(guān)于server模塊,我們可以聚焦在DefaultCoordinator這個類,這個是AbstractTCInboundHandler控制處理器默認實現(xiàn)。主要實現(xiàn)了全局事務(wù)開啟,提交,回滾,狀態(tài)查詢,分支事務(wù)注冊,上報,鎖檢查等接口,如:

    fescar分布式事務(wù)實現(xiàn)原理實例分析

    回到一開始的TransactionlTemplate,如果整個分布式事務(wù)失敗需要回滾了,首先是TM向TC發(fā)起回滾的指令,然后TC接收到后,解析請求后會被路由到默認控制器類的doGlobalRollback方法內(nèi),最終在TC控制器端執(zhí)行的代碼如下:

    @Override
    public void doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        for (BranchSession branchSession : globalSession.getReverseSortedBranches()) {
            BranchStatus currentBranchStatus = branchSession.getStatus();
            if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                continue;
            }
            try {
                BranchStatus branchStatus = resourceManagerInbound.branchRollback(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                        branchSession.getResourceId(), branchSession.getApplicationData());
    
                switch (branchStatus) {
                    case PhaseTwo_Rollbacked:
                        globalSession.removeBranch(branchSession);
                        LOGGER.error("Successfully rolled back branch " + branchSession);
                        continue;
                    case PhaseTwo_RollbackFailed_Unretryable:
                        GlobalStatus currentStatus = globalSession.getStatus();
                        if (currentStatus.name().startsWith("Timeout")) {
                            globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
                        } else {
                            globalSession.changeStatus(GlobalStatus.RollbackFailed);
                        }
                        globalSession.end();
                        LOGGER.error("Failed to rollback global[" + globalSession.getTransactionId() + "] since branch[" + branchSession.getBranchId() + "] rollback failed");
                        return;
                    default:
                        LOGGER.info("Failed to rollback branch " + branchSession);
                        if (!retrying) {
                            queueToRetryRollback(globalSession);
                        }
                        return;
    
                }
            } catch (Exception ex) {
                LOGGER.info("Exception rollbacking branch " + branchSession, ex);
                if (!retrying) {
                    queueToRetryRollback(globalSession);
                    if (ex instanceof TransactionException) {
                        throw (TransactionException) ex;
                    } else {
                        throw new TransactionException(ex);
                    }
                }
    
            }
        }
        GlobalStatus currentStatus = globalSession.getStatus();
        if (currentStatus.name().startsWith("Timeout")) {
            globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
        } else {
            globalSession.changeStatus(GlobalStatus.Rollbacked);
        }
        globalSession.end();
    }

    如上代碼可以看到,回滾時從全局事務(wù)會話中迭代每個分支事務(wù),然后通知每個分支事務(wù)回滾。分支服務(wù)接收到請求后,首先會被路由到RMHandlerAT中的doBranchRollback方法,繼而調(diào)用了RM中的branchRollback方法,代碼如下:

    @Override
    public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException();
        }
        try {
            UndoLogManager.undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }

    RM分支事務(wù)端最后執(zhí)行的是UndoLogManager的undo方法,通過xid和branchid從數(shù)據(jù)庫查詢出回滾日志,完成數(shù)據(jù)回滾操作,整個過程都是同步完成的。如果全局事務(wù)是成功的,TC也會有類似的上述協(xié)調(diào)過程,只不過是異步的將本次全局事務(wù)相關(guān)的undo_log清除了而已。至此,就完成了2階段的提交或回滾,也就完成了完整的全局事務(wù)事務(wù)的控制。

    關(guān)于“fescar分布式事務(wù)實現(xiàn)原理實例分析”這篇文章的內(nèi)容就介紹到這里,感謝各位的閱讀!相信大家對“fescar分布式事務(wù)實現(xiàn)原理實例分析”知識都有一定的了解,大家如果還想學(xué)習(xí)更多知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

    向AI問一下細節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI