溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Mycat 分布式事務(wù)的實現(xiàn)

發(fā)布時間:2020-07-15 13:33:35 來源:網(wǎng)絡(luò) 閱讀:350 作者:博文視點 欄目:數(shù)據(jù)庫

引言:Mycat已經(jīng)成為了一個強大的開源分布式數(shù)據(jù)庫中間件產(chǎn)品。面對企業(yè)應(yīng)用的海量數(shù)據(jù)事務(wù)處理,是目前最好的開源解決方案。但是如果想讓多臺機器中的數(shù)據(jù)保存一致,比較常規(guī)的解決方法是引入“協(xié)調(diào)者”來統(tǒng)一調(diào)度所有節(jié)點的執(zhí)行。 
本文選自《分布式數(shù)據(jù)庫架構(gòu)及企業(yè)實踐——基于Mycat中間件》。

  隨著并發(fā)量、數(shù)據(jù)量越來越大及業(yè)務(wù)已經(jīng)細化到不能再按照業(yè)務(wù)劃分,我們不得不使用分布式數(shù)據(jù)庫提高系統(tǒng)的性能。在分布式系統(tǒng)中,各個節(jié)點在物理上都是相對獨立的,每個節(jié)點上的數(shù)據(jù)操作都可以滿足 ACID。但是,各獨立節(jié)點之間無法知道其他節(jié)點事務(wù)的執(zhí)行情況,如果想讓多臺機器中的數(shù)據(jù)保存一致,就必須保證所有節(jié)點上的數(shù)據(jù)操作要么全部執(zhí)行成功,要么全部不執(zhí)行,比較常規(guī)的解決方法是引入“協(xié)調(diào)者”來統(tǒng)一調(diào)度所有節(jié)點的執(zhí)行。

XA 規(guī)范

  X/Open 組織(即現(xiàn)在的 Open Group)定義了分布式事務(wù)處理模型。X/Open DTP 模型(1994)包括應(yīng)用程序(AP)、事務(wù)管理器(TM)、資源管理器(RM)、通信資源管理器(CRM)四部分。事務(wù)管理器(TM)是交易中間件,資源管理器(RM)是數(shù)據(jù)庫,通信資源管理器(CRM)是消息中間件。通常把一個數(shù)據(jù)庫內(nèi)部的事務(wù)處理看作本地事務(wù),而分布式事務(wù)處理的對象是全局事務(wù)。全局事務(wù)是指在分布式事務(wù)處理環(huán)境中,多個數(shù)據(jù)庫可能需要共同完成一個工作,這個工作就是一個全局事務(wù)。在一個事務(wù)中可能更新幾個不同的數(shù)據(jù)庫,此時一個數(shù)據(jù)庫對自己內(nèi)部所做操作的提交不僅需要本身的操作成功,還需要全局事務(wù)相關(guān)的其他數(shù)據(jù)庫的操作成功。如果任一數(shù)據(jù)庫的任一操作失敗,則參與此事務(wù)的所有數(shù)據(jù)庫所做的所有操作都必須回滾。XA就是X/Open DTP 定義的交易中間件與數(shù)據(jù)庫之間的接口規(guī)范(即接口函數(shù)),交易中間件用它來通知數(shù)據(jù)庫事務(wù)的開始、結(jié)束、提交、回滾等,XA 接口函數(shù)由數(shù)據(jù)庫廠商提供,根據(jù)這一思想衍生出二階段提交協(xié)議和三階段提交協(xié)議。

二階段提交

  所謂的兩個階段是指準(zhǔn)備階段和提交階段。 
  準(zhǔn)備階段指事務(wù)協(xié)調(diào)者(事務(wù)管理器)向每個參與者(資源管理器)發(fā)送準(zhǔn)備消息,每個參與者要么直接返回失敗消息(如權(quán)限驗證失?。?,要么在本地執(zhí)行事務(wù),寫本地的 redo 和undo日志但不提交,可以進一步將準(zhǔn)備階段分為以下三步。 
 ?。?)協(xié)調(diào)者節(jié)點向所有參與者節(jié)點詢問是否可以執(zhí)行提交操作(vote),并開始等待各參與者節(jié)點的響應(yīng)。 
 ?。?)參與者節(jié)點執(zhí)行詢問發(fā)起為止的所有事務(wù)操作,并將 undo 信息和 redo 信息寫入日志。 
 ?。?)各參與者節(jié)點響應(yīng)協(xié)調(diào)者節(jié)點發(fā)起的詢問。如果參與者節(jié)點的事務(wù)操作實際執(zhí)行成功,則它返回一個“同意”消息;如果參與者節(jié)點的事務(wù)操作實際執(zhí)行失敗,則它返回一個“中止”消息。 
  提交階段指如果協(xié)調(diào)者收到了參與者的失敗消息或者超時,則直接向每個參與者發(fā)送回滾(Rollback)消息,否則發(fā)送提交(Commit)消息,參與者根據(jù)協(xié)調(diào)者的指令執(zhí)行提交或者回滾操作,釋放所有事務(wù)在處理過程中使用的鎖資源。 
  二階段提交所存在的缺點如下。 
 ?。?)同步阻塞問題,在執(zhí)行過程中所有參與節(jié)點都是事務(wù)阻塞型的,當(dāng)參與者占用公共資源時,其他第三方節(jié)點訪問公共資源時不得不處于阻塞狀態(tài)。 
  (2)單點故障,由于協(xié)調(diào)者的重要性,一旦協(xié)調(diào)者發(fā)生故障,則參與者會一直阻塞下去。 
 ?。?)數(shù)據(jù)不一致,在二階段提交的第 2 個階段中,當(dāng)協(xié)調(diào)者向參與者發(fā)送 commit 請求之后發(fā)生了局部網(wǎng)絡(luò)異?;蛘咴诎l(fā)送 commit 請求的過程中協(xié)調(diào)者發(fā)生了故障,則會導(dǎo)致只有一部分參與者接收到了 commit 請求,而在這部分參與者在接收到 commit 請求之后就會執(zhí)行commit操作,其他部分未接收到 commit 請求的機器則無法執(zhí)行事務(wù)提交,于是整個分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致的現(xiàn)象。 
  由于二階段提交存在諸如同步阻塞、單點問題、數(shù)據(jù)不一致、宕機等缺陷,所以,研究者們在二階段提交的基礎(chǔ)上做了改進,提出了三階段提交。

三階段提交

  三階段提交(Three-phase commit,3PC),也叫作三階段提交協(xié)議(Three-phase commitprotocol),是二階段提交(2PC)的改進版本。三階段提交把二階段提交的準(zhǔn)備階段再次一分為二,這樣三階段提交就有 CanCommit、PreCommit、DoCommit 三個階段。 
 ?。?)CanCommit 階段:三階段提交的 CanCommit 階段其實和二階段提交的準(zhǔn)備階段很像,協(xié)調(diào)者向參與者發(fā)送 commit 請求,參與者如果可以提交就返回 Yes 響應(yīng),否則返回 No 響應(yīng)。 
 ?。?)PreCommit 階段:協(xié)調(diào)者根據(jù)參與者的反應(yīng)情況來決定是否可以記錄事務(wù)的 PreCommit操作。根據(jù)響應(yīng)情況,有以下兩種可能。

  • 假如協(xié)調(diào)者從所有參與者那里獲得的反饋都是 Yes 響應(yīng),則執(zhí)行事務(wù)。

  • 假如有任何一個參與者向協(xié)調(diào)者發(fā)送了 No 響應(yīng),或者等待超時之后協(xié)調(diào)者都沒有接到參與者的響應(yīng),則執(zhí)行事務(wù)的中斷。

