溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

深入淺析Spring事務(wù)傳播的原理

發(fā)布時(shí)間:2020-11-06 17:11:14 來源:億速云 閱讀:307 作者:Leah 欄目:開發(fā)技術(shù)

今天就跟大家聊聊有關(guān)深入淺析Spring事務(wù)傳播的原理,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

事務(wù)切面的調(diào)用過程

 public Object invoke(MethodInvocation invocation) throws Throwable {
 // Work out the target class: may be {@code null}.
 // The TransactionAttributeSource should be passed the target class
 // as well as the method, which may be from an interface.
 Class<&#63;> targetClass = (invocation.getThis() != null &#63; AopUtils.getTargetClass(invocation.getThis()) : null);

 // Adapt to TransactionAspectSupport's invokeWithinTransaction...
 return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
 }

這個方法本身沒做什么事,主要是調(diào)用了父類的invokeWithinTransaction方法,注意最后一個參數(shù),傳入的是一個lambda表達(dá)式,而這個表達(dá)式中的調(diào)用的方法應(yīng)該不陌生,在分析AOP調(diào)用鏈時(shí),就是通過這個方法傳遞到下一個切面或是調(diào)用被代理實(shí)例的方法,忘記了的可以回去看看。

 protected Object invokeWithinTransaction(Method method, @Nullable Class<&#63;> targetClass,
 final InvocationCallback invocation) throws Throwable {

 // If the transaction attribute is null, the method is non-transactional.
 //獲取事務(wù)屬性類 AnnotationTransactionAttributeSource
 TransactionAttributeSource tas = getTransactionAttributeSource();

 //獲取方法上面有@Transactional注解的屬性
 final TransactionAttribute txAttr = (tas != null &#63; tas.getTransactionAttribute(method, targetClass) : null);

 //獲取事務(wù)管理器
 final PlatformTransactionManager tm = determineTransactionManager(txAttr);
 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

 if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
 Object retVal = null;
 try {
 // 調(diào)用proceed方法
 retVal = invocation.proceedWithInvocation();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
 }
 finally {
 cleanupTransactionInfo(txInfo);
 }
 //事務(wù)提交
 commitTransactionAfterReturning(txInfo);
 return retVal;
 }
 
 // 省略了else
 }

這個方法邏輯很清晰,一目了然,if里面就是對聲明式事務(wù)的處理,先調(diào)用createTransactionIfNecessary方法開啟事務(wù),然后通過invocation.proceedWithInvocation調(diào)用下一個切面,如果沒有其它切面了,就是調(diào)用被代理類的方法,出現(xiàn)異常就回滾,否則提交事務(wù),這就是Spring事務(wù)切面的執(zhí)行過程。但是,我們主要要搞懂的就是在這些方法中是如何管理事務(wù)以及事務(wù)在多個方法之間是如何傳播的。

事務(wù)的傳播性概念

傳播性是Spring自己搞出來的,數(shù)據(jù)庫是沒有的,因?yàn)樯婕暗椒椒ㄩg的調(diào)用,那么必然就需要考慮事務(wù)在這些方法之間如何流轉(zhuǎn),所以Spring提供了7個傳播屬性供選擇,可以將其看成兩大類,即是否支持當(dāng)前事務(wù):

支持當(dāng)前事務(wù)(在同一個事務(wù)中):

PROPAGATION_REQUIRED:支持當(dāng)前事務(wù),如果不存在,就新建一個事務(wù)。

PROPAGATION_MANDATORY:支持當(dāng)前事務(wù),如果不存在,就拋出異常。

PROPAGATION_SUPPORTS:支持當(dāng)前事務(wù),如果不存在,就不使用事務(wù)。

不支持當(dāng)前事務(wù)(不在同一個事務(wù)中):

PROPAGATION_NEVER:以非事務(wù)的方式運(yùn)行,如果有事務(wù),則拋出異常。

