溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ怎么在php中使用

發(fā)布時間:2021-06-12 17:37:32 來源:億速云 閱讀:559 作者:Leah 欄目:編程語言

本篇文章為大家展示了RabbitMQ怎么在php中使用,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

什么是隊列

消息隊列(Message Queue)是一種應用間的通信方式,消息發(fā)送后可以立即返回。消息使用者再從MQ中取消息進行邏輯處理。對于消耗較大的請求,可以立馬返回處理結果。減少服務器壓力。為各個子系統(tǒng)之間解耦和異步處理。

rabbitmq的整體結構

RabbitMQ怎么在php中使用

rmq簡單來說就是一個(生產(chǎn)/消費)的模型結構。消息生產(chǎn)者把數(shù)據(jù)丟到隊列中,消息消費者從隊列中取出數(shù)據(jù)進行邏輯處理。

那么如何確保,生產(chǎn)者添加的數(shù)據(jù),能夠到達指定的隊列中呢?

rmq(消息隊列)主要提供了三個概念(中間件?)來確保消息的分發(fā)。Exchange(交換機)、RoutingKey(路由)、Queue(隊列)。從上面的圖也可以看出來。 處理消息的接收、分發(fā),主要在Broker模塊中。

Exchange 所有生產(chǎn)消息的入口都是到交換機這里。exchange通過進來的路由(RoutingKey),去和已binding的規(guī)則進行匹配,找到指定的隊列。

RoutingKey 我的理解,這里相當于一把鑰匙。而binding的操作相當于一把鎖頭。

Queue 消息的存放區(qū)域,等到消費者來取。

Binding Exchange和Queue之間的一個綁定。

從這些概念來看,影響規(guī)則的主要是依賴Exchange。那rmq提供了哪些類型,都有什么特點呢?

exchange類型

RabbitMQ提供了四種Exchange類型

direct

fanout

topic

header

header類型在實際使用中較少,所以在這里就不進行說明。

Direct Exchange

RabbitMQ怎么在php中使用

direct 的規(guī)則比較簡單。在發(fā)布消息前,需要把exchange和queue做一個綁定。 如果發(fā)布消息的時候,RoutingKey 和綁定的值(key)一致。則將消息投遞到該隊列中。如果不存在對應的隊列,則消息會被丟棄。 (這時候訪問rmq管理web時??梢钥吹较⑦M來,但是隊列中沒有值)

Fanout Exchange

RabbitMQ怎么在php中使用

fanout 類型則更簡單一些。 只要exchange和隊列做了綁定。發(fā)布的消息都會到隊列中去。

Topic Exchange

RabbitMQ怎么在php中使用

相對來說 topic類型要復雜一些。 和direct類型相比。topic相當于模糊匹配,而direct為全等。類似mysql中 ‘like’關鍵詞。

針對direct 類型寫一個實例

實例分兩部分 生產(chǎn)者、消費者(回調函數(shù))

因為我的代碼,對mq的部分做了封裝,懶得拆分出來。 所以我只貼業(yè)務代碼和封裝的核心方法。

生產(chǎn)者代碼

$mqModel = new Rabbitmq();   // 初始化(rmq連接操作)
$newResult = ['tom','bill','jack'];
if ($mqModel) {
    $mqRoute = 'push_data_to_crm_routing';  // 路由
    $mqExchange = 'push_data_to_crm_exchange';  // 交換機
    $mqQuery = 'push_data_to_crm_queue';  // 隊列
    // 建立連接,設置交換機,設置隊列
    $mqModel->setChannel()->setExchange($mqExchange,AMQP_EX_TYPE_DIRECT,AMQP_DURABLE)->setQueue($mqQuery,AMQP_DURABLE,$mqExchange,$mqRoute);
    foreach ($newResult as $k => $v){
        $push_data = $v;
		$mqModel->publishMessage($push_data,$mqRoute); // 消息推送
	}
}

消費者代碼

$mqModel = new Rabbitmq();
// $mqRoute = 'push_data_to_crm_routing'; 消費者用不上路由,因為不需要指定。 只要想取隊列,消費即可。 
$mqExchange = 'push_data_to_crm_exchange';
$mqQuery = 'push_data_to_crm_queue';
$mqModel->setChannel()->setExchange($mqExchange,'', AMQP_PASSIVE)->setQueue($mqQuery, AMQP_PASSIVE);
$zmq->consume(function($msg){
    var_dump($msg);
    return true;
});

封裝類中的核心方法

//設置交換機
public function setExchange($changeName = '', $changeType = '', $flags = false) {	
	$errorMsg = '';
	try{
		if(!$this->channel){
			throw new \AMQPQueueException("Error channel on method setExchange", 1);
		}
		$this->exchange = new \AMQPExchange($this->channel);
		if($changeName){
			$this->changeName = $changeName; // 交換機名稱
			$this->exchange->setName($changeName); // 設置名稱
			$changeType = $changeType ? $changeType : AMQP_EX_TYPE_DIRECT;  // 交換機類型
		}else{
			$this->changeName = '';
		}
		if($changeType){
			$this->changeType = $changeType;
			$this->exchange->settype($changeType);  // 設置交換機類型
		}else{
			$this->changeType = '';
		}
		if($flags){
			$this->exchange->setFlags($flags);  //交換機標志
		}
		if($changeType || $flags){
			$this->exchange->declareExchange();  // 創(chuàng)建
		}
	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error exchange: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error exchange:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}
	if($errorMsg){
		throw new Exception($errorMsg, 1);
	}
	return $this;
}

// 設置隊列
public function setQueue($queueName = '', $flags = '', $exchange_name = '', $routing_key = '', $arguments=[] ){
	$errorMsg = '';
	try{
		if(!$this->channel){
			throw new \AMQPQueueException("Error channel on method setQueue", 1);
		}
		$this->queue = new \AMQPQueue($this->channel);
		if(!$queueName){
			return false;
		}
		$this->queueName = $queueName;  // 隊列名稱
		$this->queue->setName($queueName);
		if($flags){
			$this->queue->setFlags($flags);  // 隊列標志。與消息持久化有關。 這篇文字不涉及這一塊的說明
		}
        if(is_array($arguments) && !empty($arguments)){
            $this->queue->setArguments($arguments);  // 參數(shù)配置
        }
		$this->queue->declareQueue();  // 創(chuàng)建一個隊列
		$exchange_name = $exchange_name === false ? 
						'' : 
						($exchange_name === true || !$exchange_name ? $this->changeName : $exchange_name);

		$routing_key = $routing_key ? $routing_key : $this->queueName;

		if($exchange_name && $routing_key ){
			$this->queue->bind($exchange_name, $routing_key);  // 交換機和隊列的綁定操作
		}

	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error queue:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}

	if($errorMsg){
		throw new Exception($errorMsg, 1);
	}

	return $this;
}

// 發(fā)布消息
public function publishMessage($message = '', $routing_key = '', $flags = AMQP_NOPARAM, $attributes = []){
	if(!$message){
		return false;
	}
	$routing_key = $routing_key ? $routing_key : $this->queueName;
	// 發(fā)布消息,帶有路由key。如果需要,則會用于關聯(lián)。
	$this->exchange->publish($message, $routing_key, $flags, $attributes);
	return true;
}

// 消費
public function consume($callback = null, $qos = 0, $isAct = true){
	if($qos){
		$this->channel->qos(0, $qos);
	}
	$errorMsg = '';
	try{
		if(!$this->queue){
			throw new \AMQPQueueException("Error queue on method consume", 1);
		}
		$this->callBackFnc = $callback;
		$this->isAct = $isAct;
		$callback = function($envelope, $queue){
						if(is_callable($this->callBackFnc)){
							call_user_func($this->callBackFnc, $envelope->getBody());
							if($this->isAct){
								$queue->ack($envelope->getDeliveryTag());
							}else{
								$queue->nack($envelope->getDeliveryTag());
							}
						}
					};
		$this->queue->consume($callback);  
	} catch(AMQPQueueException $ex) {
		$errorMsg = "AMQPQueueException error queue: {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	} catch(Exception $ex) {
		$errorMsg = "Exception error queue:  {$ex->getMessage()},\r\nline: {$ex->getLine()}\r\n";
	}
	if($errorMsg){
		throw new \Exception($errorMsg, 1);
	}
}

因為封裝代碼里寫了很多 try catch 所以看起來特別亂。 還有部分兼容的邏輯。 看起來不舒服,就先刪掉再看吧。

執(zhí)行結果

先跑一遍生產(chǎn)者代碼,這里可以用瀏覽器直接訪問。 執(zhí)行完了之后。 到rabbitmq 的web管理頁面中查看。 發(fā)現(xiàn)消息已經(jīng)正常添加到隊列中。(web管理頁面可查詢別的文章開啟)

RabbitMQ怎么在php中使用

這時候再執(zhí)行消費者代碼。 消費者代碼需要在cli下執(zhí)行。因為消費者為輪詢等待,是死循環(huán),無法在瀏覽器下執(zhí)行。

RabbitMQ怎么在php中使用

上述內容就是RabbitMQ怎么在php中使用,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業(yè)資訊頻道。

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內容。

AI