(3)DoCommit階段:該階段進行真正的事務(wù)提交,也可以分為執(zhí)行提交、中斷事務(wù)兩種執(zhí)行情況。

  執(zhí)行提交的過程如下。

  • 協(xié)調(diào)者接收到參與者發(fā)送的ACK響應(yīng)后,將從預(yù)提交狀態(tài)進入提交狀態(tài),并向所有參與者發(fā)送doCommit請求。

  • 事務(wù)提交參與者接收到doCommit請求之后,執(zhí)行正式的事務(wù)提交,并在完成事務(wù)提交之后釋放所有的事務(wù)資源。

  • 事務(wù)提交完之后,向協(xié)調(diào)者發(fā)送ACK響應(yīng)。

  • 協(xié)調(diào)者接收到所有參與者的ACK響應(yīng)之后,完成事務(wù)。中斷事務(wù)的過程如下。

  • 協(xié)調(diào)者向所有參與者發(fā)送abort請求。

  • 參與者接收到 abort 請求之后,利用其在第 2 個階段記錄的 undo 信息來執(zhí)行事務(wù)的回滾操作,并在完成回滾之后釋放所有的事務(wù)資源。

  • 參與者完成事務(wù)回滾之后,向協(xié)調(diào)者發(fā)送 ACK 消息。

  • 協(xié)調(diào)者接收到參與者反饋的 ACK 消息之后,執(zhí)行事務(wù)的中斷。

Mycat 中分布式事務(wù)的實現(xiàn)

  Mycat在1.6版本以后已經(jīng)完全支持 XA 分布式強事務(wù)類型了,先通過一個簡單的示例來了解Mycat中XA的用法。 
  用戶應(yīng)用側(cè)(AP)的使用流程如下: 
 ?。?)set autocommit=0 
  在應(yīng)用層需要設(shè)置事務(wù)不能自動提交; 
  (2)set xa=on 
  在 SQL 中設(shè)置 XA 為開啟狀態(tài); 
 ?。?)執(zhí)行 SQL 
   insert into travelrecord(id,name) values(1,’N’),(6000000,’A’),(321,’D’),(13400000,’C’),(59,’E’); 
 ?。?)commit 或者 rollback 
  對事務(wù)進行提交(提交成功或者回滾異常)。 
  完整的流程圖如圖所示。 
Mycat 分布式事務(wù)的實現(xiàn) 
  Mycat 內(nèi)部實現(xiàn)側(cè)的實現(xiàn)流程如下: 
 ?。?)set autocommit=0 
  將 MysqlConnection 中的 autocommit 設(shè)置為 false; 
 ?。?)set xa=on 
  在Mycat中開啟 XA 事務(wù)管理器,用 MycatServer.getInstance().genXATXID()生成 XID,用XA START XID 命令進行 XA 事務(wù)開始標(biāo)記,繼續(xù)拼裝 SQL 業(yè)務(wù)(Mycat 會將上面的 insert 數(shù)據(jù)分片到不同的節(jié)點上),拼裝 XA END XID,XA PREPARE XID 最后進行 1pc 提交并記錄日志到 tm.log 中,如果 1pc 階段有異常,則直接回滾事務(wù) XA ROLLBACK xid。 
 ?。?)在多節(jié)點 MySQL 中全部進行 2pc 提交(XA COMMIT),提交成功后,事務(wù)結(jié)束;如果有異常,則對事務(wù)進行重新提交或者回滾。 
  Mycat 中的 XA 分布式事務(wù)的異常處理流程如下: 
 ?。?)一階段 commit 異常:如果 1pc 提交任意一個 mysql 節(jié)點無法提交或者異常,則全部節(jié)點的事務(wù)進行回滾,拋出異常給應(yīng)用側(cè)事務(wù)回滾。 
  (2)Mycat Crash Recovery 
  Mycat 崩潰以后,根據(jù) tm.log 事務(wù)日志再進行重啟恢復(fù),mycat 啟動后執(zhí)行事務(wù)日志查找各個節(jié)點中已經(jīng) prepared 的 XA 事務(wù),進行 commit 或者 rollback。

