溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù)

發(fā)布時(shí)間:2021-08-09 11:54:53 來(lái)源:億速云 閱讀:116 作者:Leah 欄目:大數(shù)據(jù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù),文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

我們還是使用之前的實(shí)例,一個(gè)訂票系統(tǒng)的購(gòu)票邏輯:

REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù)

這篇教程的源代碼可以從github上獲取。

使用消息中間件實(shí)現(xiàn)分布式事務(wù),也就是使用事件驅(qū)動(dòng)實(shí)現(xiàn)。在這種方式下,Order服務(wù)不會(huì)直接調(diào)用User服務(wù),而是往MQ上發(fā)一個(gè)消息,說(shuō)明有新訂單需要扣費(fèi);User服務(wù)會(huì)響應(yīng)這個(gè)消息,并處理,處理完成后再發(fā)一個(gè)消息,說(shuō)明有新訂單需要轉(zhuǎn)移票;然后就會(huì)有Ticket服務(wù)來(lái)處理。而每個(gè)服務(wù)都是在一個(gè)事務(wù)里面處理讀消息、處理業(yè)務(wù)、寫消息的事情。大致流程如下:

REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù)

在這種方式下,訂單的處理是異步的,用戶發(fā)起一個(gè)訂單的時(shí)候,只是生成一個(gè)正在處理的訂單,然后通過(guò)消息中間件一步步的進(jìn)行扣費(fèi)、交票、完成訂單等邏輯。而每一個(gè)服務(wù)中相應(yīng)的操作,基本都是:

  1. 從一個(gè)隊(duì)列中讀取消息

  2. 操作相應(yīng)數(shù)據(jù)庫(kù)操作

  3. 往下一個(gè)隊(duì)列中發(fā)送消息

也就是說(shuō),需要在這個(gè)方法中需要操作數(shù)據(jù)庫(kù)和MQ兩個(gè)資源,這正好是上一篇文章中介紹Spring內(nèi)部事務(wù)和外部事務(wù)時(shí)使用的實(shí)例中的場(chǎng)景。下面就是大致的代碼:

12345678
@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);}

它監(jiān)聽(tīng)MQ的”order:new”隊(duì)列,處理訂單,往”order:need_to_pay”發(fā)送一個(gè)消息。然后用戶服務(wù)就會(huì)接收這個(gè)消息,觸發(fā)扣費(fèi)流程。

在這個(gè)地方,我們可以使用JTA事務(wù),來(lái)使用兩階段提交來(lái)實(shí)現(xiàn)兩個(gè)資源的共同提交,但是這會(huì)影響系統(tǒng)的性能。而且,還需要使用的消息中間件實(shí)現(xiàn)了XA的規(guī)范,提供兩階段提交的功能。
這里也可以使用本地事務(wù),這時(shí),每個(gè)事物都會(huì)有一個(gè)JMS的Session,并使用事務(wù)。如此一來(lái),就存在一個(gè)數(shù)據(jù)庫(kù)的事物和一個(gè)JMS的事務(wù),兩個(gè)事務(wù)是相互獨(dú)立并依次提交的。這樣,就有可能在極少數(shù)情況下出錯(cuò),但是也能采取一些錯(cuò)誤來(lái)盡量解決。我們對(duì)上面的事務(wù)處理展開(kāi)(偽代碼,只是為了說(shuō)明處理過(guò)程),來(lái)看看出錯(cuò)的情況以及該如何處理:

1234567891011
jmsTransaction.begin(); // get transactions from jms sessiondbTransaction.begin(); // get transactions from JDBC connectiontry {orderRepository.save(order);jmsTemplate.convertAndSend("order:need_to_pay", order);dbTransaction.commit();jmsTransaction.commit();} catch(Exception e) {dbTransaction.rollback();jmsTransaction.rollback();}

