溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)

發(fā)布時(shí)間:2020-05-08 16:46:26 來(lái)源:網(wǎng)絡(luò) 閱讀:13162 作者:zhailuxu 欄目:MySQL數(shù)據(jù)庫(kù)

五、MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)

5.1 XA協(xié)議

首先我們來(lái)簡(jiǎn)要看下分布式事務(wù)處理的XA規(guī)范
MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)
可知XA規(guī)范中分布式事務(wù)有AP,RM,TM組成:

  • 其中應(yīng)用程序(Application Program ,簡(jiǎn)稱AP):AP定義事務(wù)邊界(定義事務(wù)開始和結(jié)束)并訪問(wèn)事務(wù)邊界內(nèi)的資源。

  • 資源管理器(Resource Manager,簡(jiǎn)稱RM):Rm管理計(jì)算機(jī)共享的資源,許多軟件都可以去訪問(wèn)這些資源,資源包含比如數(shù)據(jù)庫(kù)、文件系統(tǒng)、打印機(jī)服務(wù)器等。

  • 事務(wù)管理器(Transaction Manager ,簡(jiǎn)稱TM):負(fù)責(zé)管理全局事務(wù),分配事務(wù)唯一標(biāo)識(shí),監(jiān)控事務(wù)的執(zhí)行進(jìn)度,并負(fù)責(zé)事務(wù)的提交、回滾、失敗恢復(fù)等。

Xa主要規(guī)定了RM與TM之間的交互,下面來(lái)看下XA規(guī)范中定義的RM 和 TM交互的接口:
MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)
本圖來(lái)著 參考文章XA規(guī)范25頁(yè)

  • xa_start負(fù)責(zé)開啟或者恢復(fù)一個(gè)事務(wù)分支,并且管理XID到調(diào)用線程

  • xa_end 負(fù)責(zé)取消當(dāng)前線程與事務(wù)分支的關(guān)聯(lián)

  • xa_prepare負(fù)責(zé)詢問(wèn)RM 是否準(zhǔn)備好了提交事務(wù)分支

  • xa_commit通知RM提交事務(wù)分支

  • xa_rollback 通知RM回滾事務(wù)分支

XA協(xié)議是使用了二階段協(xié)議的,其中:

  • 第一階段TM要求所有的RM準(zhǔn)備提交對(duì)應(yīng)的事務(wù)分支,詢問(wèn)RM是否有能力保證成功的提交事務(wù)分支,RM根據(jù)自己的情況,如果判斷自己進(jìn)行的工作可以被提交,那就就對(duì)工作內(nèi)容進(jìn)行持久化,并給TM回執(zhí)OK;否者給TM的回執(zhí)NO。RM在發(fā)送了否定答復(fù)并回滾了已經(jīng)的工作后,就可以丟棄這個(gè)事務(wù)分支信息了。

  • 第二階段TM根據(jù)階段1各個(gè)RM prepare的結(jié)果,決定是提交還是回滾事務(wù)。如果所有的RM都prepare成功,那么TM通知所有的RM進(jìn)行提交;如果有RM prepare回執(zhí)NO的話,則TM通知所有RM回滾自己的事務(wù)分支。

也就是TM與RM之間是通過(guò)兩階段提交協(xié)議進(jìn)行交互的。

5.2 MySQL中XA實(shí)現(xiàn)

MYSQL的數(shù)據(jù)庫(kù)存儲(chǔ)引擎InnoDB的事務(wù)特性能夠保證在存儲(chǔ)引擎級(jí)別實(shí)現(xiàn)ACID,而分布式事務(wù)讓存儲(chǔ)引擎級(jí)別的事務(wù)擴(kuò)展到數(shù)據(jù)庫(kù)層面,甚至擴(kuò)展到多個(gè)數(shù)據(jù)庫(kù)之間,這是通過(guò)兩階段提交協(xié)議來(lái)實(shí)現(xiàn)的,MySQL 5.0或者更新版本開始支持XA事務(wù),從下圖可知MySQL中只有InnoDB引擎支持XA協(xié)議:
MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)

Mysql中存在兩種XA事務(wù),一種是內(nèi)部XA事務(wù)主要用來(lái)協(xié)調(diào)存儲(chǔ)引擎和二進(jìn)制日志,一種是外部事務(wù)可以參與到外部分布式事務(wù)中(比如多個(gè)數(shù)據(jù)庫(kù)實(shí)現(xiàn)的分布式事務(wù)),本節(jié)我們主要討論外部事務(wù)。

在MySQL數(shù)據(jù)庫(kù)分布式事務(wù)中,MySQL是XA事務(wù)過(guò)程中的資源管理器(RM)存在的,TM是連接MySQL服務(wù)器的客戶端。MySQL數(shù)據(jù)庫(kù)是作為RM存在的,在分布式事務(wù)中一般會(huì)涉及到至少兩個(gè)RM,所以我們說(shuō)的MySQL支持XA協(xié)議是說(shuō)mysql作為RM來(lái)說(shuō)的,也就是說(shuō)MySQL實(shí)現(xiàn)了XA協(xié)議中RM應(yīng)該具有的功能;需要注意的是MySQL中只有當(dāng)隔離級(jí)別為Serializable時(shí)候才能使用分布式事務(wù),所以需要使用set global tx_isolation='serializable',session tx_isolation='serializable';設(shè)置數(shù)據(jù)庫(kù)隔離級(jí)別(具體可以參考本地事務(wù))。

下面我們來(lái)看看在MySQL數(shù)據(jù)庫(kù)單個(gè)節(jié)點(diǎn)運(yùn)行XA事務(wù),首先來(lái)看下MySQL下xa事務(wù)語(yǔ)法:
MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)
其中xid是一個(gè)全局唯一的id標(biāo)示一個(gè)分支事務(wù),每個(gè)分支事務(wù)有自己的全局唯一的一個(gè)id,是一個(gè)字符串。
然后確認(rèn)下mysql是否啟動(dòng)了xa功能:
MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)
可知啟動(dòng)了,下面具體看一個(gè)實(shí)例:

MySQL 中基于 XA 實(shí)現(xiàn)的分布式事務(wù)

  • 其中首先使用XA START ‘xid' 啟動(dòng)了一個(gè)XA事務(wù),并把它置于ACTIVE狀態(tài)

  • 對(duì)于一個(gè)ACTIVE狀態(tài)的 XA事務(wù),我們可以執(zhí)行構(gòu)成事務(wù)的多條SQL語(yǔ)句,也就是指定分支事務(wù)的邊界,然后執(zhí)行一個(gè)XA END ‘xid'語(yǔ)句,XA END把事務(wù)放入IDLE狀態(tài),也就是結(jié)束事務(wù)邊界,在xa start和xa end之間的語(yǔ)句就構(gòu)成了本分支事務(wù)的一個(gè)事務(wù)范圍。當(dāng)調(diào)用xa end 'xid1'后由于結(jié)束了事務(wù)邊界,所以這時(shí)候如何在執(zhí)行sql語(yǔ)句會(huì)拋出ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the IDLE state錯(cuò)誤,也就是當(dāng)分支事務(wù)處于IDLE狀態(tài)時(shí)候不允許執(zhí)行沒(méi)有包含到分支事務(wù)邊界里面的其他sql.

  • 對(duì)于一個(gè)IDLE 狀態(tài)XA事務(wù),可以執(zhí)行一個(gè)XA PREPARE語(yǔ)句或一個(gè)XA COMMIT…ONE PHASE語(yǔ)句,其中XA PREPARE把事務(wù)放入PREPARED狀態(tài)。在此點(diǎn)上的XA RECOVER語(yǔ)句將在其輸出中包括事務(wù)的xid值,因?yàn)閄A RECOVER會(huì)列出處于PREPARED狀態(tài)的所有XA事務(wù)。XA COMMIT…ONE PHASE用于預(yù)備和提交事務(wù),也就是轉(zhuǎn)換為一階段協(xié)議,直接提交事務(wù)。

  • 對(duì)于一個(gè)PREPARED狀態(tài)的 XA事務(wù),可以執(zhí)行XA COMMIT 語(yǔ)句來(lái)提交或者執(zhí)行XA ROLLBACK來(lái)回滾xa事務(wù)。

其中二階段協(xié)議中第一階段是執(zhí)行 xa prepare時(shí)候,這時(shí)候MySQL客戶端(TM)向MySQL數(shù)據(jù)庫(kù)服務(wù)器(RM)發(fā)出prepare"準(zhǔn)備提交"請(qǐng)求,數(shù)據(jù)庫(kù)收到請(qǐng)求后執(zhí)行數(shù)據(jù)修改和日志記錄等處理,處理完成后只是把事務(wù)的狀態(tài)改成"可以提交",然后把結(jié)果返回給事務(wù)管理器。

