您好,登錄后才能下訂單哦!
本篇文章為大家展示了RabbitMQ怎么在php中使用,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
什么是隊列
消息隊列(Message Queue)是一種應用間的通信方式,消息發(fā)送后可以立即返回。消息使用者再從MQ中取消息進行邏輯處理。對于消耗較大的請求,可以立馬返回處理結果。減少服務器壓力。為各個子系統(tǒng)之間解耦和異步處理。
rabbitmq的整體結構
rmq簡單來說就是一個(生產(chǎn)/消費)的模型結構。消息生產(chǎn)者把數(shù)據(jù)丟到隊列中,消息消費者從隊列中取出數(shù)據(jù)進行邏輯處理。
那么如何確保,生產(chǎn)者添加的數(shù)據(jù),能夠到達指定的隊列中呢?
rmq(消息隊列)主要提供了三個概念(中間件?)來確保消息的分發(fā)。Exchange(交換機)、RoutingKey(路由)、Queue(隊列)。從上面的圖也可以看出來。 處理消息的接收、分發(fā),主要在Broker模塊中。
Exchange 所有生產(chǎn)消息的入口都是到交換機這里。exchange通過進來的路由(RoutingKey),去和已binding的規(guī)則進行匹配,找到指定的隊列。
RoutingKey 我的理解,這里相當于一把鑰匙。而binding的操作相當于一把鎖頭。
Queue 消息的存放區(qū)域,等到消費者來取。
Binding Exchange和Queue之間的一個綁定。
從這些概念來看,影響規(guī)則的主要是依賴Exchange。那rmq提供了哪些類型,都有什么特點呢?
exchange類型
RabbitMQ提供了四種Exchange類型
direct
fanout
topic
header
header類型在實際使用中較少,所以在這里就不進行說明。
Direct Exchange
direct 的規(guī)則比較簡單。在發(fā)布消息前,需要把exchange和queue做一個綁定。 如果發(fā)布消息的時候,RoutingKey 和綁定的值(key)一致。則將消息投遞到該隊列中。如果不存在對應的隊列,則消息會被丟棄。 (這時候訪問rmq管理web時??梢钥吹较⑦M來,但是隊列中沒有值)
Fanout Exchange
fanout 類型則更簡單一些。 只要exchange和隊列做了綁定。發(fā)布的消息都會到隊列中去。
Topic Exchange
相對來說 topic類型要復雜一些。 和direct類型相比。topic相當于模糊匹配,而direct為全等。類似mysql中 ‘like’關鍵詞。
針對direct 類型寫一個實例
實例分兩部分 生產(chǎn)者、消費者(回調函數(shù))
因為我的代碼,對mq的部分做了封裝,懶得拆分出來。 所以我只貼業(yè)務代碼和封裝的核心方法。
生產(chǎn)者代碼
$mqModel = new Rabbitmq(); // 初始化(rmq連接操作) $newResult = ['tom','bill','jack']; if ($mqModel) { $mqRoute = 'push_data_to_crm_routing'; // 路由 $mqExchange = 'push_data_to_crm_exchange'; // 交換機 $mqQuery = 'push_data_to_crm_queue'; // 隊列 // 建立連接,設置交換機,設置隊列 $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute); foreach ($newResult as $k => $v){ $push_data = $v; $mqModel->publishMessage($push_data,$mqRoute); // 消息推送 } }
消費者代碼
$mqModel = new Rabbitmq(); // $mqRoute = 'push_data_to_crm_routing'; 消費者用不上路由,因為不需要指定。 只要想取隊列,消費即可。 $mqExchange = 'push_data_to_crm_exchange'; $mqQuery = 'push_data_to_crm_queue'; $mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE); $zmq->consume(function($msg){ var_dump($msg); return true; });
封裝類中的核心方法
//設置交換機 public function setExchange($changeName = '', $changeType = '', $flags = false) { $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setExchange", 1); } $this->exchange = new \AMQPExchange($this->channel); if($changeName){ $this->changeName = $changeName; // 交換機名稱 $this->exchange->setName($changeName); // 設置名稱 $changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT; // 交換機類型 }else{ $this->changeName = ''; } if($changeType){ $this->changeType = $changeType; $this->exchange->settype($changeType); // 設置交換機類型 }else{ $this->changeType = ''; } if($flags){ $this->exchange->setFlags($flags); //交換機標志 } if($changeType || $flags){ $this->exchange->declareExchange(); // 創(chuàng)建 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 設置隊列 public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){ $errorMsg = ''; try{ if(!$this->channel){ throw new \AMQPQueueException("Error channel on method setQueue", 1); } $this->queue = new \AMQPQueue($this->channel); if(!$queueName){ return false; } $this->queueName = $queueName; // 隊列名稱 $this->queue->setName($queueName); if($flags){ $this->queue->setFlags($flags); // 隊列標志。與消息持久化有關。 這篇文字不涉及這一塊的說明 } if(is_array($arguments) && !empty($arguments)){ $this->queue->setArguments($arguments); // 參數(shù)配置 } $this->queue->declareQueue(); // 創(chuàng)建一個隊列 $exchange_name = $exchange_name === false ? '' : ($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name); $routing_key = $routing_key ? $routing_key : $this->queueName; if($exchange_name && $routing_key ){ $this->queue->bind($exchange_name, $routing_key); // 交換機和隊列的綁定操作 } } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new Exception($errorMsg, 1); } return $this; } // 發(fā)布消息 public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){ if(!$message){ return false; } $routing_key = $routing_key ? $routing_key : $this->queueName; // 發(fā)布消息,帶有路由key。如果需要,則會用于關聯(lián)。 $this->exchange->publish($message, $routing_key, $flags, $attributes); return true; } // 消費 public function consume($callback = null, $qos = 0, $isAct = true){ if($qos){ $this->channel->qos(0, $qos); } $errorMsg = ''; try{ if(!$this->queue){ throw new \AMQPQueueException("Error queue on method consume", 1); } $this->callBackFnc = $callback; $this->isAct = $isAct; $callback = function($envelope, $queue){ if(is_callable($this->callBackFnc)){ call_user_func($this->callBackFnc, $envelope->getBody()); if($this->isAct){ $queue->ack($envelope->getDeliveryTag()); }else{ $queue->nack($envelope->getDeliveryTag()); } } }; $this->queue->consume($callback); } catch(AMQPQueueException $ex) { $errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } catch(Exception $ex) { $errorMsg = "Exception error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n"; } if($errorMsg){ throw new \Exception($errorMsg, 1); } }
因為封裝代碼里寫了很多 try catch 所以看起來特別亂。 還有部分兼容的邏輯。 看起來不舒服,就先刪掉再看吧。
執(zhí)行結果
先跑一遍生產(chǎn)者代碼,這里可以用瀏覽器直接訪問。 執(zhí)行完了之后。 到rabbitmq 的web管理頁面中查看。 發(fā)現(xiàn)消息已經(jīng)正常添加到隊列中。(web管理頁面可查詢別的文章開啟)
這時候再執(zhí)行消費者代碼。 消費者代碼需要在cli下執(zhí)行。因為消費者為輪詢等待,是死循環(huán),無法在瀏覽器下執(zhí)行。
上述內容就是RabbitMQ怎么在php中使用,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業(yè)資訊頻道。
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。