PROPAGATION_NOT_SUPPORTED:以非事務(wù)的方式運(yùn)行,如果有事務(wù),則掛起當(dāng)前事務(wù)。

PROPAGATION_REQUIRES_NEW:新建事務(wù),如果有事務(wù),掛起當(dāng)前事務(wù)(兩個事務(wù)相互獨(dú)立,父事務(wù)回滾不影響子事務(wù))。

PROPAGATION_NESTED:如果當(dāng)前事務(wù)存在,則嵌套事務(wù)執(zhí)行(指必須依存父事務(wù),子事務(wù)不能單獨(dú)提交且父事務(wù)回滾則子事務(wù)也必須回滾,而子事務(wù)若回滾,父事務(wù)可以回滾也可以捕獲異常)。如果當(dāng)前沒有事務(wù),則進(jìn)行與PROPAGATION_REQUIRED類似的操作。

別看屬性這么多,實(shí)際上我們主要用的是PROPAGATION_REQUIRED默認(rèn)屬性,一些特殊業(yè)務(wù)下可能會用到PROPAGATION_REQUIRES_NEW以及PROPAGATION_NESTED。

下面我會假設(shè)一個場景,并主要分析這三個屬性。

public class A {
 
 @Autowired
 private B b;

 @Transactional
 public void addA() {
 b.addB();
 }
}

public class B {
 @Transactional
 public void addB() {
 // doSomething...
 }
}

上面我創(chuàng)建了A、B兩個類,每個類中有一個事務(wù)方法,使用了聲明式事務(wù)并采用的默認(rèn)傳播屬性,在A中調(diào)用了B的方法。

當(dāng)請求來了調(diào)用addA時(shí),首先調(diào)用的是代理對象的方法,因此會進(jìn)入createTransactionIfNecessary方法開啟事務(wù):

 protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
 @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

 // If no name specified, apply method identification as transaction name.
 if (txAttr != null && txAttr.getName() == null) {
 txAttr = new DelegatingTransactionAttribute(txAttr) {
 @Override
 public String getName() {
  return joinpointIdentification;
 }
 };
 }

 TransactionStatus status = null;
 if (txAttr != null) {
 if (tm != null) {
 //開啟事務(wù),這里重點(diǎn)看
 status = tm.getTransaction(txAttr);
 }
 else {
 }
 }
 //創(chuàng)建事務(wù)信息對象,記錄新老事務(wù)信息對象
 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
 }

實(shí)際上開啟事務(wù)是通過AbstractPlatformTransactionManager做的,而這個類是一個抽象類,具體實(shí)例化的對象就是我們在項(xiàng)目里常配置的DataSourceTransactionManager對象。

 public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
 //這里重點(diǎn)看,.DataSourceTransactionObject拿到對象
 Object transaction = doGetTransaction();

 // Cache debug flag to avoid repeated checks.
 boolean debugEnabled = logger.isDebugEnabled();

 if (definition == null) {
 // Use defaults if no transaction definition given.
 definition = new DefaultTransactionDefinition();
 }

 //第一次進(jìn)來connectionHolder為空的,所以不存在事務(wù)
 if (isExistingTransaction(transaction)) {
 // Existing transaction found -> check propagation behavior to find out how to behave.
 return handleExistingTransaction(definition, transaction, debugEnabled);
 }

 // Check definition settings for new transaction.
 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
 throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
 }

 // No existing transaction found -> check propagation behavior to find out how to proceed.
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
 throw new IllegalTransactionStateException(
  "No existing transaction found for transaction marked with propagation 'mandatory'");
 }
 //第一次進(jìn)來大部分會走這里
 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 //先掛起
 SuspendedResourcesHolder suspendedResources = suspend(null);
 if (debugEnabled) {
  logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
 }
 try {
  boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  //創(chuàng)建事務(wù)狀態(tài)對象,其實(shí)就是封裝了事務(wù)對象的一些信息,記錄事務(wù)狀態(tài)的
  DefaultTransactionStatus status = newTransactionStatus(
  definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

  //開啟事務(wù),重點(diǎn)看看 DataSourceTransactionObject
  doBegin(transaction, definition);

  //開啟事務(wù)后,改變事務(wù)狀態(tài)
  prepareSynchronization(status, definition);
  return status;
 }
 catch (RuntimeException | Error ex) {
 resume(null, suspendedResources);
 throw ex;
 }
 }
 else {
 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
 }
 }

這個方法流程比較長,一步步來看,先調(diào)用doGetTransaction方法獲取一個DataSourceTransactionObject對象,這個類是JdbcTransactionObjectSupport的子類,在父類中持有了一個ConnectionHolder對象,見名知意,這個對象保存了當(dāng)前的連接。

 protected Object doGetTransaction() {
 //管理connection對象,創(chuàng)建回滾點(diǎn),按照回滾點(diǎn)回滾,釋放回滾點(diǎn)
 DataSourceTransactionObject txObject = new DataSourceTransactionObject();

 //DataSourceTransactionManager默認(rèn)是允許嵌套事務(wù)的
 txObject.setSavepointAllowed(isNestedTransactionAllowed());

 //obtainDataSource() 獲取數(shù)據(jù)源對象,其實(shí)就是數(shù)據(jù)庫連接塊對象
 ConnectionHolder conHolder =
 (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
 txObject.setConnectionHolder(conHolder, false);
 return txObject;
 }

追溯getResource方法可以看到ConnectionHolder 是從ThreadLocal里獲取的,也就是當(dāng)前線程,key是DataSource對象;但是仔細(xì)思考下我們是第一次進(jìn)來,所以這里肯定獲取不到的,反之,要從這里獲取到值,那必然是同一個線程第二次及以后進(jìn)入到這里,也就是在addA調(diào)用addB時(shí),另外需要注意這里保存ConnectionHolder到DataSourceTransactionObject對象時(shí)是將newConnectionHolder屬性設(shè)置為false了的。

繼續(xù)往后,創(chuàng)建完transaction對象后,會調(diào)用isExistingTransaction判斷是否已經(jīng)存在一個事務(wù),如果存在就會調(diào)用handleExistingTransaction方法,這個方法就是處理事務(wù)傳播的核心方法,因?yàn)槲覀兪堑谝淮芜M(jìn)來,肯定不存在事務(wù),所以先跳過。

再往后,可以看到就是處理不同的傳播屬性,主要看到下面這個部分:

 else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 //先掛起
 SuspendedResourcesHolder suspendedResources = suspend(null);
 if (debugEnabled) {
  logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
 }
 try {
  boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  //創(chuàng)建事務(wù)狀態(tài)對象,其實(shí)就是封裝了事務(wù)對象的一些信息,記錄事務(wù)狀態(tài)的
  DefaultTransactionStatus status = newTransactionStatus(
  definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

  //開啟事務(wù),重點(diǎn)看看 DataSourceTransactionObject
  doBegin(transaction, definition);

  //開啟事務(wù)后,改變事務(wù)狀態(tài)
  prepareSynchronization(status, definition);
  return status;
 }
 catch (RuntimeException | Error ex) {
 resume(null, suspendedResources);
 throw ex;
 }
 }

第一次進(jìn)來時(shí),PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED都會進(jìn)入到這里,首先會調(diào)用suspend掛起當(dāng)前存在的事務(wù),在這里沒啥作用。接下來通過newTransactionStatus創(chuàng)建了DefaultTransactionStatus對象,這個對象主要就是存儲當(dāng)前事務(wù)的一些狀態(tài)信息,需要特別注意newTransaction屬性設(shè)置為了true,表示是一個新事務(wù)。

狀態(tài)對象創(chuàng)建好之后就是通過doBegin開啟事務(wù),這是一個模板方法:

protected void doBegin(Object transaction, TransactionDefinition definition) {
 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 Connection con = null;

 try {
 //如果沒有數(shù)據(jù)庫連接
 if (!txObject.hasConnectionHolder() ||
  txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
 //從連接池里面獲取連接
 Connection newCon = obtainDataSource().getConnection();
 if (logger.isDebugEnabled()) {
  logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
 }
 //把連接包裝成ConnectionHolder,然后設(shè)置到事務(wù)對象中
 txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
 }

 txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
 con = txObject.getConnectionHolder().getConnection();

 //從數(shù)據(jù)庫連接中獲取隔離級別
 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
 txObject.setPreviousIsolationLevel(previousIsolationLevel);

 // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
 // so we don't want to do it unnecessarily (for example if we've explicitly
 // configured the connection pool to set it already).
 if (con.getAutoCommit()) {
 txObject.setMustRestoreAutoCommit(true);
 if (logger.isDebugEnabled()) {
  logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
 }
 //關(guān)閉連接的自動提交,其實(shí)這步就是開啟了事務(wù)
 con.setAutoCommit(false);
 }

 //設(shè)置只讀事務(wù) 從這一點(diǎn)設(shè)置的時(shí)間點(diǎn)開始(時(shí)間點(diǎn)a)到這個事務(wù)結(jié)束的過程中,其他事務(wù)所提交的數(shù)據(jù),該事務(wù)將看不見!
 //設(shè)置只讀事務(wù)就是告訴數(shù)據(jù)庫,我這個事務(wù)內(nèi)沒有新增,修改,刪除操作只有查詢操作,不需要數(shù)據(jù)庫鎖等操作,減少數(shù)據(jù)庫壓力
 prepareTransactionalConnection(con, definition);

 //自動提交關(guān)閉了,就說明已經(jīng)開啟事務(wù)了,事務(wù)是活動的
 txObject.getConnectionHolder().setTransactionActive(true);

 int timeout = determineTimeout(definition);
 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
 txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
 }

 // Bind the connection holder to the thread.
 if (txObject.isNewConnectionHolder()) {
 //如果是新創(chuàng)建的事務(wù),則建立當(dāng)前線程和數(shù)據(jù)庫連接的關(guān)系
 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
 }
 }

 catch (Throwable ex) {
 if (txObject.isNewConnectionHolder()) {
 DataSourceUtils.releaseConnection(con, obtainDataSource());
 txObject.setConnectionHolder(null, false);
 }
 throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
 }
 }

這個方法里面主要做了六件事:

首先從連接池獲取連接并保存到DataSourceTransactionObject對象中。

關(guān)閉數(shù)據(jù)庫的自動提交,也就是開啟事務(wù)。

獲取數(shù)據(jù)庫的隔離級別。

根據(jù)屬性設(shè)置該事務(wù)是否為只讀事務(wù)。

將該事務(wù)標(biāo)識為活動事務(wù)(transactionActive=true)。

將ConnectionHolder對象與當(dāng)前線程綁定。

完成之后通過prepareSynchronization將事務(wù)的屬性和狀態(tài)設(shè)置到TransactionSynchronizationManager對象中進(jìn)行管理。最后返回到createTransactionIfNecessary方法中創(chuàng)建TransactionInfo對象與當(dāng)前線程綁定并返回。

通過以上的步驟就開啟了事務(wù),接下來就是通過proceedWithInvocation調(diào)用其它切面,這里我們先假設(shè)沒有其它切面了,那么就是直接調(diào)用到A類的addA方法,在這個方法中又調(diào)用了B類的addB方法,那么肯定也是調(diào)用到代理類的方法,因此又會進(jìn)入到createTransactionIfNecessary方法中。但這次進(jìn)來通過isExistingTransaction判斷是存在事務(wù)的,因此會進(jìn)入到handleExistingTransaction方法:

 private TransactionStatus handleExistingTransaction(
 TransactionDefinition definition, Object transaction, boolean debugEnabled)
 throws TransactionException {

 //不允許有事務(wù),直接異常
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
 throw new IllegalTransactionStateException(
  "Existing transaction found for transaction marked with propagation 'never'");
 }

 //以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起
 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
 if (debugEnabled) {
 logger.debug("Suspending current transaction");
 }
 //掛起當(dāng)前事務(wù)
 Object suspendedResources = suspend(transaction);
 boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
 //修改事務(wù)狀態(tài)信息,把事務(wù)的一些信息存儲到當(dāng)前線程中,ThreadLocal中
 return prepareTransactionStatus(
  definition, null, false, newSynchronization, debugEnabled, suspendedResources);
 }

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
 if (debugEnabled) {
 logger.debug("Suspending current transaction, creating new transaction with name [" +
  definition.getName() + "]");
 }
 //掛起
 SuspendedResourcesHolder suspendedResources = suspend(transaction);
 try {
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 DefaultTransactionStatus status = newTransactionStatus(
  definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
 doBegin(transaction, definition);
 prepareSynchronization(status, definition);
 return status;
 }
 catch (RuntimeException | Error beginEx) {
 resumeAfterBeginException(transaction, suspendedResources, beginEx);
 throw beginEx;
 }
 }

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 if (!isNestedTransactionAllowed()) {
 throw new NestedTransactionNotSupportedException(
  "Transaction manager does not allow nested transactions by default - " +
  "specify 'nestedTransactionAllowed' property with value 'true'");
 }
 if (debugEnabled) {
 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
 }
 //默認(rèn)是可以嵌套事務(wù)的
 if (useSavepointForNestedTransaction()) {
 // Create savepoint within existing Spring-managed transaction,
 // through the SavepointManager API implemented by TransactionStatus.
 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
 DefaultTransactionStatus status =
  prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
 //創(chuàng)建回滾點(diǎn)
 status.createAndHoldSavepoint();
 return status;
 }
 else {
 // Nested transaction through nested begin and commit/rollback calls.
 // Usually only for JTA: Spring synchronization might get activated here
 // in case of a pre-existing JTA transaction.
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 DefaultTransactionStatus status = newTransactionStatus(
  definition, transaction, true, newSynchronization, debugEnabled, null);
 doBegin(transaction, definition);
 prepareSynchronization(status, definition);
 return status;
 }
 }
 
 // 省略
 
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
 }

這里面也是對每個傳播屬性的判斷,先看PROPAGATION_REQUIRES_NEW的處理,因?yàn)樵搶傩砸竺看握{(diào)用都開啟一個新的事務(wù),所以首先會將當(dāng)前事務(wù)掛起,怎么掛起呢?

 protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
 if (TransactionSynchronizationManager.isSynchronizationActive()) {
 List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
 try {
 Object suspendedResources = null;
 //第一次進(jìn)來,肯定為null的
 if (transaction != null) {
  //吧connectionHolder設(shè)置為空
  suspendedResources = doSuspend(transaction);
 }

 //做數(shù)據(jù)還原操作
 String name = TransactionSynchronizationManager.getCurrentTransactionName();
 TransactionSynchronizationManager.setCurrentTransactionName(null);
 boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
 TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
 Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
 boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
 TransactionSynchronizationManager.setActualTransactionActive(false);
 return new SuspendedResourcesHolder(
  suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
 }
 catch (RuntimeException | Error ex) {
 // doSuspend failed - original transaction is still active...
 doResumeSynchronization(suspendedSynchronizations);
 throw ex;
 }
 }
 else if (transaction != null) {
 // Transaction active but no synchronization active.
 Object suspendedResources = doSuspend(transaction);
 return new SuspendedResourcesHolder(suspendedResources);
 }
 else {
 // Neither transaction nor synchronization active.
 return null;
 }
 }

 protected Object doSuspend(Object transaction) {
 DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
 txObject.setConnectionHolder(null);
 //解除綁定關(guān)系,
 return TransactionSynchronizationManager.unbindResource(obtainDataSource());
 }

這里明顯是進(jìn)入第一個if并且會調(diào)用到doSuspend方法,整體來說掛起事務(wù)很簡單:首先將DataSourceTransactionObject的ConnectionHolder設(shè)置為空并解除與當(dāng)前線程的綁定,之后將解除綁定的ConnectionHolder和其它屬性(事務(wù)名稱、隔離級別、只讀屬性)通通封裝到SuspendedResourcesHolder對象,并將當(dāng)前事務(wù)的活動狀態(tài)設(shè)置為false。掛起事務(wù)之后又通過newTransactionStatus創(chuàng)建了一個新的事務(wù)狀態(tài)并調(diào)用doBegin開啟事務(wù),這里不再重復(fù)分析。

接著來看PROPAGATION_NESTED:

 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
 if (!isNestedTransactionAllowed()) {
 throw new NestedTransactionNotSupportedException(
  "Transaction manager does not allow nested transactions by default - " +
  "specify 'nestedTransactionAllowed' property with value 'true'");
 }
 if (debugEnabled) {
 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
 }
 //默認(rèn)是可以嵌套事務(wù)的
 if (useSavepointForNestedTransaction()) {
 // Create savepoint within existing Spring-managed transaction,
 // through the SavepointManager API implemented by TransactionStatus.
 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
 DefaultTransactionStatus status =
  prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
 //創(chuàng)建回滾點(diǎn)
 status.createAndHoldSavepoint();
 return status;
 }
 else {
 // Nested transaction through nested begin and commit/rollback calls.
 // Usually only for JTA: Spring synchronization might get activated here
 // in case of a pre-existing JTA transaction.
 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
 DefaultTransactionStatus status = newTransactionStatus(
  definition, transaction, true, newSynchronization, debugEnabled, null);
 doBegin(transaction, definition);
 prepareSynchronization(status, definition);
 return status;
 }
 }

這里面可以看到如果允許嵌套事務(wù),就會創(chuàng)建一個DefaultTransactionStatus對象(注意newTransaction是false,表明不是一個新事務(wù))和回滾點(diǎn);如果不允許嵌套,就會創(chuàng)建新事務(wù)并開啟。

當(dāng)上面的判斷都不滿足時(shí),也就是傳播屬性為默認(rèn)PROPAGATION_REQUIRED時(shí),則只是創(chuàng)建了一個newTransaction為false的DefaultTransactionStatus返回。

完成之后又是調(diào)用proceedWithInvocation,那么就是執(zhí)行B類的addB方法,假如沒有發(fā)生異常,那么就會回到切面調(diào)用commitTransactionAfterReturning提交addB的事務(wù):

 protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
 if (txInfo != null && txInfo.getTransactionStatus() != null) {
 txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
 }
 }

 public final void commit(TransactionStatus status) throws TransactionException {

 processCommit(defStatus);
 }

 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
 try {
 boolean beforeCompletionInvoked = false;

 try {
 boolean unexpectedRollback = false;
 prepareForCommit(status);
 triggerBeforeCommit(status);
 triggerBeforeCompletion(status);
 beforeCompletionInvoked = true;

 if (status.hasSavepoint()) {
  if (status.isDebug()) {
  logger.debug("Releasing transaction savepoint");
  }
  // 如果是nested,沒有提交,只是將savepoint清除掉了
  unexpectedRollback = status.isGlobalRollbackOnly();
  status.releaseHeldSavepoint();
 }
 //如果都是PROPAGATION_REQUIRED,最外層的才會走進(jìn)來統(tǒng)一提交,如果是PROPAGATION_REQUIRES_NEW,每一個事務(wù)都會進(jìn)來
 else if (status.isNewTransaction()) {
  if (status.isDebug()) {
  logger.debug("Initiating transaction commit");
  }
  unexpectedRollback = status.isGlobalRollbackOnly();
  doCommit(status);
 }
 else if (isFailEarlyOnGlobalRollbackOnly()) {
  unexpectedRollback = status.isGlobalRollbackOnly();
 }
 }
 }
 finally {
 cleanupAfterCompletion(status);
 }
 }

主要邏輯在processCommit方法中。如果存在回滾點(diǎn),可以看到并沒有提交事務(wù),只是將當(dāng)前事務(wù)的回滾點(diǎn)清除了;而如果是新事務(wù),就會調(diào)用doCommit提交事務(wù),也就是只有PROPAGATION_REQUIRED屬性下的最外層事務(wù)和PROPAGATION_REQUIRES_NEW屬性下的事務(wù)能提交。事務(wù)提交完成后會調(diào)用cleanupAfterCompletion清除當(dāng)前事務(wù)的狀態(tài),如果有掛起的事務(wù)還會通過resume恢復(fù)掛起的事務(wù)(將解綁的連接和當(dāng)前線程綁定以及將之前保存的事務(wù)狀態(tài)重新設(shè)置回去)。當(dāng)前事務(wù)正常提交后,那么就會輪到addA方法提交,處理邏輯同上,不再贅述。

如果調(diào)用addB發(fā)生異常,就會通過completeTransactionAfterThrowing進(jìn)行回滾:

 protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
 if (txInfo != null && txInfo.getTransactionStatus() != null) {
 if (logger.isTraceEnabled()) {
 logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
  "] after exception: " + ex);
 }
 if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
 try {
  txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
 }
 }
 }
 }

 public final void rollback(TransactionStatus status) throws TransactionException {
 DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
 processRollback(defStatus, false);
 }

 private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
 try {
 boolean unexpectedRollback = unexpected;

 try {
 triggerBeforeCompletion(status);

 //按照嵌套事務(wù)按照回滾點(diǎn)回滾
 if (status.hasSavepoint()) {
  if (status.isDebug()) {
  logger.debug("Rolling back transaction to savepoint");
  }
  status.rollbackToHeldSavepoint();
 }
 //都為PROPAGATION_REQUIRED最外層事務(wù)統(tǒng)一回滾
 else if (status.isNewTransaction()) {
  if (status.isDebug()) {
  logger.debug("Initiating transaction rollback");
  }
  doRollback(status);
 }
 else {
  // Participating in larger transaction
  if (status.hasTransaction()) {
  if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
  if (status.isDebug()) {
  logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
  }
  doSetRollbackOnly(status);
  }
  else {
  if (status.isDebug()) {
  logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
  }
  }
  }
  else {
  logger.debug("Should roll back transaction but cannot - no transaction available");
  }
  // Unexpected rollback only matters here if we're asked to fail early
  if (!isFailEarlyOnGlobalRollbackOnly()) {
  unexpectedRollback = false;
  }
 }
 }
 catch (RuntimeException | Error ex) {
 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
 throw ex;
 }

 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

 // Raise UnexpectedRollbackException if we had a global rollback-only marker
 if (unexpectedRollback) {
 throw new UnexpectedRollbackException(
  "Transaction rolled back because it has been marked as rollback-only");
 }
 }
 finally {
 cleanupAfterCompletion(status);
 }
 }

流程和提交是一樣的,先是判斷有沒有回滾點(diǎn),如果有就回到到回滾點(diǎn)并清除該回滾點(diǎn);如果沒有則判斷是不是新事務(wù)(PROPAGATION_REQUIRED屬性下的最外層事務(wù)和PROPAGATION_REQUIRES_NEW屬性下的事務(wù)),滿足則直接回滾當(dāng)前事務(wù)?;貪L完成后同樣需要清除掉當(dāng)前的事務(wù)狀態(tài)并恢復(fù)掛起的連接。另外需要特別注意的是在catch里面調(diào)用完回滾邏輯后,還通過throw拋出了異常,這意味著什么?意味著即使是嵌套事務(wù),內(nèi)層事務(wù)的回滾也會導(dǎo)致外層事務(wù)的回滾,也就是addA的事務(wù)也會跟著回滾。

至此,事務(wù)的傳播原理分析完畢,深入看每個方法的實(shí)現(xiàn)是很復(fù)雜的,但如果僅僅是分析各個傳播屬性對事務(wù)的影響,則有一個簡單的方法。我們可以將內(nèi)層事務(wù)切面等效替換掉invocation.proceedWithInvocation方法,比如上面兩個類的調(diào)用可以看作是下面這樣:

// addA的事務(wù)
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
 // addB的事務(wù)
 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
 Object retVal = null;
 try {
 retVal = invocation.proceedWithInvocation();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
 }
 finally {
 cleanupTransactionInfo(txInfo);
 }
 //事務(wù)提交
 commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
}
//事務(wù)提交
commitTransactionAfterReturning(txInfo);

這樣看是不是很容易就能分析出事務(wù)之間的影響以及是提交還是回滾了?下面來看幾個實(shí)例分析。

實(shí)例分析

我再添加一個C類,和addC的方法,然后在addA里面調(diào)用這個方法。

TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
 // addB的事務(wù)
 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
 Object retVal = null;
 try {
 b.addB();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
 }
 //事務(wù)提交
 commitTransactionAfterReturning(txInfo);

 // addC的事務(wù)
 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
 Object retVal = null;
 try {
 c.addC();
 }
 catch (Throwable ex) {
 // target invocation exception
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
 }
 //事務(wù)提交
 commitTransactionAfterReturning(txInfo);
}
catch (Throwable ex) {
 //事務(wù)回滾
 completeTransactionAfterThrowing(txInfo, ex);
 throw ex;
}
//事務(wù)提交
commitTransactionAfterReturning(txInfo);

等效替換后就是上面這個代碼,我們分別來分析。

都是PROPAGATION_REQUIRED屬性:通過上面的分析,我們知道三個方法都是同一個連接和事務(wù),那么任何一個出現(xiàn)異常則都會回滾。

addB為PROPAGATION_REQUIRES_NEW:

如果B中拋出異常,那么B中肯定會回滾,接著異常向上拋,導(dǎo)致A事務(wù)整體回滾;

如果C中拋出異常,不難看出C和A都會回滾,但B已經(jīng)提交了,因此不會受影響。

addC為PROPAGATION_NESTED,addB為PROPAGATION_REQUIRES_NEW:

如果B中拋出異常,那么B回滾并拋出異常,A也回滾,C不會執(zhí)行;

如果C中拋出異常,先是回滾到回滾點(diǎn)并拋出異常,所以A也回滾,但B此時(shí)已經(jīng)提交,不受影響。

都是PROPAGATION_NESTED:雖然創(chuàng)建了回滾點(diǎn),但是仍然是同一個連接,任何一個發(fā)生異常都會回滾,如果不想影響彼此,可以try-catch生吞子事務(wù)的異常實(shí)現(xiàn)。

還有其它很多情況,這里就不一一列舉了,只要使用上面的分析方法都能夠很輕松的分析出來。

總結(jié)

本篇詳細(xì)分析了事務(wù)的傳播原理,另外還有隔離級別,這在Spring中沒有體現(xiàn),需要我們自己結(jié)合數(shù)據(jù)庫的知識進(jìn)行分析設(shè)置。最后我們還需要考慮聲明式事務(wù)和編程式事務(wù)的優(yōu)缺點(diǎn),聲明式事務(wù)雖然簡單,但不適合用在長事務(wù)中,會占用大量連接資源,這時(shí)就需要考慮利用編程式事務(wù)的靈活性了。

總而言之,事務(wù)的使用并不是一律默認(rèn)就好,接口的一致性和吞吐量與事務(wù)有著直接關(guān)系,嚴(yán)重情況下可能會導(dǎo)致系統(tǒng)崩潰。

看完上述內(nèi)容,你們對深入淺析Spring事務(wù)傳播的原理有進(jìn)一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI