您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關(guān)SpringBoot分布式事務(wù)中最大努力通知是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
環(huán)境:springboot.2.4.9 + RabbitMQ3.7.4
這是一個(gè)充值的案例
交互流程 :
1、賬戶(hù)系統(tǒng)調(diào)用充值系統(tǒng)接口。
2、充值系統(tǒng)完成支付向賬戶(hù)系統(tǒng)發(fā)起充值結(jié)果通知 若通知失敗,則充值系統(tǒng)按策略進(jìn)行重復(fù)通知。
3、賬戶(hù)系統(tǒng)接收到充值結(jié)果通知修改充值狀態(tài)。
4、賬戶(hù)系統(tǒng)未接收到通知會(huì)主動(dòng)調(diào)用充值系統(tǒng)的接口查詢(xún)充值結(jié)果。
通過(guò)上邊的例子我們總結(jié)最大努力通知方案的目標(biāo) : 發(fā)起通知方通過(guò)一定的機(jī)制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。 具體包括 :
1、有一定的消息重復(fù)通知機(jī)制。 因?yàn)榻邮胀ㄖ娇赡軟](méi)有接收到通知,此時(shí)要有一定的機(jī)制對(duì)消息重復(fù)通知。
2、消息校對(duì)機(jī)制。 如果盡最大努力也沒(méi)有通知到接收方,或者接收方消費(fèi)消息后要再次消費(fèi),此時(shí)可由接收方主動(dòng)向通知方查詢(xún)消息信息來(lái)滿(mǎn)足需求。
1、解決方案思想不同 可靠消息一致性,發(fā)起通知方需要保證將消息發(fā)出去,并且將消息發(fā)到接收通知方,消息的可靠性關(guān)鍵由發(fā)起通知方來(lái)保證。 最大努力通知,發(fā)起通知方盡最大的努力將業(yè)務(wù)處理結(jié)果通知為接收通知方,但是可能消息接收不到,此時(shí)需要接收通知方主動(dòng)調(diào)用發(fā)起通知方的接口查詢(xún)業(yè)務(wù)處理結(jié)果,通知的可靠性關(guān)鍵在接收通知方。
2、兩者的業(yè)務(wù)應(yīng)用場(chǎng)景不同 可靠消息一致性關(guān)注的是交易過(guò)程的事務(wù)一致,以異步的方式完成交易。 最大努力通知關(guān)注的是交易后的通知事務(wù),即將交易結(jié)果可靠的通知出去。
3、技術(shù)解決方向不同 可靠消息一致性要解決消息從發(fā)出到接收的一致性,即消息發(fā)出并且被接收到。 最大努力通知無(wú)法保證消息從發(fā)出到接收的一致性,只提供消息接收的可靠性機(jī)制。可靠機(jī)制是,最大努力地將消息通知給接收方,當(dāng)消息無(wú)法被接收方接收時(shí),由接收方主動(dòng)查詢(xún)消費(fèi)。
關(guān)于RabbitMQ相關(guān)文章《SpringBoot RabbitMQ消息可靠發(fā)送與接收 》,《RabbitMQ消息確認(rèn)機(jī)制confirm 》。
項(xiàng)目結(jié)構(gòu)
兩個(gè)子模塊users-mananger(賬戶(hù)模塊),pay-manager(支付模塊)
依賴(lài)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency>
配置文件
server: port: 8080 --- spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisherConfirmType: correlated publisherReturns: true listener: simple: concurrency: 5 maxConcurrency: 10 prefetch: 5 acknowledgeMode: MANUAL retry: enabled: true initialInterval: 3000 maxAttempts: 3 defaultRequeueRejected: false
實(shí)體類(lèi)
記錄充值金額及賬戶(hù)信息
@Entity @Table(name = "t_pay_info") public class PayInfo implements Serializable{ @Id private Long id; private BigDecimal money ; private Long accountId ; }
DAO及Service
public interface PayInfoRepository extends JpaRepository<PayInfo, Long> { PayInfo findByOrderId(String orderId) ; }
@Service public class PayInfoService { @Resource private PayInfoRepository payInfoRepository ; @Resource private RabbitTemplate rabbitTemplate ; // 數(shù)據(jù)保存完后發(fā)送消息(這里發(fā)送消息可以應(yīng)用確認(rèn)模式或事物模式) @Transactional public PayInfo savePayInfo(PayInfo payInfo) { payInfo.setId(System.currentTimeMillis()) ; PayInfo result = payInfoRepository.save(payInfo) ; CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ; try { rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ; } catch (AmqpException | JsonProcessingException e) { e.printStackTrace(); } return result ; } public PayInfo queryByOrderId(String orderId) { return payInfoRepository.findByOrderId(orderId) ; } }
支付完成后發(fā)送消息。
Controller接口
@RestController @RequestMapping("/payInfos") public class PayInfoController { @Resource private PayInfoService payInfoService ; // 支付接口 @PostMapping("/pay") public Object pay(@RequestBody PayInfo payInfo) { payInfoService.savePayInfo(payInfo) ; return "支付已提交,等待結(jié)果" ; } @GetMapping("/queryPay") public Object queryPay(String orderId) { return payInfoService.queryByOrderId(orderId) ; } }
應(yīng)用配置
server: port: 8081 --- spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / publisherConfirmType: correlated publisherReturns: true listener: simple: concurrency: 5 maxConcurrency: 10 prefetch: 5 acknowledgeMode: MANUAL retry: enabled: true initialInterval: 3000 maxAttempts: 3 defaultRequeueRejected: false
實(shí)體類(lèi)
@Entity @Table(name = "t_users") public class Users { @Id private Long id; private String name ; private BigDecimal money ; }
賬戶(hù)信息表
@Entity @Table(name = "t_users_log") public class UsersLog { @Id private Long id; private String orderId ; // 0: 支付中,1:已支付,2:已取消 @Column(columnDefinition = "int default 0") private Integer status = 0 ; private BigDecimal money ; private Date createTime ; }
賬戶(hù)充值記錄表(去重)
DAO及Service
public interface UsersRepository extends JpaRepository<Users, Long> { } public interface UsersLogRepository extends JpaRepository<UsersLog, Long> { UsersLog findByOrderId(String orderId) ; }
Service類(lèi)
@Service public class UsersService { @Resource private UsersRepository usersRepository ; @Resource private UsersLogRepository usersLogRepository ; @Transactional public boolean updateMoneyAndLogStatus(Long id, String orderId) { UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ; if (usersLog != null && 1 == usersLog.getStatus()) { throw new RuntimeException("已支付") ; } Users users = usersRepository.findById(id).orElse(null) ; if (users == null) { throw new RuntimeException("賬戶(hù)不存在") ; } users.setMoney(users.getMoney().add(usersLog.getMoney())) ; usersRepository.save(users) ; usersLog.setStatus(1) ; usersLogRepository.save(usersLog) ; return true ; } @Transactional public boolean saveLog(UsersLog usersLog) { usersLog.setId(System.currentTimeMillis()) ; usersLogRepository.save(usersLog) ; return true ; } }
消息監(jiān)聽(tīng)
@Component public class PayMessageListener { private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ; @Resource private UsersService usersService ; @SuppressWarnings("unchecked") @RabbitListener(queues = {"pay-queue"}) @RabbitHandler public void receive(Message message, Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag() ; byte[] buf = null ; try { buf = message.getBody() ; logger.info("接受到消息:{}", new String(buf, "UTF-8")) ; Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ; Long id = ((Integer) result.get("accountId")) + 0L ; String orderId = (String) result.get("orderId") ; usersService.updateMoneyAndLogStatus(id, orderId) ; channel.basicAck(deliveryTag, true) ; } catch (Exception e) { logger.error("消息接受出現(xiàn)異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ; e.printStackTrace() ; try { // 應(yīng)該將這類(lèi)異常的消息放入死信隊(duì)列中,以便人工排查。 channel.basicReject(deliveryTag, false); } catch (IOException e1) { logger.error("拒絕消息重入隊(duì)列異常:{}", e1.getMessage()) ; e1.printStackTrace(); } } } }
Controller接口
@RestController @RequestMapping("/users") public class UsersController { @Resource private RestTemplate restTemplate ; @Resource private UsersService usersService ; @PostMapping("/pay") public Object pay(Long id, BigDecimal money) throws Exception { HttpHeaders headers = new HttpHeaders() ; headers.setContentType(MediaType.APPLICATION_JSON) ; String orderId = UUID.randomUUID().toString().replaceAll("-", "") ; Map<String, String> params = new HashMap<>() ; params.put("accountId", String.valueOf(id)) ; params.put("orderId", orderId) ; params.put("money", money.toString()) ; UsersLog usersLog = new UsersLog() ; usersLog.setCreateTime(new Date()) ; usersLog.setOrderId(orderId); usersLog.setMoney(money) ; usersLog.setStatus(0) ; usersService.saveLog(usersLog) ; HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ; return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ; } }
以上是兩個(gè)子模塊的所有代碼了
初始數(shù)據(jù)
賬戶(hù)子模塊控制臺(tái)
支付子模塊控制臺(tái)
數(shù)據(jù)表數(shù)據(jù)
看完上述內(nèi)容,你們對(duì)SpringBoot分布式事務(wù)中最大努力通知是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。