在上面的方法中,只要發(fā)生了錯(cuò)誤,MQ消息的消費(fèi)就算失敗,MQ的監(jiān)聽(tīng)器就會(huì)重新觸發(fā)一次這個(gè)方法。
這時(shí),如果錯(cuò)誤發(fā)生在:

  1. 數(shù)據(jù)提交時(shí)或之前。這時(shí),整個(gè)數(shù)據(jù)庫(kù)的操作都會(huì)被重置(也可能就根本還沒(méi)更新),重試的時(shí)候不需要考慮重復(fù)提交的問(wèn)題,因?yàn)橹暗奶峤欢家呀?jīng)被回滾。

  2. 數(shù)據(jù)庫(kù)提交成功,但是JMS提交失敗。這時(shí)就需要防止重復(fù)提交來(lái)避免數(shù)據(jù)庫(kù)的重復(fù)操作。

我們可以采用之前說(shuō)過(guò)的token方式,在調(diào)用這個(gè)方法前,生成一個(gè)唯一的token。這里使用Java的UUID生成一個(gè)ID作為token。(如果這里的重復(fù)調(diào)用只是在這個(gè)服務(wù)內(nèi)部重新觸發(fā),就不需要考慮分布式系統(tǒng)的全局一致性ID的問(wèn)題。這需要根據(jù)實(shí)際情況來(lái)判斷用什么樣的UUID生成方式)所以,Controller里面接受購(gòu)票請(qǐng)求如下:

1234567
@PostMapping(value = "/")@Transactionalpublic void create(@RequestBody OrderDTO orderDTO) {String uid = UUID.randomUUID().toString();orderDTO.setToken(uid);jmsTemplate.convertAndSend("order:new", orderDTO);}

然后在Service里面監(jiān)聽(tīng)這個(gè)隊(duì)列,處理購(gòu)票:

123456789101112131415
@JmsListener(destination = "order:new", containerFactory = "orderFactory")@Transactionalpublic void create(OrderDTO orderDTO) {if (!this.processedUIDs.contains(orderDTO.getToken())) {Order order = new Order(orderDTO);order.setStatus("PENDING");orderRepository.save(order);orderDTO.setStatus(order.getStatus());orderDTO.setId(order.getId());} else {LOG.info("Duplicate jms message:{}", orderDTO);}jmsTemplate.convertAndSend("order:need_to_pay", orderDTO);processedUIDs.add(orderDTO.getToken());}

簡(jiǎn)單來(lái)說(shuō),解決辦法就是,如果是重復(fù)觸發(fā)的,就略過(guò)數(shù)據(jù)庫(kù)相關(guān)的處理,直接往MQ的目標(biāo)隊(duì)列發(fā)送需要的數(shù)據(jù)。使得整個(gè)流程能夠往下走。

剛才說(shuō)的是在一個(gè)服務(wù)內(nèi)出錯(cuò)的情況,還有一種錯(cuò)誤情況是,訂單服務(wù)和用戶服務(wù)已經(jīng)處理完訂單創(chuàng)建和扣費(fèi)的操作,然后到了Ticket服務(wù)的時(shí)候,卻發(fā)現(xiàn)沒(méi)有票了。雖然我們可以通過(guò)合理的設(shè)計(jì)業(yè)務(wù)邏輯來(lái)避免這種問(wèn)題,例如,在操作之前先檢查用戶余額,檢查并鎖票,然后進(jìn)行操作數(shù)據(jù)的事情。但是,在有些情況下,很難通過(guò)業(yè)務(wù)流程的設(shè)計(jì)來(lái)完全避免這種問(wèn)題。如果出現(xiàn)了這種的問(wèn)題,我們也可以通過(guò)撤銷的流程來(lái)實(shí)現(xiàn),業(yè)務(wù)流程如下:
REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù)

在上面的解決方案中,使用JDK的UUID類生成一個(gè)ID,實(shí)際上這個(gè)ID只是在當(dāng)前的JVM內(nèi),才能夠保證是唯一的。其次,在JMS的標(biāo)準(zhǔn)中,沒(méi)有規(guī)定一個(gè)消息的Listener在讀取一個(gè)消息失敗后,重新讀取的問(wèn)題。在微服務(wù)環(huán)境中,如果一個(gè)應(yīng)用部署了多個(gè)實(shí)例,那個(gè)這個(gè)消息有可能會(huì)被另一個(gè)實(shí)例讀到。所以在上面的方案中,使用JVM內(nèi)的唯一ID放在消息的內(nèi)容中,它有可能被任意一個(gè)實(shí)例處理,處理失敗后,又有可能被另一個(gè)實(shí)例處理。這就會(huì)出問(wèn)題。所以我們需要一個(gè)分布式環(huán)境下的生成唯一ID的解決辦法。例如,先獲得JVM的唯一ID以后,再加上IP+端口等信息。而且,對(duì)已經(jīng)處理過(guò)的ID的緩存,也需要存在分布式環(huán)境中。