如果第一階段中數(shù)據(jù)庫(kù)都prepare成功,那么mysql客戶端(TM)向數(shù)據(jù)庫(kù)服務(wù)器發(fā)出"commit"請(qǐng)求,數(shù)據(jù)庫(kù)服務(wù)器把事務(wù)的"可以提交"狀態(tài)改為"提交完成"狀態(tài),然后返回應(yīng)答。如果在第一階段內(nèi)數(shù)據(jù)庫(kù)的操作發(fā)生了錯(cuò)誤,或者mysql客戶端(RM)收不到數(shù)據(jù)庫(kù)的回應(yīng),則認(rèn)為事務(wù)失敗,執(zhí)行rollback回撤所有數(shù)據(jù)庫(kù)的事務(wù)。

上面例子是在一個(gè)數(shù)據(jù)庫(kù)節(jié)點(diǎn)上運(yùn)行的一個(gè)分支事務(wù),演示了單個(gè)數(shù)據(jù)庫(kù)上執(zhí)行xa分支事務(wù)的流程,但是通常都是使用編程語(yǔ)言,比如Java的 JTA來(lái)完成MySQL的分布式事務(wù)的,下面一個(gè)例子用來(lái)演示:
首先添加依賴

    <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
            <version>1.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.6</version>
        </dependency>

代碼:

public class XaDemo {

    public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {

        try {

            MysqlXADataSource ds = new MysqlXADataSource();
            ds.setUrl(connStr);
            ds.setUser(user);
            ds.setPassword(pwd);

            return ds;
        } catch (Exception e) {
            e.printStackTrace();
        }

        return null;
    }

    public static void main(String[] arg) {
        String connStr1 = "jdbc:mysql://192.168.0.1:3306/test";
        String connStr2 = "jdbc:mysql://192.168.0.2:3306/test";

        try {
            //從不同數(shù)據(jù)庫(kù)獲取數(shù)據(jù)庫(kù)數(shù)據(jù)源
            MysqlXADataSource ds1 = getDataSource(connStr1, "root", "123456");
            MysqlXADataSource ds2 = getDataSource(connStr2, "root", "123456");

            //數(shù)據(jù)庫(kù)1獲取連接
            XAConnection xaConnection1 = ds1.getXAConnection();
            XAResource xaResource1 = xaConnection1.getXAResource();
            Connection connection1 = xaConnection1.getConnection();
            Statement statement1 = connection1.createStatement();

            //數(shù)據(jù)庫(kù)2獲取連接
            XAConnection xaConnection2 = ds2.getXAConnection();
            XAResource xaResource2 = xaConnection2.getXAResource();
            Connection connection2 = xaConnection2.getConnection();
            Statement statement2 = connection2.createStatement();

            //創(chuàng)建事務(wù)分支的xid
            Xid xid1 = new MysqlXid(new byte[] { 0x01 }, new byte[] { 0x02 }, 100);
            Xid xid2 = new MysqlXid(new byte[] { 0x011 }, new byte[] { 0x012 }, 100);

            try {
                //事務(wù)分支1關(guān)聯(lián)分支事務(wù)sql語(yǔ)句
                xaResource1.start(xid1, XAResource.TMNOFLAGS);
                int update1Result = statement1.executeUpdate("update account_from set money=money - 50 where id=1");
                xaResource1.end(xid1, XAResource.TMSUCCESS);

                //事務(wù)分支2關(guān)聯(lián)分支事務(wù)sql語(yǔ)句
                xaResource2.start(xid2, XAResource.TMNOFLAGS);
                int update2Result = statement2.executeUpdate("update account_to set money= money + 50 where id=1");
                xaResource2.end(xid2, XAResource.TMSUCCESS);

                // 兩階段提交協(xié)議第一階段
                int ret1 = xaResource1.prepare(xid1);
                int ret2 = xaResource2.prepare(xid2);

                // 兩階段提交協(xié)議第二階段
                if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
                    xaResource1.commit(xid1, false);
                    xaResource2.commit(xid2, false);

                    System.out.println("reslut1:" + update1Result + ", result2:" + update2Result);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

如上代碼對(duì)兩個(gè)機(jī)器上的數(shù)據(jù)庫(kù)進(jìn)行轉(zhuǎn)賬操作。

最后

更多本地事務(wù)咨詢可以單擊我
更多分布式事務(wù)咨詢可以單擊我

想了解更多關(guān)于粘包半包問(wèn)題單擊我
更多關(guān)于分布式系統(tǒng)中服務(wù)降級(jí)策略的知識(shí)可以單擊 單擊我
想系統(tǒng)學(xué)dubbo的單擊我
想學(xué)并發(fā)的童鞋可以 單擊我

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI