溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

發(fā)布時(shí)間:2021-09-29 14:11:05 來(lái)源:億速云 閱讀:188 作者:柒染 欄目:web開(kāi)發(fā)

今天就跟大家聊聊有關(guān)SpringBoot分布式事務(wù)中最大努力通知是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

環(huán)境:springboot.2.4.9 + RabbitMQ3.7.4

什么是最大努力通知

這是一個(gè)充值的案例

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

交互流程 :

1、賬戶(hù)系統(tǒng)調(diào)用充值系統(tǒng)接口。

2、充值系統(tǒng)完成支付向賬戶(hù)系統(tǒng)發(fā)起充值結(jié)果通知 若通知失敗,則充值系統(tǒng)按策略進(jìn)行重復(fù)通知。

3、賬戶(hù)系統(tǒng)接收到充值結(jié)果通知修改充值狀態(tài)。

4、賬戶(hù)系統(tǒng)未接收到通知會(huì)主動(dòng)調(diào)用充值系統(tǒng)的接口查詢(xún)充值結(jié)果。

通過(guò)上邊的例子我們總結(jié)最大努力通知方案的目標(biāo) : 發(fā)起通知方通過(guò)一定的機(jī)制最大努力將業(yè)務(wù)處理結(jié)果通知到接收方。 具體包括 :

1、有一定的消息重復(fù)通知機(jī)制。 因?yàn)榻邮胀ㄖ娇赡軟](méi)有接收到通知,此時(shí)要有一定的機(jī)制對(duì)消息重復(fù)通知。

2、消息校對(duì)機(jī)制。 如果盡最大努力也沒(méi)有通知到接收方,或者接收方消費(fèi)消息后要再次消費(fèi),此時(shí)可由接收方主動(dòng)向通知方查詢(xún)消息信息來(lái)滿(mǎn)足需求。

最大努力通知與可靠消息一致性有什么不同?

1、解決方案思想不同 可靠消息一致性,發(fā)起通知方需要保證將消息發(fā)出去,并且將消息發(fā)到接收通知方,消息的可靠性關(guān)鍵由發(fā)起通知方來(lái)保證。  最大努力通知,發(fā)起通知方盡最大的努力將業(yè)務(wù)處理結(jié)果通知為接收通知方,但是可能消息接收不到,此時(shí)需要接收通知方主動(dòng)調(diào)用發(fā)起通知方的接口查詢(xún)業(yè)務(wù)處理結(jié)果,通知的可靠性關(guān)鍵在接收通知方。

2、兩者的業(yè)務(wù)應(yīng)用場(chǎng)景不同 可靠消息一致性關(guān)注的是交易過(guò)程的事務(wù)一致,以異步的方式完成交易。  最大努力通知關(guān)注的是交易后的通知事務(wù),即將交易結(jié)果可靠的通知出去。

3、技術(shù)解決方向不同 可靠消息一致性要解決消息從發(fā)出到接收的一致性,即消息發(fā)出并且被接收到。  最大努力通知無(wú)法保證消息從發(fā)出到接收的一致性,只提供消息接收的可靠性機(jī)制。可靠機(jī)制是,最大努力地將消息通知給接收方,當(dāng)消息無(wú)法被接收方接收時(shí),由接收方主動(dòng)查詢(xún)消費(fèi)。

通過(guò)RabbitMQ實(shí)現(xiàn)最大努力通知

關(guān)于RabbitMQ相關(guān)文章《SpringBoot RabbitMQ消息可靠發(fā)送與接收 》,《RabbitMQ消息確認(rèn)機(jī)制confirm 》。

項(xiàng)目結(jié)構(gòu)

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

兩個(gè)子模塊users-mananger(賬戶(hù)模塊),pay-manager(支付模塊)

依賴(lài)

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId>  <scope>runtime</scope> </dependency>

子模塊pay-manager

配置文件

server:   port: 8080 --- spring:   rabbitmq:     host: localhost     port: 5672     username: guest     password: guest     virtual-host: /     publisherConfirmType: correlated     publisherReturns: true     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         acknowledgeMode: MANUAL         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: false

實(shí)體類(lèi)

記錄充值金額及賬戶(hù)信息

@Entity @Table(name = "t_pay_info") public class PayInfo implements Serializable{  @Id  private Long id;  private BigDecimal money ;  private Long accountId ; }

DAO及Service