所以,我們完全可以不使用兩階段提交,就實(shí)現(xiàn)微服務(wù)架構(gòu)下的分布式事務(wù)。使用這種方式,它的優(yōu)點(diǎn)是:

  1. 實(shí)現(xiàn)簡(jiǎn)單。結(jié)合Spring的事務(wù),幾乎不用寫額外的事務(wù)相關(guān)的代碼,就能夠?qū)崿F(xiàn)。我們只需要更好的服務(wù)的拆分和設(shè)計(jì)業(yè)務(wù)流程。

  2. 系統(tǒng)吞吐量高。因?yàn)閿?shù)據(jù)庫(kù)或MQ不會(huì)被長(zhǎng)期的鎖住,可以并發(fā)的處理更多的事務(wù)。

  3. 容錯(cuò)性好。各個(gè)服務(wù)之間通過(guò)MQ來(lái)觸發(fā)協(xié)調(diào),即使在處理一個(gè)任務(wù)的時(shí)候有一個(gè)服務(wù)停了,消息還會(huì)一直保持,直到服務(wù)起來(lái)開(kāi)始監(jiān)聽(tīng),然后繼續(xù)觸發(fā)這個(gè)任務(wù)。

當(dāng)然這種方式也有一些缺點(diǎn),最大的問(wèn)題就是異步處理的問(wèn)題。用戶發(fā)出一個(gè)請(qǐng)求后,處理該業(yè)務(wù)的服務(wù)只是簡(jiǎn)單處理,往MQ發(fā)送消息開(kāi)始處理流程,然后就返回了。這時(shí)候這個(gè)任務(wù)還在處理。雖然有時(shí)候我們可以通過(guò)等的方式,等待最終處理完成的消息,然后在返回給用戶。但是這樣又得考慮響應(yīng)時(shí)間、超時(shí)、各種錯(cuò)誤等情況。

有些人會(huì)覺(jué)得這種方式使得開(kāi)發(fā)和調(diào)試都變得復(fù)雜,在我看來(lái),恰恰相反,這使得開(kāi)發(fā)和調(diào)試都簡(jiǎn)單了。首先,根據(jù)微服務(wù)架構(gòu)的設(shè)計(jì)原則,就是每個(gè)服務(wù)只負(fù)責(zé)一個(gè)功能模塊;再者,根據(jù)面向?qū)ο蟮脑O(shè)計(jì)原則,一個(gè)方法只做一件事情。如果我們能夠合理的拆分服務(wù),和每一步的處理方法,這正是一個(gè)好的設(shè)計(jì)。在維護(hù)的時(shí)候,每個(gè)方法、每個(gè)步驟做什么事情,都很清楚。

說(shuō)到調(diào)試,我的原則是,你應(yīng)該通過(guò)單元測(cè)試來(lái)發(fā)現(xiàn)和解決問(wèn)題,而不是調(diào)試。以上面的購(gòu)票流程為例,每一個(gè)服務(wù),通過(guò)MQ觸發(fā)一個(gè)方法的時(shí)候,它的參數(shù)應(yīng)該是什么、狀態(tài)是什么都應(yīng)該是明確的,這個(gè)方法執(zhí)行完成后,會(huì)產(chǎn)生什么新的數(shù)據(jù),狀態(tài)會(huì)更新成什么,都應(yīng)該是明確的。而這些都可以通過(guò)單元測(cè)試來(lái)很好的測(cè)試。如果你的復(fù)雜流程中的每一個(gè)都能通過(guò)單元測(cè)試進(jìn)行完善的測(cè)試,那么這些方法串聯(lián)到一起,不但能夠很好的工作,也能應(yīng)付各種異常的情況。

上述就是小編為大家分享的REST微服務(wù)中怎么利用消息中間件實(shí)現(xiàn)分布式事務(wù)了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI