您好,登錄后才能下訂單哦!
小編給大家分享一下PHP中如何實(shí)現(xiàn)基于Redis的MessageQueue隊(duì)列封裝,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
Redis的鏈表List可以用來(lái)做鏈表,高并發(fā)的特性非常適合做分布式的并行消息傳遞。
左進(jìn)右出
$redis->lPush($key, $value); $redis->rPop($key);
以下程序已在生產(chǎn)環(huán)境中正式使用。
基于Redis的PHP消息隊(duì)列封裝
<?php /** * Created by PhpStorm. * User: huyanping * Date: 14-8-19 * Time: 下午12:10 * * 基于Redis的消息隊(duì)列封裝 */ namespace Zebra\MessageQueue; class RedisMessageQueue implements IMessageQueue { protected $redis_server; protected $server; protected $port; /** * @var 消息隊(duì)列標(biāo)志 */ protected $key; /** * 構(gòu)造隊(duì)列,創(chuàng)建redis鏈接 * @param $server_config * @param $key * @param bool $p_connect */ public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false) { if (empty($key)) throw new \Exception('message queue key can not be empty'); $this->server = $server_config['IP']; $this->port = $server_config['PORT']; $this->key = $key; $this->check_environment(); if ($p_connect) { $this->pconnect(); } else { $this->connect(); } } /** * 析構(gòu)函數(shù),關(guān)閉redis鏈接,使用長(zhǎng)連接時(shí),最好主動(dòng)調(diào)用關(guān)閉 */ public function __destruct() { $this->close(); } /** * 短連接 */ private function connect() { $this->redis_server = new \Redis(); $this->redis_server->connect($this->server, $this->port); } /** * 長(zhǎng)連接 */ public function pconnect() { $this->redis_server = new \Redis(); $this->redis_server->pconnect($this->server, $this->port); } /** * 關(guān)閉鏈接 */ public function close() { $this->redis_server->close(); } /** * 向隊(duì)列插入一條信息 * @param $message * @return mixed */ public function put($message) { return $this->redis_server->lPush($this->key, $message); } /** * 向隊(duì)列中插入一串信息 * @param $message * @return mixed */ public function puts(){ $params = func_get_args(); $message_array = array_merge(array($this->key), $params); return call_user_func_array(array($this->redis_server, 'lPush'), $message_array); } /** * 從隊(duì)列頂部獲取一條記錄 * @return mixed */ public function get() { return $this->redis_server->lPop($this->key); } /** * 選擇數(shù)據(jù)庫(kù),可以用于區(qū)分不同隊(duì)列 * @param $database */ public function select($database) { $this->redis_server->select($database); } /** * 獲得隊(duì)列狀態(tài),即目前隊(duì)列中的消息數(shù)量 * @return mixed */ public function size() { return $this->redis_server->lSize($this->key); } /** * 獲取某一位置的值,不會(huì)刪除該位置的值 * @param $pos * @return mixed */ public function view($pos) { return $this->redis_server->lGet($this->key, $pos); } /** * 檢查Redis擴(kuò)展 * @throws Exception */ protected function check_environment() { if (!\extension_loaded('redis')) { throw new \Exception('Redis extension not loaded'); } } }
如果需要一次寫(xiě)入多個(gè)隊(duì)列,可以使用如下調(diào)用方式:
<?php $redis = new RedisMessageQueue(); $redis->puts(1, 2, 3, 4); $redis->puts(5, 6, 7, 8, 9);
模仿HTTPSQS輸出結(jié)果的封裝如下,提供了寫(xiě)入位置和讀取位置記錄的功能:
<?php /** * Created by PhpStorm. * User: huyanping * Date: 14-9-5 * Time: 下午2:16 * * 附加了隊(duì)列狀態(tài)信息的RedisMessageQueue */ namespace Zebra\MessageQueue; class RedisMessageQueueStatus extends RedisMessageQueue { protected $record_status; protected $put_position; protected $get_position; public function __construct( $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false, $record_status=true ){ parent::__construct($server_config, $key, $p_connect); $this->record_status = $record_status; $this->put_position = $this->key . '_put_position'; $this->get_position = $this->key . '_get_position'; } public function get(){ if($queue = parent::get()){ $incr_result = $this->redis_server->incr($this->get_position); if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server'); return $queue; }else{ return false; } } public function put($message){ if(parent::put($message)){ $incr_result = $this->redis_server->incr($this->put_position); if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server'); return true; }else{ return false; } } public function puts_status(){ $message_array = func_get_args(); $result = call_user_func_array(array($this, 'puts'), $message_array); if($result){ $this->redis_server->incrBy($this->put_position, count($message_array)); return true; } return false; } public function size(){ return $this->redis_server->lSize($this->key); } public function status(){ $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0; $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0; $status['unread_queue'] = $this->size(); $status['queue_name'] = $this->key; $status['server'] = $this->server; $status['port'] = $this->port; return $status; } public function status_normal(){ $status = $this->status(); $message = 'Redis Message Queue' . PHP_EOL; $message .= '-------------------' . PHP_EOL; $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL; $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL; $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL; $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL; return $message; } public function status_json(){ return \json_encode($this->status()); } }
看完了這篇文章,相信你對(duì)“PHP中如何實(shí)現(xiàn)基于Redis的MessageQueue隊(duì)列封裝”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。