public interface PayInfoRepository extends JpaRepository<PayInfo, Long> {  PayInfo findByOrderId(String orderId) ; }
@Service public class PayInfoService {          @Resource     private PayInfoRepository payInfoRepository ;     @Resource     private RabbitTemplate rabbitTemplate ;        // 數(shù)據(jù)保存完后發(fā)送消息(這里發(fā)送消息可以應(yīng)用確認(rèn)模式或事物模式)     @Transactional     public PayInfo savePayInfo(PayInfo payInfo) {         payInfo.setId(System.currentTimeMillis()) ;         PayInfo result = payInfoRepository.save(payInfo) ;         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString().replaceAll("-", "")) ;         try {             rabbitTemplate.convertAndSend("pay-exchange", "pay.#", new ObjectMapper().writeValueAsString(payInfo), correlationData) ;         } catch (AmqpException | JsonProcessingException e) {             e.printStackTrace();         }         return result ;     }          public PayInfo queryByOrderId(String orderId) {         return payInfoRepository.findByOrderId(orderId) ;     }      }

支付完成后發(fā)送消息。

Controller接口

@RestController @RequestMapping("/payInfos") public class PayInfoController {  @Resource  private PayInfoService payInfoService ;        // 支付接口  @PostMapping("/pay")  public Object pay(@RequestBody PayInfo payInfo) {   payInfoService.savePayInfo(payInfo) ;   return "支付已提交,等待結(jié)果" ;  }       @GetMapping("/queryPay")  public Object queryPay(String orderId) {   return payInfoService.queryByOrderId(orderId) ;  }      }

子模塊users-manager

應(yīng)用配置

server:   port: 8081 --- spring:   rabbitmq:     host: localhost     port: 5672     username: guest     password: guest     virtual-host: /     publisherConfirmType: correlated     publisherReturns: true     listener:       simple:         concurrency: 5         maxConcurrency: 10         prefetch: 5         acknowledgeMode: MANUAL         retry:           enabled: true           initialInterval: 3000           maxAttempts: 3         defaultRequeueRejected: false

實(shí)體類(lèi)

@Entity @Table(name = "t_users") public class Users {  @Id  private Long id;  private String name ;  private BigDecimal money ; }

賬戶(hù)信息表

@Entity @Table(name = "t_users_log") public class UsersLog {  @Id  private Long id;  private String orderId ;  // 0: 支付中,1:已支付,2:已取消  @Column(columnDefinition = "int default 0")  private Integer status = 0 ;  private BigDecimal money ;  private Date createTime ; }

賬戶(hù)充值記錄表(去重)

DAO及Service

public interface UsersRepository extends JpaRepository<Users, Long> { } public interface UsersLogRepository extends JpaRepository<UsersLog, Long> {  UsersLog findByOrderId(String orderId) ; }

Service類(lèi)

@Service public class UsersService {      @Resource     private UsersRepository usersRepository ;     @Resource     private UsersLogRepository usersLogRepository ;       @Transactional  public boolean updateMoneyAndLogStatus(Long id, String orderId) {   UsersLog usersLog = usersLogRepository.findByOrderId(orderId) ;   if (usersLog != null && 1 == usersLog.getStatus()) {    throw new RuntimeException("已支付") ;   }   Users users = usersRepository.findById(id).orElse(null) ;   if (users == null) {    throw new RuntimeException("賬戶(hù)不存在") ;   }   users.setMoney(users.getMoney().add(usersLog.getMoney())) ;   usersRepository.save(users) ;   usersLog.setStatus(1) ;   usersLogRepository.save(usersLog) ;   return true ;  }       @Transactional  public boolean saveLog(UsersLog usersLog) {   usersLog.setId(System.currentTimeMillis()) ;   usersLogRepository.save(usersLog) ;   return true ;  } }

消息監(jiān)聽(tīng)

@Component public class PayMessageListener {       private static final Logger logger = LoggerFactory.getLogger(PayMessageListener.class) ;       @Resource  private  UsersService usersService ;       @SuppressWarnings("unchecked")  @RabbitListener(queues = {"pay-queue"})  @RabbitHandler  public void receive(Message message, Channel channel) {   long deliveryTag = message.getMessageProperties().getDeliveryTag() ;   byte[] buf =  null ;   try {    buf = message.getBody() ;    logger.info("接受到消息:{}", new String(buf, "UTF-8")) ;    Map<String, Object> result = new JsonMapper().readValue(buf, Map.class) ;    Long id = ((Integer) result.get("accountId")) + 0L ;    String orderId = (String) result.get("orderId") ;    usersService.updateMoneyAndLogStatus(id, orderId) ;    channel.basicAck(deliveryTag, true) ;   } catch (Exception e) {    logger.error("消息接受出現(xiàn)異常:{}, 異常消息:{}", e.getMessage(), new String(buf, Charset.forName("UTF-8"))) ;    e.printStackTrace() ;    try {     // 應(yīng)該將這類(lèi)異常的消息放入死信隊(duì)列中,以便人工排查。     channel.basicReject(deliveryTag, false);    } catch (IOException e1) {     logger.error("拒絕消息重入隊(duì)列異常:{}", e1.getMessage()) ;     e1.printStackTrace();    }   }  } }

Controller接口

@RestController @RequestMapping("/users") public class UsersController {          @Resource     private RestTemplate restTemplate ;     @Resource     private UsersService usersService ;          @PostMapping("/pay")     public Object pay(Long id, BigDecimal money) throws Exception {         HttpHeaders headers = new HttpHeaders() ;         headers.setContentType(MediaType.APPLICATION_JSON) ;         String orderId = UUID.randomUUID().toString().replaceAll("-", "") ;         Map<String, String> params = new HashMap<>() ;         params.put("accountId", String.valueOf(id)) ;         params.put("orderId", orderId) ;         params.put("money", money.toString()) ;                  UsersLog usersLog = new UsersLog() ;         usersLog.setCreateTime(new Date()) ;         usersLog.setOrderId(orderId);         usersLog.setMoney(money) ;         usersLog.setStatus(0) ;         usersService.saveLog(usersLog) ;         HttpEntity<String> requestEntity = new HttpEntity<String>(new ObjectMapper().writeValueAsString(params), headers) ;         return restTemplate.postForObject("http://localhost:8080/payInfos/pay", requestEntity, String.class) ;     }      }

以上是兩個(gè)子模塊的所有代碼了

測(cè)試

初始數(shù)據(jù)

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

賬戶(hù)子模塊控制臺(tái)

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

支付子模塊控制臺(tái)

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

數(shù)據(jù)表數(shù)據(jù)

SpringBoot分布式事務(wù)中最大努力通知是怎樣的

看完上述內(nèi)容,你們對(duì)SpringBoot分布式事務(wù)中最大努力通知是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道,感謝大家的支持。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI