您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“Java @GlobalLock注解怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Java @GlobalLock注解怎么使用”吧!
對于某條數(shù)據(jù)進(jìn)行更新操作,如果全局事務(wù)正在進(jìn)行,當(dāng)某個(gè)本地事務(wù)需要更新該數(shù)據(jù)時(shí),需要使用@GlobalLock確保其不會對全局事務(wù)正在操作的數(shù)據(jù)進(jìn)行修改。防止的本地事務(wù)對全局事務(wù)的數(shù)據(jù)臟寫。如果和select for update組合使用,還可以起到防止臟讀的效果。
首先我們知道,seata的AT模式是二段提交的,而且AT模式能夠做到事務(wù)ACID四種特性中的A原子性和D持久性,默認(rèn)情況下隔離級別也只能保證在讀未提交
那么為了保證原子性,在全局事務(wù)未提交之前,其中被修改的數(shù)據(jù)會被加上全局鎖,保證不再會被其他全局事務(wù)修改。
但是全局鎖僅僅能防止全局事務(wù)對一個(gè)上鎖的數(shù)據(jù)再次進(jìn)行修改,在很多業(yè)務(wù)場景中我們是沒有跨系統(tǒng)的rpc調(diào)用的,通常是不會加分布式事務(wù)的。
例如有分布式事務(wù)執(zhí)行完畢A系統(tǒng)的業(yè)務(wù)邏輯,正在繼續(xù)執(zhí)行B系統(tǒng)邏輯,并且A系統(tǒng)事務(wù)已經(jīng)提交。此時(shí)A系統(tǒng)一個(gè)本地的spring事務(wù)去與分布式事務(wù)修改同一行數(shù)據(jù),是可以正常修改的
由于本地的spring事務(wù)并不受seata的全局鎖控制容易導(dǎo)致臟寫,即全局事務(wù)修改數(shù)據(jù)后,還未提交,數(shù)據(jù)又被本地事務(wù)改掉了。這很容易發(fā)生數(shù)據(jù)出錯(cuò)的問題,而且十分有可能導(dǎo)致全局事務(wù)回滾時(shí)發(fā)現(xiàn) 數(shù)據(jù)已經(jīng)dirty(與uodoLog中的beforeImage不同)。那么就會回滾失敗,進(jìn)而導(dǎo)致全局鎖無法釋放,后續(xù)的操作無法進(jìn)行下去。也是比較嚴(yán)重的問題。
一種解決辦法就是,針對所有相關(guān)操作都加上AT全局事務(wù),但這顯然是沒必要的,因?yàn)槿质聞?wù)意味者需要與seata-server進(jìn)行通信,創(chuàng)建全局事務(wù),注冊分支事務(wù),記錄undoLog,判斷鎖沖突,注冊鎖。
那么對于不需要跨系統(tǒng),跨庫的的業(yè)務(wù)來說,使用GlobalTransactional實(shí)在是有點(diǎn)浪費(fèi)了
那么更加輕量的GlobalLock就能夠發(fā)揮作用了,其只需要判斷本地的修改是否與全局鎖沖突就夠了
加上@GlobalLock之后,會進(jìn)入切面
io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
進(jìn)而進(jìn)入這個(gè)方法,處理全局鎖
Object handleGlobalLock(final MethodInvocation methodInvocation, final GlobalLock globalLockAnno) throws Throwable { return globalLockTemplate.execute(new GlobalLockExecutor() { @Override public Object execute() throws Throwable { return methodInvocation.proceed(); } @Override public GlobalLockConfig getGlobalLockConfig() { GlobalLockConfig config = new GlobalLockConfig(); config.setLockRetryInternal(globalLockAnno.lockRetryInternal()); config.setLockRetryTimes(globalLockAnno.lockRetryTimes()); return config; } }); }
進(jìn)入execute方法
public Object execute(GlobalLockExecutor executor) throws Throwable { boolean alreadyInGlobalLock = RootContext.requireGlobalLock(); if (!alreadyInGlobalLock) { RootContext.bindGlobalLockFlag(); } // set my config to config holder so that it can be access in further execution // for example, LockRetryController can access it with config holder GlobalLockConfig myConfig = executor.getGlobalLockConfig(); GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig); try { return executor.execute(); } finally { // only unbind when this is the root caller. // otherwise, the outer caller would lose global lock flag if (!alreadyInGlobalLock) { RootContext.unbindGlobalLockFlag(); } // if previous config is not null, we need to set it back // so that the outer logic can still use their config if (previousConfig != null) { GlobalLockConfigHolder.setAndReturnPrevious(previousConfig); } else { GlobalLockConfigHolder.remove(); } } } }
先判斷當(dāng)前是否已經(jīng)在globalLock范圍之內(nèi),如果已經(jīng)在范圍之內(nèi),那么把上層的配置取出來,用新的配置替換,并在方法執(zhí)行完畢時(shí)候,釋放鎖,或者將配置替換成之前的上層配置
如果開啟全局鎖,會在threadLocal put一個(gè)標(biāo)記
//just put something not null CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
開始執(zhí)行業(yè)務(wù)方法
那么加上相關(guān)GlobalLock標(biāo)記的和普通方法的區(qū)別在哪里?
我們都知道,seata會對數(shù)據(jù)庫連接做代理,在生成PreparedStatement時(shí)會進(jìn)入
io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)
@Override public PreparedStatement prepareStatement(String sql) throws SQLException { String dbType = getDbType(); // support oracle 10.2+ PreparedStatement targetPreparedStatement = null; if (BranchType.AT == RootContext.getBranchType()) { List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType); if (sqlRecognizers != null && sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) { TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(), sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId()); String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()]; tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray); targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray); } } } if (targetPreparedStatement == null) { targetPreparedStatement = getTargetConnection().prepareStatement(sql); } return new PreparedStatementProxy(this, targetPreparedStatement, sql); }
這里顯然不會進(jìn)入AT模式的邏輯,那么直接通過真正的數(shù)據(jù)庫連接,生成PreparedStatement,再使用PreparedStatementProxy進(jìn)行包裝,代理增強(qiáng)
在使用PreparedStatementProxy執(zhí)行sql時(shí),會進(jìn)入seata定義的一些邏輯
public boolean execute() throws SQLException { return ExecuteTemplate.execute(this, (statement, args) -> statement.execute()); }
最終來到
io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List<io.seata.sqlparser.SQLRecognizer>, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback<T,S>, java.lang.Object…)
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } Executor<T> executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { executor = new PlainExecutor<>(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; case UPDATE: executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<>(statementProxy, statementCallback); break; } } else { executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; }
如果當(dāng)前線程不需要鎖并且不不在AT模式的分支事務(wù)下,直接使用原生的preparedStatement執(zhí)行就好了
這里四種操作,通過不同的接口去執(zhí)行,接口又有多種不同的數(shù)據(jù)庫類型實(shí)現(xiàn)
插入分為不同的數(shù)據(jù)庫類型,通過spi獲取
seata提供了三種數(shù)據(jù)庫的實(shí)現(xiàn),
update,delete,select三種沒有多個(gè)實(shí)現(xiàn)類
他們在執(zhí)行時(shí)都會執(zhí)行父類的方法
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue
protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); } }
全局鎖的策略, 是在一個(gè)while(true)循環(huán)里不斷執(zhí)行
protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception { LockRetryController lockRetryController = new LockRetryController(); while (true) { try { return callable.call(); } catch (LockConflictException lockConflict) { onException(lockConflict); lockRetryController.sleep(lockConflict); } catch (Exception e) { onException(e); throw e; } } }
如果出現(xiàn)異常是LockConflictException,進(jìn)入sleep
public void sleep(Exception e) throws LockWaitTimeoutException { if (--lockRetryTimes < 0) { throw new LockWaitTimeoutException("Global lock wait timeout", e); } try { Thread.sleep(lockRetryInternal); } catch (InterruptedException ignore) { } }
這兩個(gè)變量就是@GlobalLock注解的兩個(gè)配置,一個(gè)是重試次數(shù),一個(gè)重試之間的間隔時(shí)間。
繼續(xù)就是執(zhí)行數(shù)據(jù)庫更新操作
io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
發(fā)現(xiàn)這里也會生成,undoLog,beforeImage和afterImage,其實(shí)想想,在GlobalLock下,是沒必要生成undoLog的。但是現(xiàn)有邏輯確實(shí)要生成,這個(gè)seata后續(xù)應(yīng)該會優(yōu)化。
protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; }
生成beforeImage和aferImage的邏輯也比較簡單。分別在執(zhí)行更新前,查詢數(shù)據(jù)庫,和更新后查詢數(shù)據(jù)庫
可見記錄undoLog是十分影響性能的,查詢就多了兩次,如果undoLog入庫還要再多一次入庫操作。
再看prepareUndoLog
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) { return; } if (SQLType.UPDATE == sqlRecognizer.getSQLType()) { if (beforeImage.getRows().size() != afterImage.getRows().size()) { throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys."); } } ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; String lockKeys = buildLockKey(lockKeyRecords); if (null != lockKeys) { connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); } }
將lockKeys,和undoLog,暫時(shí)記錄在connectionProxy中,也就是說至此還沒有將uodoLog記錄到數(shù)據(jù)庫,也沒有判斷全局鎖,這些事情都留到了事務(wù)提交
io.seata.rm.datasource.ConnectionProxy#doCommit
private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } }
進(jìn)入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks
這個(gè) 方法很簡單就是首先進(jìn)行鎖的檢查,并沒有我想象中的加索全局事務(wù)。
private void processLocalCommitWithGlobalLocks() throws SQLException { checkLock(context.buildLockKeys()); try { targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); } context.reset(); }
也就是說,使用GlobalLock會對全局鎖檢測,但是并不會對記錄加全局鎖。但是配合全局事務(wù)這樣已經(jīng)能夠保證全局事務(wù)的原子性了。可見GlobalLock還是要和本地事務(wù)組合一起使用的,這樣才能保證,GlobalLock執(zhí)行完畢本地事務(wù)未提交的數(shù)據(jù)不會被別的本地事務(wù)/分布式事務(wù)修改掉。
到此,相信大家對“Java @GlobalLock注解怎么使用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。