您好,登錄后才能下訂單哦!
首先我們來(lái)簡(jiǎn)要看下分布式事務(wù)處理的XA規(guī)范
可知XA規(guī)范中分布式事務(wù)有AP,RM,TM組成:
其中應(yīng)用程序(Application Program ,簡(jiǎn)稱AP):AP定義事務(wù)邊界(定義事務(wù)開始和結(jié)束)并訪問(wèn)事務(wù)邊界內(nèi)的資源。
資源管理器(Resource Manager,簡(jiǎn)稱RM):Rm管理計(jì)算機(jī)共享的資源,許多軟件都可以去訪問(wèn)這些資源,資源包含比如數(shù)據(jù)庫(kù)、文件系統(tǒng)、打印機(jī)服務(wù)器等。
Xa主要規(guī)定了RM與TM之間的交互,下面來(lái)看下XA規(guī)范中定義的RM 和 TM交互的接口:
本圖來(lái)著 參考文章XA規(guī)范25頁(yè)
xa_start負(fù)責(zé)開啟或者恢復(fù)一個(gè)事務(wù)分支,并且管理XID到調(diào)用線程
xa_end 負(fù)責(zé)取消當(dāng)前線程與事務(wù)分支的關(guān)聯(lián)
xa_prepare負(fù)責(zé)詢問(wèn)RM 是否準(zhǔn)備好了提交事務(wù)分支
xa_commit通知RM提交事務(wù)分支
XA協(xié)議是使用了二階段協(xié)議的,其中:
第一階段TM要求所有的RM準(zhǔn)備提交對(duì)應(yīng)的事務(wù)分支,詢問(wèn)RM是否有能力保證成功的提交事務(wù)分支,RM根據(jù)自己的情況,如果判斷自己進(jìn)行的工作可以被提交,那就就對(duì)工作內(nèi)容進(jìn)行持久化,并給TM回執(zhí)OK;否者給TM的回執(zhí)NO。RM在發(fā)送了否定答復(fù)并回滾了已經(jīng)的工作后,就可以丟棄這個(gè)事務(wù)分支信息了。
也就是TM與RM之間是通過(guò)兩階段提交協(xié)議進(jìn)行交互的。
MYSQL的數(shù)據(jù)庫(kù)存儲(chǔ)引擎InnoDB的事務(wù)特性能夠保證在存儲(chǔ)引擎級(jí)別實(shí)現(xiàn)ACID,而分布式事務(wù)讓存儲(chǔ)引擎級(jí)別的事務(wù)擴(kuò)展到數(shù)據(jù)庫(kù)層面,甚至擴(kuò)展到多個(gè)數(shù)據(jù)庫(kù)之間,這是通過(guò)兩階段提交協(xié)議來(lái)實(shí)現(xiàn)的,MySQL 5.0或者更新版本開始支持XA事務(wù),從下圖可知MySQL中只有InnoDB引擎支持XA協(xié)議:
Mysql中存在兩種XA事務(wù),一種是內(nèi)部XA事務(wù)主要用來(lái)協(xié)調(diào)存儲(chǔ)引擎和二進(jìn)制日志,一種是外部事務(wù)可以參與到外部分布式事務(wù)中(比如多個(gè)數(shù)據(jù)庫(kù)實(shí)現(xiàn)的分布式事務(wù)),本節(jié)我們主要討論外部事務(wù)。
在MySQL數(shù)據(jù)庫(kù)分布式事務(wù)中,MySQL是XA事務(wù)過(guò)程中的資源管理器(RM)存在的,TM是連接MySQL服務(wù)器的客戶端。MySQL數(shù)據(jù)庫(kù)是作為RM存在的,在分布式事務(wù)中一般會(huì)涉及到至少兩個(gè)RM,所以我們說(shuō)的MySQL支持XA協(xié)議是說(shuō)mysql作為RM來(lái)說(shuō)的,也就是說(shuō)MySQL實(shí)現(xiàn)了XA協(xié)議中RM應(yīng)該具有的功能;需要注意的是MySQL中只有當(dāng)隔離級(jí)別為Serializable時(shí)候才能使用分布式事務(wù),所以需要使用set global tx_isolation='serializable',session tx_isolation='serializable';
設(shè)置數(shù)據(jù)庫(kù)隔離級(jí)別(具體可以參考本地事務(wù))。
下面我們來(lái)看看在MySQL數(shù)據(jù)庫(kù)單個(gè)節(jié)點(diǎn)運(yùn)行XA事務(wù),首先來(lái)看下MySQL下xa事務(wù)語(yǔ)法:
其中xid是一個(gè)全局唯一的id標(biāo)示一個(gè)分支事務(wù),每個(gè)分支事務(wù)有自己的全局唯一的一個(gè)id,是一個(gè)字符串。
然后確認(rèn)下mysql是否啟動(dòng)了xa功能:
可知啟動(dòng)了,下面具體看一個(gè)實(shí)例:
其中首先使用XA START ‘xid' 啟動(dòng)了一個(gè)XA事務(wù),并把它置于ACTIVE狀態(tài)
對(duì)于一個(gè)ACTIVE狀態(tài)的 XA事務(wù),我們可以執(zhí)行構(gòu)成事務(wù)的多條SQL語(yǔ)句,也就是指定分支事務(wù)的邊界,然后執(zhí)行一個(gè)XA END ‘xid'語(yǔ)句,XA END把事務(wù)放入IDLE狀態(tài),也就是結(jié)束事務(wù)邊界,在xa start和xa end之間的語(yǔ)句就構(gòu)成了本分支事務(wù)的一個(gè)事務(wù)范圍。當(dāng)調(diào)用xa end 'xid1'后由于結(jié)束了事務(wù)邊界,所以這時(shí)候如何在執(zhí)行sql語(yǔ)句會(huì)拋出ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed when global transaction is in the IDLE state錯(cuò)誤,也就是當(dāng)分支事務(wù)處于IDLE狀態(tài)時(shí)候不允許執(zhí)行沒(méi)有包含到分支事務(wù)邊界里面的其他sql.
對(duì)于一個(gè)IDLE 狀態(tài)XA事務(wù),可以執(zhí)行一個(gè)XA PREPARE語(yǔ)句或一個(gè)XA COMMIT…ONE PHASE語(yǔ)句,其中XA PREPARE把事務(wù)放入PREPARED狀態(tài)。在此點(diǎn)上的XA RECOVER語(yǔ)句將在其輸出中包括事務(wù)的xid值,因?yàn)閄A RECOVER會(huì)列出處于PREPARED狀態(tài)的所有XA事務(wù)。XA COMMIT…ONE PHASE用于預(yù)備和提交事務(wù),也就是轉(zhuǎn)換為一階段協(xié)議,直接提交事務(wù)。
其中二階段協(xié)議中第一階段是執(zhí)行 xa prepare時(shí)候,這時(shí)候MySQL客戶端(TM)向MySQL數(shù)據(jù)庫(kù)服務(wù)器(RM)發(fā)出prepare"準(zhǔn)備提交"請(qǐng)求,數(shù)據(jù)庫(kù)收到請(qǐng)求后執(zhí)行數(shù)據(jù)修改和日志記錄等處理,處理完成后只是把事務(wù)的狀態(tài)改成"可以提交",然后把結(jié)果返回給事務(wù)管理器。
如果第一階段中數(shù)據(jù)庫(kù)都prepare成功,那么mysql客戶端(TM)向數(shù)據(jù)庫(kù)服務(wù)器發(fā)出"commit"請(qǐng)求,數(shù)據(jù)庫(kù)服務(wù)器把事務(wù)的"可以提交"狀態(tài)改為"提交完成"狀態(tài),然后返回應(yīng)答。如果在第一階段內(nèi)數(shù)據(jù)庫(kù)的操作發(fā)生了錯(cuò)誤,或者mysql客戶端(RM)收不到數(shù)據(jù)庫(kù)的回應(yīng),則認(rèn)為事務(wù)失敗,執(zhí)行rollback回撤所有數(shù)據(jù)庫(kù)的事務(wù)。
上面例子是在一個(gè)數(shù)據(jù)庫(kù)節(jié)點(diǎn)上運(yùn)行的一個(gè)分支事務(wù),演示了單個(gè)數(shù)據(jù)庫(kù)上執(zhí)行xa分支事務(wù)的流程,但是通常都是使用編程語(yǔ)言,比如Java的 JTA來(lái)完成MySQL的分布式事務(wù)的,下面一個(gè)例子用來(lái)演示:
首先添加依賴
<dependency>
<groupId>javax.transaction</groupId>
<artifactId>jta</artifactId>
<version>1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>
代碼:
public class XaDemo {
public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {
try {
MysqlXADataSource ds = new MysqlXADataSource();
ds.setUrl(connStr);
ds.setUser(user);
ds.setPassword(pwd);
return ds;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] arg) {
String connStr1 = "jdbc:mysql://192.168.0.1:3306/test";
String connStr2 = "jdbc:mysql://192.168.0.2:3306/test";
try {
//從不同數(shù)據(jù)庫(kù)獲取數(shù)據(jù)庫(kù)數(shù)據(jù)源
MysqlXADataSource ds1 = getDataSource(connStr1, "root", "123456");
MysqlXADataSource ds2 = getDataSource(connStr2, "root", "123456");
//數(shù)據(jù)庫(kù)1獲取連接
XAConnection xaConnection1 = ds1.getXAConnection();
XAResource xaResource1 = xaConnection1.getXAResource();
Connection connection1 = xaConnection1.getConnection();
Statement statement1 = connection1.createStatement();
//數(shù)據(jù)庫(kù)2獲取連接
XAConnection xaConnection2 = ds2.getXAConnection();
XAResource xaResource2 = xaConnection2.getXAResource();
Connection connection2 = xaConnection2.getConnection();
Statement statement2 = connection2.createStatement();
//創(chuàng)建事務(wù)分支的xid
Xid xid1 = new MysqlXid(new byte[] { 0x01 }, new byte[] { 0x02 }, 100);
Xid xid2 = new MysqlXid(new byte[] { 0x011 }, new byte[] { 0x012 }, 100);
try {
//事務(wù)分支1關(guān)聯(lián)分支事務(wù)sql語(yǔ)句
xaResource1.start(xid1, XAResource.TMNOFLAGS);
int update1Result = statement1.executeUpdate("update account_from set money=money - 50 where id=1");
xaResource1.end(xid1, XAResource.TMSUCCESS);
//事務(wù)分支2關(guān)聯(lián)分支事務(wù)sql語(yǔ)句
xaResource2.start(xid2, XAResource.TMNOFLAGS);
int update2Result = statement2.executeUpdate("update account_to set money= money + 50 where id=1");
xaResource2.end(xid2, XAResource.TMSUCCESS);
// 兩階段提交協(xié)議第一階段
int ret1 = xaResource1.prepare(xid1);
int ret2 = xaResource2.prepare(xid2);
// 兩階段提交協(xié)議第二階段
if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
xaResource1.commit(xid1, false);
xaResource2.commit(xid2, false);
System.out.println("reslut1:" + update1Result + ", result2:" + update2Result);
}
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
如上代碼對(duì)兩個(gè)機(jī)器上的數(shù)據(jù)庫(kù)進(jìn)行轉(zhuǎn)賬操作。
更多本地事務(wù)咨詢可以單擊我
更多分布式事務(wù)咨詢可以單擊我
想了解更多關(guān)于粘包半包問(wèn)題單擊我
更多關(guān)于分布式系統(tǒng)中服務(wù)降級(jí)策略的知識(shí)可以單擊 單擊我
想系統(tǒng)學(xué)dubbo的單擊我
想學(xué)并發(fā)的童鞋可以 單擊我
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。