1. 相關(guān)類說明

  通過用戶應(yīng)用側(cè)發(fā)送 set xa = on ; SQL 開啟 Mycat 內(nèi)部 XA 事務(wù)管理器的功能,事務(wù)管理器將對 MySQL 數(shù)據(jù)庫進行 XA 方式的事務(wù)管理,具體事務(wù)管理功能的實現(xiàn)代碼如下:

  • MySQLConnection:數(shù)據(jù)庫連接。

  • NonBlockingSession:用戶連接 Session。

  • MultiNodeCoordinator:協(xié)調(diào)者。

  • CommitNodeHandler:分片提交處理。

  • RollbackNodeHandler:分片回滾處理。

2. 代碼解析

  XA 事務(wù)啟動的源碼如下:

public class MySQLConnection extends BackendAIOConnection {
    //設(shè)置開啟事務(wù)
    private void getAutocommitCommand(StringBuilder sb, boolean autoCommit) {        if (autoCommit) {
            sb.append("SET autocommit=1;");
        } else {
            sb.append("SET autocommit=0;");
        }
    }    public void execute(RouteResultsetNode rrn, ServerConnection sc,boolean autocommit) throws UnsupportedEncodingException {        if(!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }        //獲取當(dāng)前事務(wù) ID
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),autocommit);
    }
……
……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項目的 MySQLConnection.java源碼}

  用戶應(yīng)用側(cè)設(shè)置手動提交以后,Mycat 會在當(dāng)前連接中加入

  SET autocommit=0;

  將該語句加入到 StringBuffer 中,等待提交到數(shù)據(jù)庫。 
  用戶連接 Session 的源碼如下:

public class NonBlockingSession implements Session {
    ……
……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項目的 NonBlockingSession.java 源碼}
SET XA = ON ;語句分析

  用戶應(yīng)用側(cè)發(fā)送該語句到 Mycat 中,由 SQL 語句解析器解析后交由 SetHandle 進行處理c.getSession2().setXATXEnabled (true); 
  調(diào)用 NonBlockSession 中的 setXATXEnable d 方法設(shè)置 XA 開關(guān)啟動,并生成 XID,代碼如下:

public void setXATXEnabled(boolean xaTXEnabled) {
    LOGGER.info("XA Transaction enabled ,con " + this.getSource());    if (xaTXEnabled && this.xaTXID == null) {
        xaTXID = genXATXID();
    }
}

  另外,NonBlockSession 會接收來自于用戶應(yīng)用側(cè)的 commit, 調(diào)用 commit 方法進行處理事務(wù)提交的邏輯。 
  在 commit()方法中,首先會 check 節(jié)點個數(shù),一個節(jié)點和多個節(jié)點分為不同的處理過程,這里只講下多個節(jié)點的處理方法 checkDistriTransaxAndExecute(); 
  該方法會對多個節(jié)點的事務(wù)進行提交。 
  協(xié)調(diào)者的源碼如下:

public class MultiNodeCoordinator implements ResponseHandler {
    ……
……//省略此處代碼,建議讀者參考 GitHub 倉庫 MyCAT-Server 項目的 MultiNodeCoordinator.java 源碼}

  在 NonBlockSession 的 checkDistriTransaxAndExecute()方法中, NonBlockSession 會話類會調(diào)用專門進行多節(jié)點協(xié)同的 MultiNodeCoordinator 類進行具體的處理,在 MultiNodeCoordinator類中,executeBatchNodeCmd 方法加入 XA 1PC 提交的處理,代碼片段如下:

for (RouteResultsetNode rrn : session.getTargetKeys()) {
    ……
    if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE){
        //recovery Log
        participantLogEntry[started] = new
        ParticipantLogEntry(xaTxId,conn.getHost(),0,conn.getSchema(),((MySQLConnection) conn).getXaStatus());
        String[] cmds = new String[]{"XA END " + xaTxId,"XA PREPARE " + xaTxId};
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the batch cmd : "+ cmds[0] + ";" +cmds[1]+","+"current connection:"+conn.getHost()+":"+conn.getPort());
        }
    mysqlCon.execBatchCmd(cmds);
    }
……
}

  在 MultiNodeCoordinator 類的 okResponse 方法中,則進行 2pc 的事務(wù)提交

MySQLConnection mysqlCon = (MySQLConnection) conn;switch (mysqlCon.getXaStatus()){
    case TxState.TX_STARTED_STATE:
    if (mysqlCon.batchCmdFinished()){
        String xaTxId = session.getXaTXID();
        String cmd = "XA COMMIT " + xaTxId;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Start execute the cmd :"+cmd+",current host:"+mysqlCon.getHost()+":"+mysqlCon.getPort());
        }
        //recovery log
        CoordinatorLogEntry coordinatorLogEntry =inMemoryRepository.get(xaTxId);
        for(int i=0; i<coordinatorLogEntry.participants.length;i++){
            LOGGER.debug("[In MemoryCoordinatorLogEntry]"+coordinatorLogEntry.participants[i]);
            if(coordinatorLogEntry.participants[i].resourceName.equals(conn.getSchema())){
                coordinatorLogEntry.participants[i].txState =TxState.TX_PREPARED_STATE;
            }
        }
        inMemoryRepository.put(session.getXaTXID(),coordinatorLogEntry);
        fileRepository.writeCheckpoint(inMemoryRepository.getAllCoordinatorLogEntries());
        //send commit
        mysqlCon.setXaStatus(TxState.TX_PREPARED_STATE);
        mysqlCon.execCmd(cmd);
    }
    return;……
}

  分片事務(wù)提交處理的源碼如下:

public class CommitNodeHandler implements ResponseHandler {
    //結(jié)束 XA
    public void commit(BackendConnection conn) {
        ……
……//省略此處代碼,建議讀者參考 GitHub 倉庫 MyCAT-Server 項目的 CommitNodeHandler.java源碼
    }    //提交 XA
    @Override
    public void okResponse(byte[] ok, BackendConnection conn) {
        ……
……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項目的 CommitNodeHandler.java 源碼}

  在 Mycat 中同樣支持單節(jié)點 MySQL 數(shù)據(jù)庫的 XA 事務(wù)處理,在 CommitNodeHandler 類中就是對單節(jié)點的 XA 二階段處理,處理方式與 MultiNodeCoordinator 類同,通過 commit 方法進行 1pc 的提交,而通過 okResponse 的方法進行 2pc 階段的事務(wù)提交。 
  分片事務(wù)回滾處理的源碼如下:

public class RollbackNodeHandler extends MultiNodeHandler {
    ……
……//省略此處代碼,建議讀者參考 GitHub 倉庫的 MyCAT-Server 項目的 RollbackNodeHandler.java 源碼}

  在 RollbackNodeHandler 的 rollback 方法中加入了對 XA 事務(wù)的 rollback 處理,用戶應(yīng)用側(cè)發(fā)起的 rollback 會在這個方法中進行處理。

for (final RouteResultsetNode node : session.getTargetKeys()) {
    ……    //support the XA rollback
    MySQLConnection mysqlCon = (MySQLConnection) conn;    if(session.getXaTXID()!=null) {        String xaTxId = session.getXaTXID();
        mysqlCon.execCmd("XA END " + xaTxId + ";");
        mysqlCon.execCmd("XA ROLLBACK " + xaTxId + ";");
    }else {
    conn.rollback();
    }
……
}

  同樣,該方法會對所有的 MySQL 數(shù)據(jù)庫節(jié)點發(fā)起 xa rollback 指令。 
  本文選自《分布式數(shù)據(jù)庫架構(gòu)及企業(yè)實踐——基于Mycat中間件》,點此鏈接可在博文視點官網(wǎng)查看。 
               Mycat 分布式事務(wù)的實現(xiàn) 
  想及時獲得更多精彩文章,可在微信中搜索“博文視點”或者掃描下方二維碼并關(guān)注。 
                Mycat 分布式事務(wù)的實現(xiàn) 


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI