Saga Mode Source Code Tutorial

Published: 2021-10-19 16:31:06 Source: 億速云 Views: 162 Author: iii Category: Web development

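This article walks through the source code behind Seata's Saga mode: how the state machine is defined, initialized, and started, and how it drives the global transaction and its branch transactions.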

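State machine definition

Take a typical e-commerce purchase flow as an example. We define three services: the order service (OrderServer), the account service (AccountService), and the inventory service (StorageService). The order service acts as the aggregating service, that is, the TM (transaction manager).

When an external order arrives, the order service first creates an order, then calls the account service to deduct the amount, and finally calls the inventory service to deduct stock. The flow is shown in the figure below: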

[Figure: order placement flow across the order, account, and inventory services]

Seata's Saga mode is implemented on top of a state machine. The state machine's control over state transitions is driven by a JSON file, defined as follows:

{
    "Name": "buyGoodsOnline",
    "Comment": "buy a goods on line, add order, deduct account, deduct storage ",
    "StartState": "SaveOrder",
    "Version": "0.0.1",
    "States": {
        "SaveOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "saveOrder",
            "CompensateState": "DeleteOrder",
            "Next": "ChoiceAccountState",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ],
            "Output": {
                "SaveOrderResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            }
        },
        "ChoiceAccountState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[SaveOrderResult] == true",
                    "Next":"ReduceAccount"
                }
            ],
            "Default":"Fail"
        },
        "ReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceAccount",
            "Next": "ChoiceStorageState",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]",
                {
                    "throwException" : "$.[mockReduceAccountFail]"
                }
            ],
            "Output": {
                "ReduceAccountResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ]
        },
        "ChoiceStorageState":{
            "Type": "Choice",
            "Choices":[
                {
                    "Expression":"[ReduceAccountResult] == true",
                    "Next":"ReduceStorage"
                }
            ],
            "Default":"Fail"
        },
        "ReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "decrease",
            "CompensateState": "CompensateReduceStorage",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]",
                {
                    "throwException" : "$.[mockReduceStorageFail]"
                }
            ],
            "Output": {
                "ReduceStorageResult": "$.#root"
            },
            "Status": {
                "#root == true": "SU",
                "#root == false": "FA",
                "$Exception{java.lang.Throwable}": "UN"
            },
            "Catch": [
                {
                    "Exceptions": [
                        "java.lang.Throwable"
                    ],
                    "Next": "CompensationTrigger"
                }
            ],
            "Next": "Succeed"
        },
        "DeleteOrder": {
            "Type": "ServiceTask",
            "ServiceName": "orderSave",
            "ServiceMethod": "deleteOrder",
            "Input": [
                "$.[businessKey]",
                "$.[order]"
            ]
        },
        "CompensateReduceAccount": {
            "Type": "ServiceTask",
            "ServiceName": "accountService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[userId]",
                "$.[money]"
            ]
        },
        "CompensateReduceStorage": {
            "Type": "ServiceTask",
            "ServiceName": "storageService",
            "ServiceMethod": "compensateDecrease",
            "Input": [
                "$.[businessKey]",
                "$.[productId]",
                "$.[count]"
            ]
        },
        "CompensationTrigger": {
            "Type": "CompensationTrigger",
            "Next": "Fail"
        },
        "Succeed": {
            "Type":"Succeed"
        },
        "Fail": {
            "Type":"Fail",
            "ErrorCode": "PURCHASE_FAILED",
            "Message": "purchase failed"
        }
    }
}

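The state machine runs in the TM, which is the order service defined above. When the order service creates an order it has to begin a global transaction, and that is when the state machine is started. The code is as follows: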

StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");

Map<String, Object> startParams = new HashMap<>(3);
String businessKey = String.valueOf(System.currentTimeMillis());
startParams.put("businessKey", businessKey);
startParams.put("order", order);
startParams.put("mockReduceAccountFail", "true");
startParams.put("userId", order.getUserId());
startParams.put("money", order.getPayAmount());
startParams.put("productId", order.getProductId());
startParams.put("count", order.getCount());

//sync test
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);

As you can see, the name buyGoodsOnline passed in the code above is exactly the value of the Name attribute in the JSON file.
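Looking back at the JSON definition, each forward ServiceTask names a CompensateState that undoes it (SaveOrder pairs with DeleteOrder, and so on). The following is a minimal sketch of that idea only, not Seata's implementation: the class SagaSketch, the Step type, and the run method are hypothetical names, and the sketch assumes that completed steps are compensated in reverse order when a later step fails. Which states the real engine actually compensates is decided inside Seata and is not modeled here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

public class SagaSketch {

    /** One forward step plus the action that undoes it (hypothetical type, not a Seata class). */
    public static final class Step {
        final String name;
        final BooleanSupplier action;
        final Runnable compensation;

        public Step(String name, BooleanSupplier action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    /**
     * Runs the steps in order. On the first failure (false result or exception),
     * compensates the steps that already succeeded, newest first, and returns false.
     */
    public static boolean run(List<Step> steps, List<String> log) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            log.add(step.name);
            boolean ok;
            try {
                ok = step.action.getAsBoolean();
            } catch (RuntimeException e) {
                ok = false; // treat an exception like a failed step
            }
            if (!ok) {
                while (!completed.isEmpty()) {
                    Step done = completed.pop(); // most recently completed step first
                    log.add("compensate:" + done.name);
                    done.compensation.run();
                }
                return false;
            }
            completed.push(step);
        }
        return true;
    }
}
```

With the three steps SaveOrder, ReduceAccount, and ReduceStorage, a failure in ReduceAccount means ReduceStorage never runs and only SaveOrder is compensated.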

State machine initialization

So where is the stateMachineEngine bean used in the order-creation code defined? The order service demo has a class, StateMachineConfiguration, that defines it:

public class StateMachineConfiguration {

    @Bean
    public ThreadPoolExecutorFactoryBean threadExecutor(){
        ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean();
        threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_");
        threadExecutor.setCorePoolSize(1);
        threadExecutor.setMaxPoolSize(20);
        return threadExecutor;
    }

    @Bean
    public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DataSource hikariDataSource) throws IOException {
        DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig();
        dbStateMachineConfig.setDataSource(hikariDataSource);
        dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject());
        /**
         * The path of the JSON file is configured here. When the TM initializes, it parses the JSON file
         * into a StateMachineImpl. If the database does not yet hold this state machine, it is stored in
         * the seata_state_machine_def table; if a record exists, the latest one is taken.
         * The state machine is then registered with StateMachineRepositoryImpl in two maps:
         * stateMachineMapByNameAndTenant, whose key format is (stateMachineName + "_" + tenantId),
         * and stateMachineMapById, whose key is stateMachine.getId().
         * See the registryStateMachine method of StateMachineRepositoryImpl for the code.
         * Registration is triggered from init(), the initialization method of DefaultStateMachineConfig,
         * which is the parent class of DbStateMachineConfig.
         */
        dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang/*.json"));// JSON files
        dbStateMachineConfig.setEnableAsync(true);
        dbStateMachineConfig.setApplicationId("order-server");
        dbStateMachineConfig.setTxServiceGroup("my_test_tx_group");
        return dbStateMachineConfig;
    }

    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine(DbStateMachineConfig dbStateMachineConfig){
        ProcessCtrlStateMachineEngine stateMachineEngine = new ProcessCtrlStateMachineEngine();
        stateMachineEngine.setStateMachineConfig(dbStateMachineConfig);
        return stateMachineEngine;
    }

    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine){
        StateMachineEngineHolder stateMachineEngineHolder = new StateMachineEngineHolder();
        stateMachineEngineHolder.setStateMachineEngine(stateMachineEngine);
        return stateMachineEngineHolder;
    }
}

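As you can see, DbStateMachineConfig is configured with the state machine's JSON file, along with applicationId and txServiceGroup. When DbStateMachineConfig initializes, the init method of its parent class DefaultStateMachineConfig parses the JSON file into a state machine and registers it.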
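The registration step can be pictured as a registry with two lookup maps, one keyed by name plus tenant and one keyed by id, as the comment in the configuration class describes. The sketch below is an illustration under that description only; StateMachineRegistrySketch and Definition are hypothetical names, not Seata's StateMachineRepositoryImpl, and persistence to seata_state_machine_def is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StateMachineRegistrySketch {

    /** A stripped-down state machine definition (hypothetical type). */
    public static final class Definition {
        public final String id;
        public final String name;
        public final String tenantId;

        public Definition(String id, String name, String tenantId) {
            this.id = id;
            this.name = name;
            this.tenantId = tenantId;
        }
    }

    private final Map<String, Definition> byNameAndTenant = new ConcurrentHashMap<>();
    private final Map<String, Definition> byId = new ConcurrentHashMap<>();

    /** Registers the definition under both keys so either lookup path can find it. */
    public void register(Definition definition) {
        byNameAndTenant.put(definition.name + "_" + definition.tenantId, definition);
        byId.put(definition.id, definition);
    }

    public Definition findByName(String name, String tenantId) {
        return byNameAndTenant.get(name + "_" + tenantId);
    }

    public Definition findById(String id) {
        return byId.get(id);
    }
}
```

Keeping both maps lets the engine resolve a machine by the name used in startWithBusinessKey and by the id stored with a running instance, without scanning.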

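During registration, one record is inserted into the seata_state_machine_def table. The content column stores the content of our JSON file; the values of the other columns are shown in the figure below: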

[Figure: column values of the seata_state_machine_def record]

Appendix: for the JSON file above, the StateMachineImpl content we observed while debugging is as follows:

id = null
tenantId = null
appName = "SEATA"
name = "buyGoodsOnline"
comment = "buy a goods on line, add order, deduct account, deduct storage "
version = "0.0.1"
startState = "SaveOrder"
status = {StateMachine$Status@9135} "AC"
recoverStrategy = null
isPersist = true
type = "STATE_LANG"
content = null
gmtCreate = null
states = {LinkedHashMap@9137}  size = 11
    "SaveOrder" -> {ServiceTaskStateImpl@9153}
    "ChoiceAccountState" -> {ChoiceStateImpl@9155}
    "ReduceAccount" -> {ServiceTaskStateImpl@9157}
    "ChoiceStorageState" -> {ChoiceStateImpl@9159}
    "ReduceStorage" -> {ServiceTaskStateImpl@9161}
    "DeleteOrder" -> {ServiceTaskStateImpl@9163}
    "CompensateReduceAccount" -> {ServiceTaskStateImpl@9165}
    "CompensateReduceStorage" -> {ServiceTaskStateImpl@9167}
    "CompensationTrigger" -> {CompensationTriggerStateImpl@9169}
    "Succeed" -> {SucceedEndStateImpl@9171}
    "Fail" -> {FailEndStateImpl@9173}

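Starting the state machine

In the order-creation code in the first section, the startWithBusinessKey method starts the whole transaction. There is also an asynchronous variant, startWithBusinessKeyAsync; here we only analyze the synchronous one. The source code is as follows: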

public StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey,
                                                 Map<String, Object> startParams) throws EngineExecutionException {
    return startInternal(stateMachineName, tenantId, businessKey, startParams, false, null);
}

private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey,
                                           Map<String, Object> startParams, boolean async, AsyncCallback callback)
    throws EngineExecutionException {
    // part of the source code omitted
    // create a state machine instance
    // default value: tenantId = "000001"
    StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);

    /**
     * The ProcessType enum has a single element, STATE_LANG
     * OPERATION_NAME_START = "start"
     * callback is null
     * getStateMachineConfig() returns the DbStateMachineConfig
     */
    ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)
        .withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(
            new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)
        .withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);

    Map<String, Object> contextVariables;
    if (startParams != null) {
        contextVariables = new ConcurrentHashMap<>(startParams.size());
        nullSafeCopy(startParams, contextVariables);
    } else {
        contextVariables = new ConcurrentHashMap<>();
    }
    instance.setContext(contextVariables);// assign the start parameters to the state machine instance's context
    // add the parameters to the variables of ProcessContextImpl
    contextBuilder.withStateMachineContextVariables(contextVariables);

    contextBuilder.withIsAsyncExecution(async);

    // the builder defined above creates a ProcessContextImpl
    ProcessContext processContext = contextBuilder.build();

    // this condition is true
    if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
        // record the start state of the state machine
        stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
    }
    if (StringUtils.isEmpty(instance.getId())) {
        instance.setId(
            stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
    }

    if (async) {
        stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
    } else {
        // send the message to the EventBus; the consumer is ProcessCtrlEventConsumer, set up when DefaultStateMachineConfig initializes
        stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
    }

    return instance;
}

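From the code above we can see that starting the state machine mainly does two things: it records the state machine's start state, and it sends a message to the EventBus. Let's look at both in detail.

Beginning the global transaction

The code analyzed above contains a call that records the state machine's start state: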

stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);

This calls the recordStateMachineStarted method of the DbAndReportTcStateLogStore class. Let's take a look:

public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {

    if (machineInstance != null) {
        //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
        //use parent transaction instead.
        String parentId = machineInstance.getParentId();
        if (StringUtils.hasLength(parentId)) {
            if (StringUtils.isEmpty(machineInstance.getId())) {
                machineInstance.setId(parentId);
            }
        } else {
            // this branch is taken, because no sub state machine is configured
            /**
             * beginTransaction here begins the global transaction
             * by calling the TC
             */
            beginTransaction(machineInstance, context);
        }

        if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
            machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
        }

        // save to db
        // dbType = "MySQL"
        machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));
        executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType),
                STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);
    }
}

The executeUpdate method above lives in the parent class AbstractStore. Debugging executeUpdate shows that the SQL executed here is:

INSERT INTO seata_state_machine_inst
    (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated)
VALUES
    ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773', '1604135904773',
     '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773","count":1,"mockReduceAccountFail":"true","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}}',
     1, 'RU', '2020-10-31 17:18:24.773')

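As you can see, the global transaction is recorded in the seata_state_machine_inst table. The row stores the parameters we started the state machine with, and the status column is "RU", that is, RUNNING.

Branch transaction processing

In the previous section we mentioned that after the state machine starts, a message is sent to the EventBus. Its consumer is ProcessCtrlEventConsumer; let's look at this class: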

public class ProcessCtrlEventConsumer implements EventConsumer<ProcessContext> {

    private ProcessController processController;

    @Override
    public void process(ProcessContext event) throws FrameworkException {
        // processController here is a ProcessControllerImpl
        processController.process(event);
    }

    @Override
    public boolean accept(Class<ProcessContext> clazz) {
        return ProcessContext.class.isAssignableFrom(clazz);
    }

    public void setProcessController(ProcessController processController) {
        this.processController = processController;
    }
}

The process method of ProcessControllerImpl performs two steps, process and route:

public void process(ProcessContext context) throws FrameworkException {

    try {
        // businessProcessor here is a CustomizeBusinessProcessor
        businessProcessor.process(context);

        businessProcessor.route(context);

    } catch (FrameworkException fex) {
        throw fex;
    } catch (Exception ex) {
        LOGGER.error("Unknown exception occurred, context = {}", context, ex);
        throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);
    }
}

The processing logic here is somewhat complex. Here is a UML class diagram first; following it makes the call chain easier to untangle:

[Figure: UML class diagram of the processing call chain]

Let's first look at the process method in CustomizeBusinessProcessor:

public void process(ProcessContext context) throws FrameworkException {

    /**
     * processType = {ProcessType@10310} "STATE_LANG"
     * code = "STATE_LANG"
     * message = "SEATA State Language"
     * name = "STATE_LANG"
     * ordinal = 0
     */
    ProcessType processType = matchProcessType(context);
    if (processType == null) {
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn("Process type not found, context= {}", context);
        }
        throw new FrameworkException(FrameworkErrorCode.ProcessTypeNotFound);
    }

    ProcessHandler processor = processHandlers.get(processType.getCode());
    if (processor == null) {
        LOGGER.error("Cannot find process handler by type {}, context= {}", processType.getCode(), context);
        throw new FrameworkException(FrameworkErrorCode.ProcessHandlerNotFound);
    }
    // processor here is a StateMachineProcessHandler
    processor.process(context);
}

This code is not easy to follow, so we study it in four steps.

Step 1: look at the process method of the StateMachineProcessHandler class. This method acts as a proxy for ServiceTaskStateHandler's process method:

public void process(ProcessContext context) throws FrameworkException {
    /**
     * instruction = {StateInstruction@11057}
     * stateName = null
     * stateMachineName = "buyGoodsOnline"
     * tenantId = "000001"
     * end = false
     * temporaryState = null
     */
    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    // the State implementation here is ServiceTaskStateImpl
    State state = instruction.getState(context);
    String stateType = state.getType();
    // the StateHandler implementation here is ServiceTaskStateHandler
    StateHandler stateHandler = stateHandlers.get(stateType);

    List<StateHandlerInterceptor> interceptors = null;
    if (stateHandler instanceof InterceptableStateHandler) {
        // the list has one element, ServiceTaskHandlerInterceptor
        interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();
    }

    List<StateHandlerInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        if (interceptors != null && interceptors.size() > 0) {
            executedInterceptors = new ArrayList<>(interceptors.size());
            for (StateHandlerInterceptor interceptor : interceptors) {
                executedInterceptors.add(interceptor);
                interceptor.preProcess(context);
            }
        }

        stateHandler.process(context);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {

        if (executedInterceptors != null && executedInterceptors.size() > 0) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateHandlerInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postProcess(context, exception);
            }
        }
    }
}

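From this method we can see that the proxy wraps stateHandler.process with pre- and post-processing. The interceptor class is ServiceTaskHandlerInterceptor; the pre and post hooks call the interceptor's preProcess and postProcess respectively.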
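The shape of that wrapping is worth isolating: pre hooks run in list order, the handler runs, and post hooks run in reverse order inside a finally block, receiving the exception if there was one. The sketch below reproduces only that control flow with hypothetical types (InterceptorChainSketch, Interceptor, Handler); it is not Seata code, and it restricts itself to unchecked exceptions to stay short.

```java
import java.util.ArrayList;
import java.util.List;

public class InterceptorChainSketch {

    /** Hypothetical counterpart of an interceptor with pre and post hooks. */
    public interface Interceptor {
        void pre(List<String> log);
        void post(List<String> log, RuntimeException error);
    }

    /** Hypothetical counterpart of the wrapped handler. */
    public interface Handler {
        void handle(List<String> log);
    }

    public static void process(List<Interceptor> interceptors, Handler handler, List<String> log) {
        List<Interceptor> executed = new ArrayList<>();
        RuntimeException error = null;
        try {
            for (Interceptor interceptor : interceptors) {
                executed.add(interceptor); // remember it before calling, so post still runs if pre throws
                interceptor.pre(log);
            }
            handler.handle(log);
        } catch (RuntimeException e) {
            error = e;
            throw e;
        } finally {
            // post hooks run in reverse order and see the exception, if any
            for (int i = executed.size() - 1; i >= 0; i--) {
                executed.get(i).post(log, error);
            }
        }
    }

    /** Builds an interceptor that only records when its hooks are called. */
    public static Interceptor named(String name) {
        return new Interceptor() {
            @Override
            public void pre(List<String> log) {
                log.add("pre:" + name);
            }

            @Override
            public void post(List<String> log, RuntimeException error) {
                log.add("post:" + name + (error == null ? "" : ":error"));
            }
        };
    }
}
```

Running the post hooks in a finally block is what lets the interceptor record a branch result even when the business method throws.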

Step 2: look at the interception logic, the preProcess and postProcess methods of ServiceTaskHandlerInterceptor:

public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor {
    // part of the code omitted
    @Override
    public void preProcess(ProcessContext context) throws EngineExecutionException {

        StateInstruction instruction = context.getInstruction(StateInstruction.class);

        StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_INST);
        StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

        // on timeout, set the state machine status to FA
        if (EngineUtils.isTimeout(stateMachineInstance.getGmtUpdated(), stateMachineConfig.getTransOperationTimeout())) {
            String message = "Saga Transaction [stateMachineInstanceId:" + stateMachineInstance.getId()
                    + "] has timed out, stop execution now.";
            // the construction of `exception` from `message` is omitted in this excerpt
            EngineUtils.failStateMachine(context, exception);
            throw exception;
        }

        StateInstanceImpl stateInstance = new StateInstanceImpl();

        Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT);
        ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context);
        List<Object> serviceInputParams = null;

        Object isForCompensation = state.isForCompensation();
        if (isForCompensation != null && (Boolean)isForCompensation) {
            CompensationHolder compensationHolder = CompensationHolder.getCurrent(context, true);
            StateInstance stateToBeCompensated = compensationHolder.getStatesNeedCompensation().get(state.getName());
            if (stateToBeCompensated != null) {

                stateToBeCompensated.setCompensationState(stateInstance);
                stateInstance.setStateIdCompensatedFor(stateToBeCompensated.getId());
            } else {
                LOGGER.error("Compensation State[{}] has no state to compensate, maybe this is a bug.",
                    state.getName());
            }
            // add to the compensation set
            CompensationHolder.getCurrent(context, true).addForCompensationState(stateInstance.getName(),
                stateInstance);
        }
        // part of the code omitted
        stateInstance.setInputParams(serviceInputParams);

        if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist()
            && stateMachineConfig.getStateLogStore() != null) {

            try {
                // record the branch transaction in the database with status RU
                /**
                 * INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for)
                 * VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146:8091:65853497147990016', 'SaveOrder', 'ServiceTask', '2020-10-31 17:18:40.84', 'orderSave',
                 * 'saveOrder', null, 1, '["1604135904773",{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}]', 'RU', null, null, null)
                 */
                stateMachineConfig.getStateLogStore().recordStateStarted(stateInstance, context);
            }
            // catch/finally handling omitted in this excerpt
        }
        // part of the code omitted
        stateMachineInstance.putStateInstance(stateInstance.getId(), stateInstance);// put into StateMachineInstanceImpl's stateMap, used for retry or transaction compensation
        ((HierarchicalProcessContext)context).setVariableLocally(DomainConstants.VAR_NAME_STATE_INST, stateInstance);// keep the state instance; it is later passed to TaskStateRouter to decide whether the global transaction has ended
    }

    @Override
    public void postProcess(ProcessContext context, Exception exp) throws EngineExecutionException {

        StateInstruction instruction = context.getInstruction(StateInstruction.class);
        ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context);

        StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_INST);
        StateInstance stateInstance = (StateInstance)context.getVariable(DomainConstants.VAR_NAME_STATE_INST);
        if (stateInstance == null || !stateMachineInstance.isRunning()) {
            LOGGER.warn("StateMachineInstance[id:" + stateMachineInstance.getId() + "] is end. stop running");
            return;
        }

        StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

        if (exp == null) {
            exp = (Exception)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION);
        }
        stateInstance.setException(exp);

        // set the transaction status
        decideExecutionStatus(context, stateInstance, state, exp);
        // part of the code omitted

        Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT);
        // part of the code omitted

        context.removeVariable(DomainConstants.VAR_NAME_OUTPUT_PARAMS);
        context.removeVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);

        stateInstance.setGmtEnd(new Date());

        if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist()
            && stateMachineConfig.getStateLogStore() != null) {
            // update the branch transaction status to success
            /**
             * UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:18:49.919', excep = null, status = 'SU',
             * output_params = 'true' WHERE id = '4fe5f602452c84ba5e88fd2ee9c13b35' AND
             * machine_inst_id = '192.168.59.146:8091:65853497147990016'
             */
            stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);
        }
        // part of the code omitted
    }
}

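From this code we can see that before a branch transaction executes, a StateInstanceImpl is built and stored in the ProcessContext; after the branch transaction executes, this StateInstanceImpl is updated. The StateInstanceImpl serves three purposes:

  • It is put into the stateMap of StateMachineInstanceImpl, where it is used for retry or transaction compensation.

  • It records how the branch transaction executed, and can be persisted to the seata_state_inst table.

  • It is passed to TaskStateRouter, which uses it to decide whether the global transaction has ended.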

Step 3: look at the proxied method, stateHandler.process(context). In the normal execution path the stateHandler implementation is ServiceTaskStateHandler:

public void process(ProcessContext context) throws EngineExecutionException {

    StateInstruction instruction = context.getInstruction(StateInstruction.class);
    ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);
    StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);

    Object result;
    try {
        /**
         * The input here is what we defined in the JSON. For the orderSave ServiceTask, for example, input is:
         * 0 = "1608714480316"
         * 1 = {Order@11271} "Order(id=null, userId=1, productId=1, count=1, payAmount=50, status=null)"
         * The JSON definition is:
         * "Input": [
         *     "$.[businessKey]",
         *     "$.[order]"
         * ]
         */
        List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);

        //Set the current task execution status to RU (Running)
        stateInstance.setStatus(ExecutionStatus.RU);// set the status

        if (state instanceof CompensateSubStateMachineState) {
            // sub state machine handling omitted
        } else {
            StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
                    DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
            // state.getServiceType here is springBean
            ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(
                    state.getServiceType());
            if (serviceInvoker == null) {
                throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",
                        FrameworkErrorCode.ObjectNotExists);
            }
            if (serviceInvoker instanceof ApplicationContextAware) {
                ((ApplicationContextAware) serviceInvoker).setApplicationContext(
                        stateMachineConfig.getApplicationContext());
            }
            // this triggers the ServiceTask method defined in the JSON, for example the saveOrder method of orderSave
            result = serviceInvoker.invoke(state, input.toArray());
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",
                    state.getName(), serviceName, methodName, result);
        }
        // part of the code omitted

    }
    // exception handling code omitted
}

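As you can see, this process method is the core of the business handling. It uses reflection to invoke the ServiceTask method defined in the JSON and, depending on the resulting status, leads on to the Next state, that is, the next ServiceTask in the flow.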
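As a rough illustration of what "invoke by reflection" means for a ServiceName and ServiceMethod pair from the JSON, the sketch below looks a bean up by name in a plain map and calls a public method on it by name. It is an assumption-laden simplification: ReflectiveInvokerSketch, its bean map, and the sample OrderSave class are all hypothetical, and matching only on method name and argument count is cruder than what a real invoker would do.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class ReflectiveInvokerSketch {

    /** Sample bean standing in for the demo's orderSave service (hypothetical). */
    public static class OrderSave {
        public boolean saveOrder(String businessKey, String order) {
            return businessKey != null && order != null;
        }
    }

    private final Map<String, Object> beans = new HashMap<>();

    public void registerBean(String name, Object bean) {
        beans.put(name, bean);
    }

    /** Finds the bean by service name and calls the first public method matching name and arity. */
    public Object invoke(String serviceName, String methodName, Object... args) {
        Object bean = beans.get(serviceName);
        if (bean == null) {
            throw new IllegalArgumentException("No such service: " + serviceName);
        }
        for (Method method : bean.getClass().getMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == args.length) {
                try {
                    return method.invoke(bean, args);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Invocation failed: " + methodName, e);
                }
            }
        }
        throw new IllegalArgumentException("No such method: " + serviceName + "." + methodName);
    }
}
```

Registering an OrderSave instance under "orderSave" and invoking "saveOrder" with two string arguments returns the boxed boolean result, which a Status mapping such as "#root == true" could then be evaluated against.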

Step 4: look at the route method of CustomizeBusinessProcessor:

public void route(ProcessContext context) throws FrameworkException {

    //code = "STATE_LANG"
    //message = "SEATA State Language"
    //name = "STATE_LANG"
    //ordinal = 0
    ProcessType processType = matchProcessType(context);

    RouterHandler router = routerHandlers.get(processType.getCode());
    // the route method of DefaultRouterHandler
    router.route(context);
}

Now the route method of DefaultRouterHandler:

public void route(ProcessContext context) throws FrameworkException {

    try {
        ProcessType processType = matchProcessType(context);
        // processRouter here is a StateMachineProcessRouter
        ProcessRouter processRouter = processRouters.get(processType.getCode());
        Instruction instruction = processRouter.route(context);
        if (instruction == null) {
            LOGGER.info("route instruction is null, process end");
        } else {
            context.setInstruction(instruction);

            eventPublisher.publish(context);
        }
    } catch (FrameworkException e) {
        throw e;
    } catch (Exception ex) {
        throw new FrameworkException(ex, ex.getMessage(), FrameworkErrorCode.UnknownAppError);
    }
}

Next, the route method of StateMachineProcessRouter, which also uses the proxy pattern:

public Instruction route(ProcessContext context) throws FrameworkException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);

    State state;
    if (stateInstruction.getTemporaryState() != null) {
        state = stateInstruction.getTemporaryState();
        stateInstruction.setTemporaryState(null);
    } else {
        // this branch is taken
        StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(
            DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
        StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(
            stateInstruction.getStateMachineName(), stateInstruction.getTenantId());
        state = stateMachine.getStates().get(stateInstruction.getStateName());
    }

    String stateType = state.getType();

    StateRouter router = stateRouters.get(stateType);

    Instruction instruction = null;

    List<StateRouterInterceptor> interceptors = null;
    if (router instanceof InterceptableStateRouter) {
        // only EndStateRouter qualifies here
        interceptors = ((InterceptableStateRouter)router).getInterceptors();//EndStateRouterInterceptor
    }

    List<StateRouterInterceptor> executedInterceptors = null;
    Exception exception = null;
    try {
        // the pre-route hook implementation is empty, so that code is omitted here
        instruction = router.route(context, state);

    } catch (Exception e) {
        exception = e;
        throw e;
    } finally {

        if (executedInterceptors != null && executedInterceptors.size() > 0) {
            for (int i = executedInterceptors.size() - 1; i >= 0; i--) {
                StateRouterInterceptor interceptor = executedInterceptors.get(i);
                interceptor.postRoute(context, state, instruction, exception);// ends the state machine
            }
        }

        //if 'Succeed' or 'Fail' State did not configured, we must end the state machine
        if (instruction == null && !stateInstruction.isEnd()) {
            EngineUtils.endStateMachine(context);
        }
    }

    return instruction;
}

The proxy here only implements a post hook, whose job is to end the state machine.

Now let's look at StateRouter. The UML class diagram is as follows:

[Figure: UML class diagram of StateRouter and its implementations]

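From the UML class diagram we can see that besides EndStateRouter there is only TaskStateRouter. EndStateRouter does not do much, because the logic for closing the state machine is already handled by the proxy. So let's look at TaskStateRouter: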

public Instruction route(ProcessContext context, State state) throws EngineExecutionException {

    StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);
    if (stateInstruction.isEnd()) {
        // if already ended, return directly
        // code omitted
    }

    //The current CompensationTriggerState can mark the compensation process is started and perform compensation
    // route processing.
    State compensationTriggerState = (State)context.getVariable(
        DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);
    if (compensationTriggerState != null) {
        // add to the compensation set, run the compensation, and return
        return compensateRoute(context, compensationTriggerState);
    }

    //There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
    String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);

    if (StringUtils.hasLength(next)) {
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);
    } else {
        next = state.getNext();
    }

    //If next is empty, the state selected by the Choice state was taken.
    if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {
        next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
        context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);
    }
    // no next node can be found in the current context, so return directly
    if (!StringUtils.hasLength(next)) {
        return null;
    }

    StateMachine stateMachine = state.getStateMachine();

    State nextState = stateMachine.getState(next);
    if (nextState == null) {
        throw new EngineExecutionException("Next state[" + next + "] is not exits",
            FrameworkErrorCode.ObjectNotExists);
    }
    // the next state to transition to is found and assigned to stateInstruction
    stateInstruction.setStateName(next);

    return stateInstruction;
}

As you can see, the job of route is to determine the next node in the flow for the state machine and store it in the stateInstruction held by the current context.
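The lookup order inside route can be condensed into a small standalone sketch. This is not Seata code: RouteSketch, resolveNext, the plain Map standing in for ProcessContext, the two key names, and the state name ErrorHandlerState are all hypothetical and exist only for illustration. The compensation branch is left out to keep the example short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified illustration of the lookup order used in route.
// A plain Map stands in for ProcessContext; the key names are invented for this sketch.
class RouteSketch {
    static final String EXCEPTION_ROUTE = "exceptionRoute";
    static final String CHOICE = "choice";

    // Returns the name of the next state, or null when there is nothing left to route to.
    static String resolveNext(Map<String, Object> context, String declaredNext) {
        // 1. An exception route takes priority and is consumed once read.
        String next = (String) context.remove(EXCEPTION_ROUTE);
        // 2. Otherwise use the "Next" declared on the current state.
        if (next == null || next.isEmpty()) {
            next = declaredNext;
        }
        // 3. If that is empty too, fall back to the target picked by a Choice state.
        if ((next == null || next.isEmpty()) && context.containsKey(CHOICE)) {
            next = (String) context.remove(CHOICE);
        }
        return (next == null || next.isEmpty()) ? null : next;
    }

    public static void main(String[] args) {
        Map<String, Object> context = new HashMap<>();
        context.put(EXCEPTION_ROUTE, "ErrorHandlerState"); // hypothetical state name
        // First call follows the exception route, second call follows the declared Next.
        System.out.println(resolveNext(context, "ChoiceStorageState"));
        System.out.println(resolveNext(context, "ChoiceStorageState"));
    }
}
```

Because the exception route is removed as soon as it is read, it only affects a single transition. The next call goes back to the state's declared successor.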

At this point we have finished analyzing how the state machine works. The logic lives in the ProcessControllerImpl class.

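Note that once the next node has been obtained, it is not processed directly. Instead, the observer pattern is used: the node is first published to the EventBus and waits for an observer to handle it. This cycle repeats until EndStateRouter ends the state machine.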
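The process, route, publish cycle can be sketched in a few lines. This is a minimal illustration under simplifying assumptions, not the Seata implementation: EventLoopSketch and its methods are hypothetical, a queue stands in for the EventBus, state names are used as events instead of a ProcessContext, and Choice states and compensation are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of the process -> route -> publish loop; none of these types are Seata classes.
class EventLoopSketch {
    // Maps each state name to its successor; a null successor marks the final state.
    private final Map<String, String> transitions = new LinkedHashMap<>();
    // Stands in for the EventBus: published events wait here until the consumer picks them up.
    private final Queue<String> bus = new ArrayDeque<>();
    private final List<String> executed = new ArrayList<>();

    EventLoopSketch addState(String name, String next) {
        transitions.put(name, next);
        return this;
    }

    // Handles one event: process the current state, then route and publish the next one.
    private void onEvent(String stateName) {
        executed.add(stateName);                  // "process": run the current state
        String next = transitions.get(stateName); // "route": decide the next state
        if (next != null) {
            bus.offer(next);                      // publish instead of calling onEvent directly
        }
    }

    // Publishes the start state and drains the bus; returns the states in execution order.
    List<String> run(String startState) {
        bus.offer(startState);
        String event;
        while ((event = bus.poll()) != null) {    // the loop ends once routing publishes nothing more
            onEvent(event);
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> order = new EventLoopSketch()
            .addState("SaveOrder", "ReduceAccount")
            .addState("ReduceAccount", "ReduceStorage")
            .addState("ReduceStorage", null)
            .run("SaveOrder");
        System.out.println(order);
    }
}
```

Publishing the next state rather than invoking it directly keeps the handler for one state independent of whatever runs next, which is the point of routing through a bus.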

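In this observer pattern the event is the ProcessContext. It contains an Instruction, the Instruction contains a State, and that State determines the next node to process, and so on until the end. The UML class diagram is as follows:

[Figure: UML class diagram]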

Summary

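The saga mode of the seata middleware is fairly widely used, but its code is rather complex. I have walked through it from the following angles:

  • The JSON file we define is loaded into the StateMachineImpl class.

  • Starting the state machine also starts a global transaction. This works the same way as starting a global transaction in the ordinary modes: in both cases a message is sent to the TC.

  • The entry class for handling state machine states and controlling state transitions is ProcessControllerImpl. You can trace the code starting from its process method.

  • ProcessControllerImpl calls the process method of CustomizeBusinessProcessor to handle the current state, then calls the route method to obtain the next node and publishes it to the EventBus.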

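Saga mode introduces three additional tables. We can also trace the code through the two of them that relate to global transactions and branch transactions. For the demo given earlier, when the transaction succeeds, the write SQL against these two tables, in the order the state machine executes, is as follows: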

INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated)
VALUES ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773', '1604135904773', '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773","count":1,"mockReduceAccountFail":"true","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}}', 1, 'RU', '2020-10-31 17:18:24.773');

INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for)
VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146:8091:65853497147990016', 'SaveOrder', 'ServiceTask', '2020-10-31 17:18:40.84', 'orderSave', 'saveOrder', null, 1, '["1604135904773",{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}]', 'RU', null, null, null);

UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:18:49.919', excep = null, status = 'SU', output_params = 'true'
WHERE id = '4fe5f602452c84ba5e88fd2ee9c13b35' AND machine_inst_id = '192.168.59.146:8091:65853497147990016';

INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for)
VALUES ('8371235cb2c66c8626e148f66123d3b4', '192.168.59.146:8091:65853497147990016', 'ReduceAccount', 'ServiceTask', '2020-10-31 17:19:00.441', 'accountService', 'decrease', null, 1, '["1604135904773",1L,50.,{"@type":"java.util.LinkedHashMap","throwException":"true"}]', 'RU', null, null, null);

UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:09.593', excep = null, status = 'SU', output_params = 'true'
WHERE id = '8371235cb2c66c8626e148f66123d3b4' AND machine_inst_id = '192.168.59.146:8091:65853497147990016';

INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for)
VALUES ('e70a49f1eac72f929085f4e82c2b4de2', '192.168.59.146:8091:65853497147990016', 'ReduceStorage', 'ServiceTask', '2020-10-31 17:19:18.494', 'storageService', 'decrease', null, 1, '["1604135904773",1L,1,{"@type":"java.util.LinkedHashMap"}]', 'RU', null, null, null);

UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:26.613', excep = null, status = 'SU', output_params = 'true'
WHERE id = 'e70a49f1eac72f929085f4e82c2b4de2' AND machine_inst_id = '192.168.59.146:8091:65853497147990016';

UPDATE seata_state_machine_inst SET gmt_end = '2020-10-31 17:19:33.581', excep = null, end_params = '{"@type":"java.util.HashMap","productId":1L,"count":1,"ReduceAccountResult":true,"mockReduceAccountFail":"true","userId":1L,"money":50.,"SaveOrderResult":true,"_business_key_":"1604135904773","businessKey":"1604135904773","ReduceStorageResult":true,"order":{"@type":"io.seata.sample.entity.Order","count":1,"id":60,"payAmount":50,"productId":1,"userId":1}}', status = 'SU', compensation_status = null, is_running = 0, gmt_updated = '2020-10-31 17:19:33.582'
WHERE id = '192.168.59.146:8091:65853497147990016' and gmt_updated = '2020-10-31 17:18:24.773';

