RabbitMQ延遲隊列是通過插件rabbitmq_delayed_message_exchange來實現(xiàn)的,下面是使用步驟:
安裝插件:首先需要安裝rabbitmq_delayed_message_exchange插件,可以通過以下命令安裝:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
創(chuàng)建延遲交換機(jī):使用下面的命令創(chuàng)建一個延遲交換機(jī):
rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message arguments='{"x-delayed-type":"direct"}'
創(chuàng)建延遲隊列:使用下面的命令創(chuàng)建一個延遲隊列,并將其綁定到延遲交換機(jī)上:
rabbitmqadmin declare queue name=delayed_queue
rabbitmqadmin declare binding source=delayed_exchange destination=delayed_queue routing_key=delayed_routing_key
發(fā)布延遲消息:使用下面的代碼片段發(fā)布一個延遲消息到延遲隊列:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_publish(exchange='delayed_exchange',
routing_key='delayed_routing_key',
body='Delayed message',
properties=pika.BasicProperties(
headers={'x-delay': 5000} # 設(shè)置延遲時間,單位是毫秒
))
connection.close()
上述代碼中,通過設(shè)置headers中的x-delay字段來指定延遲時間,單位是毫秒。
消費延遲消息:使用下面的代碼片段消費延遲隊列中的消息:
import pika
def callback(ch, method, properties, body):
print("Received message:", body)
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.basic_consume(queue='delayed_queue',
on_message_callback=callback,
auto_ack=True)
channel.start_consuming()
上述代碼中,通過指定basic_consume方法的queue參數(shù)為延遲隊列名稱,并設(shè)置auto_ack為True,即自動確認(rèn)消息。
注意:以上代碼片段中的’localhost’和’delayed_routing_key’需要根據(jù)實際情況進(jìn)行修改。