溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

PHP中怎么實現(xiàn)消息隊列MQ

發(fā)布時間:2021-08-03 15:10:37 來源:億速云 閱讀:204 作者:Leah 欄目:編程語言

這篇文章將為大家詳細講解有關(guān)PHP中怎么實現(xiàn)消息隊列MQ,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

1、MQ的應用場景

  優(yōu)點:
    a)主要解決異步消息
    b)應用解耦
    c)流量消峰等問題
    d)日志處理(kafka)

  缺點:

    a)系統(tǒng)可用性降低:你想啊,本來其他系統(tǒng)只要運行好好的,那你的系統(tǒng)就是正常的。現(xiàn)在你非要加個消息隊列進去,那消息隊列掛了,你的系統(tǒng)不是呵呵了。因此,系統(tǒng)可用性降低

    b)系統(tǒng)復雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重復消費,如何保證保證消息可靠傳輸。因此,需要考慮的東西更多,系統(tǒng)復雜性增大。

1、為什么會造成重復消費?

因為網(wǎng)絡(luò)傳輸?shù)鹊裙收?/strong>,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經(jīng)消費過該消息了,再次將該消息分發(fā)給其他的消費者。

2、解決重復消費的方案:

(1)比如,你拿到這個消息做數(shù)據(jù)庫的insert操作。那就容易了,給這個消息做一個唯一主鍵,那么就算出現(xiàn)重復消費的情況,就會導致主鍵沖突,避免數(shù)據(jù)庫出現(xiàn)臟數(shù)據(jù)。

(2)再比如,你拿到這個消息redis的set的操作,那就容易了,不用解決,因為你無論set幾次結(jié)果都是一樣的,set操作本來就算冪等操作。


(3)如果上面兩種情況還不行,上大招。準備一個第三方介質(zhì),來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。

2、消息模型
a)P2P(Point to Point)點對點模式(也就是一個任務只能被一個消費者消費)
1、包含三個角色:消息隊列(Queue),發(fā)送者(Sender),接受者(Receiver)

PHP實現(xiàn):

安裝rabbitMQ擴展:

在你的項目中添加一個 composer.json文件:

{
     "require": {
      "php-amqplib/php-amqplib": "2.6.1"
     }
 }

2、簡單模式(一對一)

PHP中怎么實現(xiàn)消息隊列MQ

<?php
# @File  : sample-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 生產(chǎn)者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立AMQP連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
// 定義隊列名稱
$channel->queue_declare('hello', false, false, false, false);
// 定義要發(fā)送的信息
$msg = new AMQPMessage('Hello World!'.time());
// 發(fā)送消息
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
<?php
# @File  : sample-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 創(chuàng)建信息通道
$channel    = $connection->channel();

// 聲明隊列
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";

//  接受生產(chǎn)者的消息回調(diào)函數(shù)
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 消費信息
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 正在消費時,則等待
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

3、Work模式(輪循隊列,每個消費者消費的數(shù)量是一樣的)(一對多)
4、Work模式(能者多勞)(一對多)

PHP中怎么實現(xiàn)消息隊列MQ

<?php
# @File  : work-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]生產(chǎn)者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 定義隊列名稱
// 隊列聲明為持久化(durable); 通過queue_declare的第三參數(shù)為true
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
<?php
# @File  : work-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創(chuàng)建信息通道
$channel    = $connection->channel();

// 隊列聲明為持久化(durable); 通過queue_declare的第三參數(shù)為true
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 公平調(diào)度(即能者多勞)
$channel->basic_qos(null, 1, null);

// 第四個參數(shù)basic_consume為false (true 意味著不響應ack);消費者掛掉這后,所有沒有響應的消息都會重新發(fā)送,減小消息丟失的概率,改為false后,則是手動確認,默認是自動確認
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

5、每個消息只有一個消費者

6、發(fā)送者和接受者沒有時間依賴

7、接受者確認消息接受和處理成功

PHP中怎么實現(xiàn)消息隊列MQ

b)Publish/Subscribe(Pub/Sub)發(fā)布訂閱模式
    1、包含三個角色:主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber)
    2、一個生產(chǎn)者,多個消息者;客戶端只有訂閱后才能收到消息;持久化和非持久化
    3、每個消費者都有自己的隊列
    4、生產(chǎn)者沒有直接把消息發(fā)送到隊列,而是發(fā)送到交換機 轉(zhuǎn)發(fā)器exchange
    5、每個隊列都要綁定到交換機上
    6、生產(chǎn)者發(fā)送的消息經(jīng)過交換機到達隊列,就能實現(xiàn)一個消息被多個消費者消費
7、Exchange(交換機 轉(zhuǎn)發(fā)器)
      1、一方面是接受生產(chǎn)者的消息,另一方面是向隊列推送消息
      2、匿名轉(zhuǎn)發(fā)
      3、Fanout(訂閱模式;不處理路由鍵,廣播)

PHP中怎么實現(xiàn)消息隊列MQ

<?php
# @File  : subscribe-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [發(fā)布/訂閱模式]生產(chǎn)者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創(chuàng)建信息通道
$channel    = $connection->channel();

// 定義交換機,第一個參數(shù)是交換機名稱,第二參數(shù)是交換機類型
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
<?php
# @File  : subscribe-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [發(fā)布/訂閱模式]消費者
#

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創(chuàng)建信息通道
$channel    = $connection->channel();

// 定義交換機,第一個參數(shù)是交換機名稱,第二參數(shù)是交換機類型
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 定義隊列
// 在 php-amqplib 客戶端,當我們提供隊列名稱為空字符串時,我們創(chuàng)建了一個具有生成名稱的非持久隊列:
// list($queue_name, ,) = $channel->queue_declare("");
// 方法返回時,$queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,類似amq.gen-jzty20brgko-hjmujj0wlg。
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

// 將交換機綁定到隊列
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

PHP中怎么實現(xiàn)消息隊列MQ

4、Direct(路由模式;處理路由鍵,發(fā)布與訂閱,完全匹配)

PHP中怎么實現(xiàn)消息隊列MQ

多個綁定(Multiple bindings)

PHP中怎么實現(xiàn)消息隊列MQ

整合

PHP中怎么實現(xiàn)消息隊列MQ

<?php
# @File  : routing-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]生產(chǎn)者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創(chuàng)建信息通道
$channel = $connection->channel();

// 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>

以上內(nèi)容希望幫助到大家,很多PHPer在進階的時候總會遇到一些問題和瓶頸,業(yè)務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限于:分布式架構(gòu)、高可擴展、高性能、高并發(fā)、服務器性能調(diào)優(yōu)、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優(yōu)化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家,需要

PHP進階架構(gòu)師>>>視頻、面試文檔免費獲取shimo.imPHP中怎么實現(xiàn)消息隊列MQ

然后繼續(xù)看

<?php
# @File  : routing-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// 定義隊列,第一個參數(shù)為隊列名稱,為空則隨機生成
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    // 第二參數(shù)是交換機名稱,第三個參數(shù)是路由鍵名稱
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

  5、TopicTopic模式,規(guī)則匹配)

  1、將路由鍵和某模式匹配
2、"#"匹配零個或者多個
3、“*”匹配任意一個

PHP中怎么實現(xiàn)消息隊列MQ

<?php
# @File  : topic-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [主題模式]生產(chǎn)者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創(chuàng)建信息通道
$channel = $connection->channel();

// 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data        = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ", $routing_key, ':', $data, " \n";

$channel->close();
$connection->close();
<?php
# @File  : topic-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [主題模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

// 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

PHP中怎么實現(xiàn)消息隊列MQ

3、使用RabbitMQ實現(xiàn)松耦合設(shè)計

PHP中怎么實現(xiàn)消息隊列MQ

4、RabbitMQ消息處理
  a)RabbitMQ的消息持久化處理
    1、消息的可靠性是RabbitMQ的一部分,那么RabbitMQ是如何保證消息可靠性的呢---消息持久化
    2、autoDelete
      @Queue:當所有消費客戶端鏈接斷開后,是否自動刪除隊列隊列;true:刪除,false:不刪除
      @Exchange:當所有綁定隊列都不在使用時,是否自動刪除交換機;true:刪除 false:不刪除
    3、消息確認ACK機制
      ACK機制是消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋才將此消息從隊列中刪除

5、Rabbitmq的消息確認機制(事務+confirm)
  a)在rabbitmq中,可以通過持久化數(shù)據(jù),解決rabbitmq服務器異常的數(shù)據(jù)丟失問題
  b)問題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有達到rabbitmq服務器,默認是不知道的
    解決(兩種方式):
      1、AMQP實現(xiàn)事務機制
      2、Confirm
  c)事務機制(txSelect,txCommit,txRollback)
    1、txSelect:用戶將當前channel設(shè)置成transation模式
    2、txCommit:用于提交事務
    3、txRollback:回滾事務

關(guān)于PHP中怎么實現(xiàn)消息隊列MQ就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

php
AI