溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

RabbitMQ中怎么實(shí)現(xiàn)延遲功能

發(fā)布時(shí)間:2021-08-10 11:47:08 來源:億速云 閱讀:128 作者:Leah 欄目:編程語言

本篇文章為大家展示了RabbitMQ中怎么實(shí)現(xiàn)延遲功能,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

2.啟用插件

使用命令rabbitmq-plugins enable rabbitmq_delayed_message_exchang啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchang

輸出如下:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

通過rabbitmq-plugins list查看已安裝列表,如下:

[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

3.機(jī)制解釋

安裝插件后會(huì)生成新的Exchange類型x-delayed-message,該類型消息支持延遲投遞機(jī)制,接收到消息后并未立即將消息投遞至目標(biāo)隊(duì)列中,而是存儲(chǔ)在mnesia(一個(gè)分布式數(shù)據(jù)系統(tǒng))表中,檢測(cè)消息延遲時(shí)間,如達(dá)到可投遞時(shí)間時(shí)并將其通過x-delayed-type類型標(biāo)記的交換機(jī)類型投遞至目標(biāo)隊(duì)列。

4.php實(shí)現(xiàn)過程

消費(fèi)者 delay_consumer2.php:

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日志
        echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    //$exchange->setFlags(AMQP_DURABLE);//聲明一個(gè)已存在的交換器的,如果不存在將拋出異常,這個(gè)一般用在consume端
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message類型
    /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊(duì)列中。

      direct:把消息投遞到那些binding key與routing key完全匹配的隊(duì)列中。

      topic:將消息路由到binding key與routing key模式匹配的隊(duì)列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //綁定
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

function callback(AMQPEnvelope $message) {
    global $queue;
    if ($message) {
        $body = $message->getBody();
        echo '接收時(shí)間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
        echo '接收內(nèi)容:'.$body . PHP_EOL;
        //為了防止接收端在處理消息時(shí)down掉,只有在消息處理完成后才發(fā)送ack消息
        $queue->ack($message->getDeliveryTag());
    } else {
        echo 'no message' . PHP_EOL;
    }
}

//$queue->consume('callback');  第一種消費(fèi)方式,但是會(huì)阻塞,程序一直會(huì)卡在此處

//第二種消費(fèi)方式,非阻塞
/*$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //應(yīng)答,代表該消息已經(jīng)消費(fèi)
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }
    else
    {
        //echo 'message not found' . PHP_EOL;
    }
}*/

//注意:這里需要注意的是這個(gè)方法:$queue->consume,queue對(duì)象有兩個(gè)方法可用于取消息:consume和get。前者是阻塞的,無消息時(shí)會(huì)被掛起,適合循環(huán)中使用;后者則是非阻塞的,取消息時(shí)有則取,無則返回false。
//就是說用了consume之后,會(huì)同步阻塞,該程序常駐內(nèi)存,不能用nginx,apache調(diào)用。 
$action = '2';

if($action == '1'){
    $queue->consume('callback');  //第一種消費(fèi)方式,但是會(huì)阻塞,程序一直會(huì)卡在此處
}else{
    //第二種消費(fèi)方式,非阻塞
    $start = time();
    while(true)
    {
        $message = $queue->get();
        if(!empty($message))
        {
            echo '接收時(shí)間:'.date("Y-m-d H:i:s", time()). PHP_EOL;
            echo '接收內(nèi)容:'.$message->getBody().PHP_EOL;
            $queue->ack($message->getDeliveryTag());    //應(yīng)答,代表該消息已經(jīng)消費(fèi)
            $end = time();
            echo '運(yùn)行時(shí)間:'.($end - $start).'秒'.PHP_EOL;
            //exit();
        }
        else
        {
            //echo 'message not found' . PHP_EOL;
        }
    }
}

生產(chǎn)者delay_publisher2.php:

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴(kuò)展
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Conexiune esuata');
        //TODO 記錄日志
        echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO 記錄日志
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message類型
    /*RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。

      fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊(duì)列中。

      direct:把消息投遞到那些binding key與routing key完全匹配的隊(duì)列中。

      topic:將消息路由到binding key與routing key模式匹配的隊(duì)列中。*/
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();
    //RabbitMQ不容許聲明2個(gè)相同名稱、配置不同的Queue,否則報(bào)錯(cuò)
    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //綁定隊(duì)列和交換機(jī)
    $queue->bind($params['exchangeName'], $params['routeKey']);
    //$channel->commitTransaction();
} catch(Exception $e) {

}

for($i=5;$i>0;$i--){
    //生成消息
    echo '發(fā)送時(shí)間:'.date("Y-m-d H:i:s", time()).PHP_EOL;
    echo 'i='.$i.',延遲'.$i.'秒'.PHP_EOL;
    $message = json_encode(['order_id'=>time(),'i'=>$i]);
    $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);
    sleep(2);
}
$conn->disconnect();

對(duì)于代碼來講,首先對(duì)于消費(fèi)者核心代碼

$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');

生產(chǎn)者核心代碼

$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message類型
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();

使用方法:先運(yùn)行delay_consumer1.php,再運(yùn)行delay_publisher1.php

運(yùn)行效果:

RabbitMQ中怎么實(shí)現(xiàn)延遲功能

上述內(nèi)容就是RabbitMQ中怎么實(shí)現(xiàn)延遲功能,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI