您好,登錄后才能下訂單哦!
這篇文章給大家介紹如何理解Java RabbitMQ的TTL和DLX,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。
RabbitMQ的TTL全稱為Time-To-Live,表示的是消息的有效期。消息如果在隊(duì)列中一直沒有被消費(fèi)并且存在時(shí)間超過了TTL,消息就會(huì)變成了"死信" (Dead Message),后續(xù)無法再被消費(fèi)了。設(shè)置TTL有兩種方式:
第一種是聲明隊(duì)列的時(shí)候,在隊(duì)列的屬性中設(shè)置,這樣該隊(duì)列中的消息都會(huì)有相同的有效期;
第二種是發(fā)送消息時(shí)給消息設(shè)置屬性,可以為每條消息都設(shè)置不同的TTL。
如果兩種方式都設(shè)置了,則以設(shè)置的較小的為準(zhǔn)。兩者的區(qū)別:如果聲明隊(duì)列時(shí)設(shè)置了有效期,則消息過期了就會(huì)被刪掉;如果是發(fā)消息時(shí)設(shè)置的有效期,消息過期了也不會(huì)被立馬刪掉,因?yàn)檫@時(shí)消息是否過期是在要投遞給消費(fèi)者時(shí)判斷的。至于為啥要這樣處理很容易想清楚:第一種方式隊(duì)列的消息有效期都一樣,先入隊(duì)的在隊(duì)列頭部,頭部也是最早要過期的消息,RabbitMQ起一個(gè)定時(shí)任務(wù)從隊(duì)列的頭部開始掃描是否有過期消息即可;第二種方式每條消息的過期時(shí)間不同,所以只有遍歷整個(gè)隊(duì)列才可以篩選出來過期的消息,這樣效率太低了,而且消息量大了之后根本不可行的,可以等到消息要投遞給消費(fèi)者時(shí)再判斷刪除,雖然刪除的不夠及時(shí)但是不影響功能,其實(shí)就是用空間換時(shí)間。
如果不設(shè)置TTL,則表示此消息永久有效(默認(rèn)消息是不會(huì)失效的)。如果將TTL設(shè)為0,則表示如果消息不能被立馬消費(fèi)則會(huì)被立即丟掉,這個(gè)特性可以部分替代RabbitMQ3.0以前支持的immediate參數(shù),之所以所部分代替,是應(yīng)為immediate參數(shù)在投遞失敗會(huì)有basic.return方法將消息體返回(這個(gè)功能可以利用死信隊(duì)列來實(shí)現(xiàn))。
還記得我們之前聲明隊(duì)列的方法嗎,queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),該方法的最后一個(gè)參數(shù)可以設(shè)置隊(duì)列的屬性,屬性名為x-message-ttl,單位為毫秒。如果不清楚隊(duì)列屬性有哪些,可以查看web控制臺(tái)的添加隊(duì)列的地方。
具體代碼如下:
//設(shè)置隊(duì)列上所有的消息的有效期,單位為毫秒 Map<String, Object> argss = new HashMap<String , Object>(); arguments.put("x-message-ttl " , 5000);//5秒鐘 channel.queueDeclare(queueName , durable , exclusive , autoDelete , arguments) ;
查看控制臺(tái)的隊(duì)列列表如下:D表示持久化,TTL表示設(shè)置了消息的有效期。
過了幾秒鐘后發(fā)現(xiàn)消息已經(jīng)不存在了。
也可以用RabbitMQ的命令行模式來設(shè)置:
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
還可以通過HTTP接口調(diào)用:
$ curl -i -u guest:guest -H "content-type:application/json" -XPUT -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}' http://ip:15672/api/queues/{vhost}/{queuename}
發(fā)送消息時(shí)basicPublish方法可以設(shè)置屬性參數(shù),里面通過expiration屬性設(shè)置消息有效期,單位為毫秒,代碼如下所示
Builder bd = new AMQP.BasicProperties().builder(); bd.deliveryMode(2);//持久化 bd.expiration("100000");//設(shè)置消息有效期100秒鐘 BasicProperties pros = bd.build(); String message = "測(cè)試ttl消息"; channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes());
另外也可以通過HTTPAPI 接口設(shè)置:
$ curl -i -u guest:guest -H "content-type:application/json" -XPOST -d '{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my body","payload_encoding":"string"}' http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish
完整的通過隊(duì)列設(shè)置消息有效期、發(fā)布消息時(shí)通過屬性設(shè)置有效期的代碼如下:可以運(yùn)行后,觀察下控制臺(tái),可以發(fā)現(xiàn)同時(shí)設(shè)置時(shí),消息的有效期是以較小的為準(zhǔn)的。項(xiàng)目GitHub地址 https://github.com/RookieMember/RabbitMQ-Learning.git。
package cn.wkp.rabbitmq.newest.ttl; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import cn.wkp.rabbitmq.util.ConnectionUtil; /** * * @ClassName: Send * @Description: 消息有效期 * @author wkg * @date: 2021年9月1日 下午11:28:22 */ public class Send { private final static String EXCHANGE_NAME = "ttl_exchange"; private final static String QUEUE_NAME = "ttl_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從連接中創(chuàng)建通道 Channel channel = connection.createChannel(); // 聲明交換機(jī) channel.exchangeDeclare(EXCHANGE_NAME, "direct",true); //*****1:通過隊(duì)列設(shè)置有效期 2:通過消息屬性設(shè)置有效期,如果都設(shè)置了以較小的為準(zhǔn)***** //聲明隊(duì)列 Map<String, Object> arguments=new HashMap<String,Object>(); //設(shè)置隊(duì)列上所有的消息的有效期,單位為毫秒 arguments.put("x-message-ttl", 5000);//5秒鐘 channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); //綁定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); Builder bd = new AMQP.BasicProperties().builder(); bd.deliveryMode(2);//持久化 bd.expiration("100000");//設(shè)置消息有效期100秒鐘 BasicProperties pros = bd.build(); String message = "測(cè)試ttl消息"; channel.basicPublish(EXCHANGE_NAME, "error", true,false, pros, message.getBytes()); System.out.println("Sent message:" + message); // 關(guān)閉通道和連接 channel.close(); connection.close(); } }
上面在web管控臺(tái)添加隊(duì)列的時(shí)候,我們看到有一個(gè)x-expires參數(shù),可以讓隊(duì)列在指定時(shí)間內(nèi) "未被使用" 的話會(huì)自動(dòng)過期刪除,未使用的意思是 queue 上沒有任何 consumer,queue 沒有被重新聲明,并且在過期時(shí)間段內(nèi)未調(diào)用過 basic.get 命令。該方式可用于,例如,RPC-style 的回復(fù) queue, 其中許多queue 會(huì)被創(chuàng)建出來,但是卻從未被使用。
服務(wù)器會(huì)確保在過期時(shí)間到達(dá)后 queue 被刪除,但是不保證刪除的動(dòng)作有多么的及時(shí)。在服務(wù)器重啟后,持久化的queue 的超時(shí)時(shí)間將重新計(jì)算。 x-expires 參數(shù)值以毫秒為單位,并且服從和 x-message-ttl 一樣的約束條件,且不能設(shè)置為 0 。所以,如果該參數(shù)設(shè)置為 1000 ,則表示該 queue 如果在 1s之內(nèi)未被使用則會(huì)被刪除。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 18000); //隊(duì)列有效期18秒 channel.queueDeclare("myqueue", false, false, false, args);
DLX是Dead-Letter-Exchange的簡寫,意思是死信交換機(jī)。
它的作用其實(shí)是用來接收死信消息(dead message)的。那什么是死信消息呢?一般消息變成死信消息有如下幾種情況:
消息被拒絕(Basic.Reject/Basic.Nack) ,井且設(shè)置requeue 參數(shù)為false
消息過期
隊(duì)列達(dá)到最大長度
當(dāng)消息在一個(gè)隊(duì)列中變成了死信消息后,可以被發(fā)送到另一個(gè)交換機(jī),這個(gè)交換機(jī)就是DLX,綁定DLX的隊(duì)列成為死信隊(duì)列。當(dāng)這個(gè)隊(duì)列中存在死信時(shí), RabbitMQ 就會(huì)立即自動(dòng)地將這個(gè)消息重新發(fā)布到設(shè)置的DLX 上去,進(jìn)而被路由到綁定該DLX的死信隊(duì)列上??梢员O(jiān)聽這個(gè)隊(duì)列中的消息、以進(jìn)行相應(yīng)的處理,這個(gè)特性與將消息的TTL 設(shè)置為0 配合使用可以彌補(bǔ)imrnediate 參數(shù)的功能。
因?yàn)橄⑷绻幢徽OM(fèi)并設(shè)置了requeue為false時(shí)會(huì)進(jìn)入死信隊(duì)列,我們可以監(jiān)控消費(fèi)死信隊(duì)列中消息,來觀察和分析系統(tǒng)的問題。DLX還有一個(gè)非常重要的作用,就是結(jié)合TTL實(shí)現(xiàn)延遲隊(duì)列(延遲隊(duì)列的使用范圍還是挺廣的:比如下單超過多長時(shí)間自動(dòng)關(guān)閉;比如我們接入過第三方支付系統(tǒng)的同學(xué)一定知道,我們的訂單中會(huì)傳一個(gè)notify_url用于接收支付結(jié)果知,如果我們給第三方支付響應(yīng)的不是成功的消息,其會(huì)隔一段時(shí)間繼續(xù)調(diào)用通知我們的notify_url,超過幾次后不再進(jìn)行通知,一般通知頻率都是 0秒-5秒-30秒-5分鐘-30分鐘-1小時(shí)-6小時(shí)-12小時(shí);比如我們的家用電器定時(shí)關(guān)機(jī)。。。。。。這些場(chǎng)景都是可以用延遲隊(duì)列實(shí)現(xiàn)的)。
下面在web管控臺(tái)添加隊(duì)列的時(shí)候,我們看到有兩個(gè)DLX相關(guān)的參數(shù):x-dead-letter-exchange和x-dead-letter-routing-key。x-dead-letter-exchange是設(shè)置隊(duì)列的DLX的;x-dead-letter-routing-key是設(shè)置死信消息進(jìn)入DLX時(shí)的routing key的,這個(gè)是可以不設(shè)置的,如果不設(shè)置,則默認(rèn)使用原隊(duì)列的routing key。
客戶端可以通過channel.queueDeclare方法聲明隊(duì)列時(shí)設(shè)置x-dead-letter-exchange參數(shù),具體代碼如下所示
channel.exchangeDeclare("dlx_exchange" , "direct"); //創(chuàng)建DLX: dlx_exchange Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange" , "dlx_exchange ");//設(shè)置DLX args.put("x-dead-letter-routing-key" , "dlx-routing-key");//設(shè)置DLX的路由鍵(可以不設(shè)置) //為隊(duì)列myqueue 添加DLX channel.queueDeclare("myqueue" , false , false , false , args);
上面說的可能比較抽象,下面我們通過一個(gè)具體的例子,來演示一下DLX的具體使用:
package cn.wkp.rabbitmq.newest.dlx; import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import cn.wkp.rabbitmq.util.ConnectionUtil; public class SendDLX { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //聲明一個(gè)交換機(jī),做死信交換機(jī)用 channel.exchangeDeclare("dlx_exchange", "topic", true, false, null); //聲明一個(gè)隊(duì)列,做死信隊(duì)列用 channel.queueDeclare("dlx_queue", true, false, false, null); //隊(duì)列綁定到交換機(jī)上 channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*"); channel.exchangeDeclare("normal_exchange", "fanout", true, false, null); Map<String, Object> arguments=new HashMap<String, Object>(); arguments.put("x-message-ttl" , 5000);//設(shè)置消息有效期1秒,過期后變成私信消息,然后進(jìn)入DLX arguments.put("x-dead-letter-exchange" , "dlx_exchange");//設(shè)置DLX arguments.put("x-dead-letter-routing-key" , "dlx.test");//設(shè)置DLX的路由鍵(可以不設(shè)置) //為隊(duì)列normal_queue 添加DLX channel.queueDeclare("normal_queue", true, false, false, arguments); channel.queueBind("normal_queue", "normal_exchange", ""); channel.basicPublish("normal_exchange", "", MessageProperties.PERSISTENT_TEXT_PLAIN, ("測(cè)試死信消息").getBytes()); System.out.println("發(fā)送消息時(shí)間:"+ConnectionUtil.formatDate(new Date())); channel.close(); connection.close(); } }
上面是發(fā)送者的代碼,運(yùn)行后觀察控制臺(tái)可以看到如下所示:
死信隊(duì)列dlx_queue的綁定如下,其已與死信交換機(jī)dlx_exchange(topic類型)進(jìn)行了綁定,routing key為"dlx.*"
隊(duì)列normal_queue的綁定如下,其已與交換機(jī)normal_exchange(fanout類型)進(jìn)行了綁定
queues視圖如下:DLX和DLK表示設(shè)置給normal_queue設(shè)置了死信交換機(jī)和死信消息的routing key,我們看到消息已經(jīng)被路由到了死信隊(duì)列上面。整個(gè)流程為:
消息發(fā)送到交換機(jī)normal_exchange,然后路由到隊(duì)列normal_queue上
因?yàn)殛?duì)列normal_queue沒有消費(fèi)者,消息過期后成為死信消息
死信消息攜帶設(shè)置的x-dead-letter-routing-key=dlx.test進(jìn)入到死信交換機(jī)dlx_exechage
dlx_exechage與dlx_queue綁定的routing key為"dlx.*",死信消息的路由鍵dlx.test符合該規(guī)則被路由到dlx.queue上面。
然后我們給死信隊(duì)列添加消費(fèi)者如下:我們測(cè)試一下死信消息進(jìn)入DLX的時(shí)間,先將之前的那個(gè)死信消息刪除
package cn.wkp.rabbitmq.newest.dlx; import java.io.IOException; import java.util.Date; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import cn.wkp.rabbitmq.util.ConnectionUtil; public class RecvDLX { public static void main(String[] argv) throws Exception { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare("dlx_exchange", "topic", true, false, null); channel.queueDeclare("dlx_queue", true, false, false, null); channel.queueBind("dlx_queue", "dlx_exchange", "dlx.*"); // 指該消費(fèi)者在接收到隊(duì)列里的消息但沒有返回確認(rèn)結(jié)果之前,它不會(huì)將新的消息分發(fā)給它。 channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("消費(fèi)者收到消息:" + new String(body)+",當(dāng)前時(shí)間:"+ConnectionUtil.formatDate(new Date())); // 消費(fèi)者手動(dòng)發(fā)送ack應(yīng)答 channel.basicAck(envelope.getDeliveryTag(), false); } }; System.out.println("消費(fèi)死信隊(duì)列中的消息======================"); // 監(jiān)聽隊(duì)列 channel.basicConsume("dlx_queue", false, consumer); } }
運(yùn)行結(jié)果如下(先運(yùn)行的死信隊(duì)列消費(fèi)者,然后運(yùn)行生產(chǎn)者):我們看到消息過期后10毫秒就被死信隊(duì)列的消費(fèi)者消費(fèi)到了,顯然,消息成為死信后是立即被發(fā)送到了DLX中。
消費(fèi)死信隊(duì)列中的消息======================
消費(fèi)者收到消息:測(cè)試死信消息,當(dāng)前時(shí)間:2021-09-24 16:30:05:740
發(fā)送消息時(shí)間:2021-09-24 17:57:00:730
關(guān)于如何理解Java RabbitMQ的TTL和DLX就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺得文章不錯(cuò),可以把它分享出去讓更多的人看到。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。