溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

什么是SimpleMessageListenerContainer和DirectMessageListenerContainer

發(fā)布時(shí)間:2021-10-13 13:53:32 來(lái)源:億速云 閱讀:289 作者:iii 欄目:編程語(yǔ)言

這篇文章主要介紹“什么是SimpleMessageListenerContainer和DirectMessageListenerContainer”,在日常操作中,相信很多人在什么是SimpleMessageListenerContainer和DirectMessageListenerContainer問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”什么是SimpleMessageListenerContainer和DirectMessageListenerContainer”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

RabbitMQ 消費(fèi)者代碼開(kāi)發(fā)過(guò)程中會(huì)使用到 SimpleMessageListenerContainer 和 DirectMessageListenerContainer。
在版本2.0之前的版本中,只有一種MessageListenerContainer 即 SimpleMessageListenerContainer; 2.0之后有第二個(gè)容器—DirectMessageListenerContainer

一、SimpleMessageListenerContainer

默認(rèn)情況下,偵聽(tīng)器容器將啟動(dòng)單個(gè)使用者,該使用者將從隊(duì)列接收消息。根據(jù)之前的文檔,我們知道有許多控制并發(fā)性的屬性。
最簡(jiǎn)單的是concurrentConsumers,它只創(chuàng)建(固定的)將并發(fā)處理消息的使用者數(shù)量。
此外,還添加了一個(gè)新的屬性 maxConcurrentConsumers,容器將根據(jù)工作負(fù)載動(dòng)態(tài)調(diào)整并發(fā)性。這與四個(gè)附加屬性一起工作:continutiveactivetrigger、startConsumerMinInterval、continutiveidletrigger、stopConsumerMinInterval。

1.1、在默認(rèn)設(shè)置下,增加消費(fèi)者的算法工作如下

如果尚未到達(dá)maxConcurrentConsumers,并且已有的使用者連續(xù)10個(gè)周期處于活動(dòng)狀態(tài),并且自上一個(gè)使用者啟動(dòng)以來(lái)至少已經(jīng)過(guò)了10秒,那么將啟動(dòng)一個(gè)新的使用者。如果使用者在txSize *中接收到至少一條消息,則認(rèn)為該使用者處于活動(dòng)狀態(tài)。

1.2、在默認(rèn)設(shè)置下,減少消費(fèi)者的算法工作如下

如果有多個(gè)concurrentConsumers正在運(yùn)行,并且某個(gè)consumer檢測(cè)到10個(gè)連續(xù)超時(shí)(空閑),并且上一個(gè)consumer至少在60秒之前停止,那么該consumer將停止。超時(shí)取決于receiveTimeout和txSize屬性。如果使用者在txSize *中沒(méi)有接收到任何消息,則認(rèn)為它是空閑的。因此,在默認(rèn)超時(shí)(1秒)和txSize為4的情況下,在40秒的空閑時(shí)間(4個(gè)超時(shí)對(duì)應(yīng)1個(gè)空閑檢測(cè))之后將考慮停止使用者。

1.3、配置如下
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    //初始化消費(fèi)者數(shù)量
    factory.setConcurrentConsumers(this.concurrentConsumers);
    //最大消費(fèi)者數(shù)量
    factory.setMaxConcurrentConsumers(this.maxConcurrentConsumers);
    //手動(dòng)確認(rèn)消息
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setErrorHandler(rabbitErrorHandler);
    return factory;
}

二、DirectMessageListenerContainer

使用 DirectMessageListenerContainer,您需要確保 ConnectionFactory 配置了一個(gè)任務(wù)執(zhí)行器,該執(zhí)行器在使用該 ConnectionFactory 的所有偵聽(tīng)器容器中具有足夠的線程來(lái)支持所需的并發(fā)性。默認(rèn)連接池大小僅為5。
并發(fā)性基于配置的隊(duì)列和consumersPerQueue。每個(gè)隊(duì)列的每個(gè)使用者使用一個(gè)單獨(dú)的通道,并發(fā)性由rabbit客戶端庫(kù)控制;默認(rèn)情況下,它使用5個(gè)線程池;您可以配置taskExecutor來(lái)提供所需的最大并發(fā)性。

2.1、配置如下
@Bean
public DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    //每個(gè)隊(duì)列的消費(fèi)者數(shù)量
    factory.setConsumersPerQueue(this.consumersPerQueue);
    //手動(dòng)確認(rèn)消息
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    factory.setErrorHandler(rabbitErrorHandler);
    return factory;
}

三、服務(wù)對(duì)比

SimpleMessageListenerContainer提供了以下特性,但DirectMessageListenerContainer不提供:

  1. txSize—使用SimpleMessageListenerContainer,您可以將其設(shè)置為控制事務(wù)中傳遞的消息數(shù)量和/或減少ack的數(shù)量,但這可能會(huì)導(dǎo)致失敗后重復(fù)傳遞的數(shù)量增加。(與txSize和SimpleMessageListenerContainer一樣,DirectMessageListenerContainer也有mesagesPerAck,可以用來(lái)減少ack,但不能用于事務(wù)—每個(gè)消息都在單獨(dú)的事務(wù)中交付和打包)。

  2. maxconcurrentconsumer和consumer伸縮間隔/觸發(fā)器—DirectMessageListenerContainer中沒(méi)有自動(dòng)伸縮;但是,它允許您以編程方式更改consumersPerQueue屬性,并相應(yīng)地調(diào)整使用者。

然而,與SimpleMessageListenerContainer相比,DirectMessageListenerContainer有以下優(yōu)點(diǎn):

  1. 在運(yùn)行時(shí)添加和刪除隊(duì)列更有效;使用SimpleMessageListenerContainer,整個(gè)使用者線程重新啟動(dòng)(所有使用者取消并重新創(chuàng)建);對(duì)于DirectMessageListenerContainer,不受影響的使用者不會(huì)被取消。

  2. 避免了RabbitMQ客戶機(jī)線程和使用者線程之間的上下文切換。

  3. 線程是跨使用者共享的,而不是為SimpleMessageListenerContainer中的每個(gè)使用者都有一個(gè)專(zhuān)用線程。

到此,關(guān)于“什么是SimpleMessageListenerContainer和DirectMessageListenerContainer”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI