您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關(guān)PHP中怎么實現(xiàn)消息隊列MQ,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
1、MQ的應用場景
優(yōu)點:
a)主要解決異步消息
b)應用解耦
c)流量消峰等問題
d)日志處理(kafka)
缺點:
a)系統(tǒng)可用性降低:你想啊,本來其他系統(tǒng)只要運行好好的,那你的系統(tǒng)就是正常的。現(xiàn)在你非要加個消息隊列進去,那消息隊列掛了,你的系統(tǒng)不是呵呵了。因此,系統(tǒng)可用性降低
b)系統(tǒng)復雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重復消費,如何保證保證消息可靠傳輸。因此,需要考慮的東西更多,系統(tǒng)復雜性增大。
1、為什么會造成重復消費?
因為網(wǎng)絡(luò)傳輸?shù)鹊裙收?/strong>,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經(jīng)消費過該消息了,再次將該消息分發(fā)給其他的消費者。
2、解決重復消費的方案:
(1)比如,你拿到這個消息做數(shù)據(jù)庫的insert操作。那就容易了,給這個消息做一個唯一主鍵,那么就算出現(xiàn)重復消費的情況,就會導致主鍵沖突,避免數(shù)據(jù)庫出現(xiàn)臟數(shù)據(jù)。
(2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結(jié)果都是一樣的,set操作本來就算冪等操作。
(3)如果上面兩種情況還不行,上大招。準備一個第三方介質(zhì),來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
2、消息模型
a)P2P(Point to Point)點對點模式(也就是一個任務只能被一個消費者消費)
1、包含三個角色:消息隊列(Queue),發(fā)送者(Sender),接受者(Receiver)
PHP實現(xiàn):
安裝rabbitMQ擴展:
在你的項目中添加一個 composer.json文件:
{ "require": { "php-amqplib/php-amqplib": "2.6.1" } }
2、簡單模式(一對一)
<?php # @File : sample-send.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : 生產(chǎn)者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立AMQP連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 定義隊列名稱 $channel->queue_declare('hello', false, false, false, false); // 定義要發(fā)送的信息 $msg = new AMQPMessage('Hello World!'.time()); // 發(fā)送消息 $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
<?php # @File : sample-reciver.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : 消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 聲明隊列 $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 接受生產(chǎn)者的消息回調(diào)函數(shù) $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; // 消費信息 $channel->basic_consume('hello', '', false, true, false, false, $callback); // 正在消費時,則等待 while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
3、Work模式(輪循隊列,每個消費者消費的數(shù)量是一樣的)(一對多)
4、Work模式(能者多勞)(一對多)
<?php # @File : work-send.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : [work模式]生產(chǎn)者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 定義隊列名稱 // 隊列聲明為持久化(durable); 通過queue_declare的第三參數(shù)為true $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
<?php # @File : work-reciver.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : [work模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 隊列聲明為持久化(durable); 通過queue_declare的第三參數(shù)為true $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; // 公平調(diào)度(即能者多勞) $channel->basic_qos(null, 1, null); // 第四個參數(shù)basic_consume為false (true 意味著不響應ack);消費者掛掉這后,所有沒有響應的消息都會重新發(fā)送,減小消息丟失的概率,改為false后,則是手動確認,默認是自動確認 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
5、每個消息只有一個消費者
6、發(fā)送者和接受者沒有時間依賴
7、接受者確認消息接受和處理成功
b)Publish/Subscribe(Pub/Sub)發(fā)布訂閱模式
1、包含三個角色:主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber)
2、一個生產(chǎn)者,多個消息者;客戶端只有訂閱后才能收到消息;持久化和非持久化
3、每個消費者都有自己的隊列
4、生產(chǎn)者沒有直接把消息發(fā)送到隊列,而是發(fā)送到交換機 轉(zhuǎn)發(fā)器exchange
5、每個隊列都要綁定到交換機上
6、生產(chǎn)者發(fā)送的消息經(jīng)過交換機到達隊列,就能實現(xiàn)一個消息被多個消費者消費
7、Exchange(交換機 轉(zhuǎn)發(fā)器)
1、一方面是接受生產(chǎn)者的消息,另一方面是向隊列推送消息
2、匿名轉(zhuǎn)發(fā)
3、Fanout(訂閱模式;不處理路由鍵,廣播)
<?php # @File : subscribe-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [發(fā)布/訂閱模式]生產(chǎn)者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 定義交換機,第一個參數(shù)是交換機名稱,第二參數(shù)是交換機類型 $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
<?php # @File : subscribe-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [發(fā)布/訂閱模式]消費者 # require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 定義交換機,第一個參數(shù)是交換機名稱,第二參數(shù)是交換機類型 $channel->exchange_declare('logs', 'fanout', false, false, false); // 定義隊列 // 在 php-amqplib 客戶端,當我們提供隊列名稱為空字符串時,我們創(chuàng)建了一個具有生成名稱的非持久隊列: // list($queue_name, ,) = $channel->queue_declare(""); // 方法返回時,$queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,類似amq.gen-jzty20brgko-hjmujj0wlg。 list($queue_name, ) = $channel->queue_declare("", false, false, true, false); // 將交換機綁定到隊列 $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
4、Direct(路由模式;處理路由鍵,發(fā)布與訂閱,完全匹配)
多個綁定(Multiple bindings)
整合
<?php # @File : routing-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [路由模式]生產(chǎn)者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型 $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; $channel->close(); $connection->close(); ?>
以上內(nèi)容希望幫助到大家,很多PHPer在進階的時候總會遇到一些問題和瓶頸,業(yè)務代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限于:分布式架構(gòu)、高可擴展、高性能、高并發(fā)、服務器性能調(diào)優(yōu)、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優(yōu)化、shell腳本、Docker、微服務、Nginx等多個知識點高級進階干貨需要的可以免費分享給大家,需要
PHP進階架構(gòu)師>>>視頻、面試文檔免費獲取shimo.im
然后繼續(xù)看
<?php # @File : routing-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [路由模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 定義隊列,第一個參數(shù)為隊列名稱,為空則隨機生成 list($queue_name, ) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if (empty($severities)) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } foreach ($severities as $severity) { // 第二參數(shù)是交換機名稱,第三個參數(shù)是路由鍵名稱 $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
5、Topic(Topic模式,規(guī)則匹配)
1、將路由鍵和某模式匹配
2、"#"匹配零個或者多個
3、“*”匹配任意一個
<?php # @File : topic-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [主題模式]生產(chǎn)者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創(chuàng)建信息通道 $channel = $connection->channel(); // 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型 $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo " [x] Sent ", $routing_key, ':', $data, " \n"; $channel->close(); $connection->close();
<?php # @File : topic-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [主題模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明交換機,第一參數(shù)為交換機名稱,第二參數(shù)為交換機類型 $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if (empty($binding_keys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach ($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
3、使用RabbitMQ實現(xiàn)松耦合設(shè)計
4、RabbitMQ消息處理
a)RabbitMQ的消息持久化處理
1、消息的可靠性是RabbitMQ的一部分,那么RabbitMQ是如何保證消息可靠性的呢---消息持久化
2、autoDelete
@Queue:當所有消費客戶端鏈接斷開后,是否自動刪除隊列隊列;true:刪除,false:不刪除
@Exchange:當所有綁定隊列都不在使用時,是否自動刪除交換機;true:刪除 false:不刪除
3、消息確認ACK機制
ACK機制是消費者從RabbitMQ收到消息并處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋才將此消息從隊列中刪除
5、Rabbitmq的消息確認機制(事務+confirm)
a)在rabbitmq中,可以通過持久化數(shù)據(jù),解決rabbitmq服務器異常的數(shù)據(jù)丟失問題
b)問題:生產(chǎn)者將消息發(fā)送出去之后,消息到底有沒有達到rabbitmq服務器,默認是不知道的
解決(兩種方式):
1、AMQP實現(xiàn)事務機制
2、Confirm
c)事務機制(txSelect,txCommit,txRollback)
1、txSelect:用戶將當前channel設(shè)置成transation模式
2、txCommit:用于提交事務
3、txRollback:回滾事務
關(guān)于PHP中怎么實現(xiàn)消息隊列MQ就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。