溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

怎么在Kotlin中使用RocketMQ實(shí)現(xiàn)一個延時消息

發(fā)布時間:2021-03-24 17:03:24 來源:億速云 閱讀:239 作者:Leah 欄目:移動開發(fā)

這期內(nèi)容當(dāng)中小編將會給大家?guī)碛嘘P(guān)怎么在Kotlin中使用RocketMQ實(shí)現(xiàn)一個延時消息,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

一. 延時消息

延時消息是指消息被發(fā)送以后,并不想讓消費(fèi)者立即拿到消息,而是等待指定時間后,消費(fèi)者才拿到這個消息進(jìn)行消費(fèi)。

使用延時消息的典型場景,例如:

  • 在電商系統(tǒng)中,用戶下完訂單30分鐘內(nèi)沒支付,則訂單可能會被取消。

  • 在電商系統(tǒng)中,用戶七天內(nèi)沒有評價商品,則默認(rèn)好評。

這些場景對應(yīng)的解決方案,包括:

  • 輪詢遍歷數(shù)據(jù)庫記錄

  • JDK 的 DelayQueue

  • ScheduledExecutorService

  • 基于 Quartz 的定時任務(wù)

  • 基于 Redis 的 zset 實(shí)現(xiàn)延時隊(duì)列。

除此之外,還可以使用消息隊(duì)列來實(shí)現(xiàn)延時消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一個分布式消息和流數(shù)據(jù)平臺,具有低延遲、高性能、高可靠性、萬億級容量和靈活的可擴(kuò)展性。RocketMQ 是2012年阿里巴巴開源的第三代分布式消息中間件。

怎么在Kotlin中使用RocketMQ實(shí)現(xiàn)一個延時消息

三. RocketMQ 實(shí)現(xiàn)延時消息

3.1 業(yè)務(wù)背景

我們的系統(tǒng)完成某項(xiàng)操作之后,會推送事件消息到業(yè)務(wù)方的接口。當(dāng)我們調(diào)用業(yè)務(wù)方的通知接口返回值為成功時,表示本次推送消息成功;當(dāng)返回值為失敗時,則會多次推送消息,直到返回成功為止(保證至少成功一次)。
當(dāng)我們推送失敗后,雖然會進(jìn)行多次推送消息,但并不是立即進(jìn)行。會有一定的延遲,并按照一定的規(guī)則進(jìn)行推送消息。
例如:1小時后嘗試推送、3小時后嘗試推送、1天后嘗試推送、3天后嘗試推送等等。因此,考慮使用延時消息實(shí)現(xiàn)該功能。

3.2 生產(chǎn)者(Producer)

生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,生產(chǎn)者向消息服務(wù)器發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息。

首先,定義一個支持延時發(fā)送的 AbstractProducer。

abstract class AbstractProducer :ProducerBean() {
  var producerId: String? = null
  var topic: String? = null
  var tag: String?=null
  var timeoutMillis: Int? = null
  var delaySendTimeMills: Long? = null

  val log = LogFactory.getLog(this.javaClass)

  open fun sendMessage(messageBody: Any, tag: String) {
    val msgBody = JSON.toJSONString(messageBody)
    val message = Message(topic, tag, msgBody.toByteArray())

    if (delaySendTimeMills != null) {
      val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!
      message.startDeliverTime = startDeliverTime
      log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")
    }
    val logMessageId = buildLogMessageId(message)
    try {
      val sendResult = send(message)
      log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
    } catch (e: Exception) {
      log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
    }

  }

  fun buildLogMessageId(message: Message): String {
    return "topic: " + message.topic + "\n" +
        "producer: " + producerId + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n"
  }
}

根據(jù)業(yè)務(wù)需要,增加一個支持重試機(jī)制的 Producer

@Component
@ConfigurationProperties("mqs.ons.producers.xxx-producer")
@Configuration
@Data
class CleanReportPushEventProducer :AbstractProducer() {

  lateinit var delaySecondList:List<Long>

  fun sendMessage(messageBody: CleanReportPushEventMessage){
    //重試超過次數(shù)之后不再發(fā)事件
    if (delaySecondList!=null) {

      if(messageBody.times>=delaySecondList.size){
        return
      }
      val msgBody = JSON.toJSONString(messageBody)
      val message = Message(topic, tag, msgBody.toByteArray())
      val delayTimeMills = delaySecondList[messageBody.times]*1000L
      message.startDeliverTime = System.currentTimeMillis() + delayTimeMills
      log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )
      val logMessageId = buildLogMessageId(message)
      try {
        val sendResult = send(message)
        log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)
      } catch (e: Exception) {
        log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)
      }
    }
  }
}

在 CleanReportPushEventProducer 中,超過了重試的次數(shù)就不會再發(fā)送消息了。

每一次延時消息的時間也會不同,因此需要根據(jù)重試的次數(shù)來獲取這個delayTimeMills 。

通過 System.currentTimeMillis() + delayTimeMills 可以設(shè)置 message 的 startDeliverTime。然后調(diào)用 send(message) 即可發(fā)送延時消息。

我們使用商用版的 RocketMQ,因此支持精度為秒級別的延遲消息。在開源版本中,RocketMQ 只支持18個特定級別的延遲消息。:(

3.3 消費(fèi)者(Consumer)

消費(fèi)者負(fù)責(zé)消費(fèi)消息,消費(fèi)者從消息服務(wù)器拉取信息并將其輸入用戶應(yīng)用程序。

定義 Push 類型的 AbstractConsumer:

@Data
abstract class AbstractConsumer ():MessageListener{

  var consumerId: String? = null

  lateinit var subscribeOptions: List<SubscribeOptions>

  var threadNums: Int? = null

  val log = LogFactory.getLog(this.javaClass)

  override fun consume(message: Message, context: ConsumeContext): Action {
    val logMessageId = buildLogMessageId(message)
    val body = String(message.body)
    try {
      log.info(logMessageId + " body: " + body)
      val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))
      log.info(logMessageId + " result: " + result.name)
      return result
    } catch (e: Exception) {
      if (message.reconsumeTimes >= 3) {
        log.error(logMessageId + " error: " + e.message, e)
      }
      return Action.ReconsumeLater
    }

  }

  abstract fun getMessageBodyType(tag: String): Type?

  abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action

  protected fun buildLogMessageId(message: Message): String {
    return "topic: " + message.topic + "\n" +
        "consumer: " + consumerId + "\n" +
        "tag: " + message.tag + "\n" +
        "key: " + message.key + "\n" +
        "MsgId:" + message.msgID + "\n" +
        "BornTimestamp" + message.bornTimestamp + "\n" +
        "StartDeliverTime:" + message.startDeliverTime + "\n" +
        "ReconsumeTimes:" + message.reconsumeTimes + "\n"
  }
}

再定義具體的消費(fèi)者,并且在消費(fèi)失敗之后能夠再發(fā)送一次消息。

@Configuration
@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")
@Data
class CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {

  val logger: Logger = LoggerFactory.getLogger(this.javaClass)

  override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {
    if(obj is CleanReportPushEventMessage){
      //清除事件
      logger.info("consumer clean-report event report_id:${obj.id} ")

      //消費(fèi)失敗之后再發(fā)送一次消息
      if(!cleanReportService.sendCleanReportEvent(obj.id)){
        val times = obj.times+1
        eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))
      }
    }
    return Action.CommitMessage
  }

  override fun getMessageBodyType(tag: String): Type? {
    return CleanReportPushEventMessage::class.java
  }
}

其中,cleanReportService 的 sendCleanReportEvent() 會通過 http 的方式調(diào)用業(yè)務(wù)方提供的接口,進(jìn)行事件消息的推送。如果推送失敗了,則會進(jìn)行下一次的推送。(這里使用了 eventProducer 的 sendMessage() 方法再次投遞消息,是因?yàn)橐鶕?jù)調(diào)用的http接口返回的內(nèi)容來判斷消息是否發(fā)送成功。)

最后,定義 ConsumerFactory

@Component
class ConsumerFactory(val consumers: List<AbstractConsumer>,val aliyunOnsOptions: AliyunOnsOptions) {

  val logger: Logger = LoggerFactory.getLogger(this.javaClass)


  @PostConstruct
  fun start() {
    CompletableFuture.runAsync{
      consumers.stream().forEach {
        val properties = buildProperties(it.consumerId!!, it.threadNums)
        val consumer = ONSFactory.createConsumer(properties)
        if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {
          for (options in it.subscribeOptions!!) {
            consumer.subscribe(options.topic, options.tag, it)
          }
          consumer.start()
          val message = "\n".plus(
              it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}
                  .collect(Collectors.toList<Any>()))
          logger.info(String.format("consumer: %s\n", message))
        }
      }
    }
  }

  private fun buildProperties(consumerId: String,threadNums: Int?): Properties {
    val properties = Properties()
    properties.put(PropertyKeyConst.ConsumerId, consumerId)
    properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)
    properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)
    if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {
      properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)
    } else {
      // 測試環(huán)境接入RocketMQ
      properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)
    }
    properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)
    return properties
  }
}

上述就是小編為大家分享的怎么在Kotlin中使用RocketMQ實(shí)現(xiàn)一個延時消息了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI