溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

PHP中如何實(shí)現(xiàn)基于Redis的MessageQueue隊(duì)列封裝

發(fā)布時(shí)間:2021-06-03 11:13:24 來(lái)源:億速云 閱讀:188 作者:小新 欄目:web開(kāi)發(fā)

小編給大家分享一下PHP中如何實(shí)現(xiàn)基于Redis的MessageQueue隊(duì)列封裝,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

Redis的鏈表List可以用來(lái)做鏈表,高并發(fā)的特性非常適合做分布式的并行消息傳遞。

左進(jìn)右出

$redis->lPush($key, $value);
$redis->rPop($key);

以下程序已在生產(chǎn)環(huán)境中正式使用。

基于Redis的PHP消息隊(duì)列封裝

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-8-19
 * Time: 下午12:10
 *
 * 基于Redis的消息隊(duì)列封裝
 */
namespace Zebra\MessageQueue;
class RedisMessageQueue implements IMessageQueue
{
  protected $redis_server;
  protected $server;
  protected $port;
  /**
   * @var 消息隊(duì)列標(biāo)志
   */
  protected $key;
  /**
   * 構(gòu)造隊(duì)列,創(chuàng)建redis鏈接
   * @param $server_config
   * @param $key
   * @param bool $p_connect
   */
  public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)
  {
    if (empty($key))
      throw new \Exception('message queue key can not be empty');
    $this->server = $server_config['IP'];
    $this->port = $server_config['PORT'];
    $this->key = $key;
    $this->check_environment();
    if ($p_connect) {
      $this->pconnect();
    } else {
      $this->connect();
    }
  }
  /**
   * 析構(gòu)函數(shù),關(guān)閉redis鏈接,使用長(zhǎng)連接時(shí),最好主動(dòng)調(diào)用關(guān)閉
   */
  public function __destruct()
  {
    $this->close();
  }
  /**
   * 短連接
   */
  private function connect()
  {
    $this->redis_server = new \Redis();
    $this->redis_server->connect($this->server, $this->port);
  }
  /**
   * 長(zhǎng)連接
   */
  public function pconnect()
  {
    $this->redis_server = new \Redis();
    $this->redis_server->pconnect($this->server, $this->port);
  }
  /**
   * 關(guān)閉鏈接
   */
  public function close()
  {
    $this->redis_server->close();
  }
  /**
   * 向隊(duì)列插入一條信息
   * @param $message
   * @return mixed
   */
  public function put($message)
  {
    return $this->redis_server->lPush($this->key, $message);
  }
  /**
   * 向隊(duì)列中插入一串信息
   * @param $message
   * @return mixed
   */
  public function puts(){
    $params = func_get_args();
    $message_array = array_merge(array($this->key), $params);
    return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);
  }
  /**
   * 從隊(duì)列頂部獲取一條記錄
   * @return mixed
   */
  public function get()
  {
    return $this->redis_server->lPop($this->key);
  }
  /**
   * 選擇數(shù)據(jù)庫(kù),可以用于區(qū)分不同隊(duì)列
   * @param $database
   */
  public function select($database)
  {
    $this->redis_server->select($database);
  }
  /**
   * 獲得隊(duì)列狀態(tài),即目前隊(duì)列中的消息數(shù)量
   * @return mixed
   */
  public function size()
  {
    return $this->redis_server->lSize($this->key);
  }
  /**
   * 獲取某一位置的值,不會(huì)刪除該位置的值
   * @param $pos
   * @return mixed
   */
  public function view($pos)
  {
    return $this->redis_server->lGet($this->key, $pos);
  }
  /**
   * 檢查Redis擴(kuò)展
   * @throws Exception
   */
  protected function check_environment()
  {
    if (!\extension_loaded('redis')) {
      throw new \Exception('Redis extension not loaded');
    }
  }
}

如果需要一次寫(xiě)入多個(gè)隊(duì)列,可以使用如下調(diào)用方式:

<?php
$redis = new RedisMessageQueue();
$redis->puts(1, 2, 3, 4);
$redis->puts(5, 6, 7, 8, 9);

模仿HTTPSQS輸出結(jié)果的封裝如下,提供了寫(xiě)入位置和讀取位置記錄的功能:

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-9-5
 * Time: 下午2:16
 *
 * 附加了隊(duì)列狀態(tài)信息的RedisMessageQueue
 */
namespace Zebra\MessageQueue;
class RedisMessageQueueStatus extends RedisMessageQueue {
  protected $record_status;
  protected $put_position;
  protected $get_position;
  public function __construct(
    $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),
    $key = 'redis_message_queue',
    $p_connect = false,
    $record_status=true
  ){
    parent::__construct($server_config, $key, $p_connect);
    $this->record_status = $record_status;
    $this->put_position = $this->key . '_put_position';
    $this->get_position = $this->key . '_get_position';
  }
  public function get(){
    if($queue = parent::get()){
      $incr_result = $this->redis_server->incr($this->get_position);
      if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server');
      return $queue;
    }else{
      return false;
    }
  }
  public function put($message){
    if(parent::put($message)){
      $incr_result = $this->redis_server->incr($this->put_position);
      if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server');
      return true;
    }else{
      return false;
    }
  }
  public function puts_status(){
    $message_array = func_get_args();
    $result = call_user_func_array(array($this, 'puts'), $message_array);
    if($result){
      $this->redis_server->incrBy($this->put_position, count($message_array));
      return true;
    }
    return false;
  }
  public function size(){
    return $this->redis_server->lSize($this->key);
  }
  public function status(){
    $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;
    $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;
    $status['unread_queue'] = $this->size();
    $status['queue_name'] = $this->key;
    $status['server'] = $this->server;
    $status['port'] = $this->port;
    return $status;
  }
  public function status_normal(){
    $status = $this->status();
    $message = 'Redis Message Queue' . PHP_EOL;
    $message .= '-------------------' . PHP_EOL;
    $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;
    $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;
    $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;
    $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;
    return $message;
  }
  public function status_json(){
    return \json_encode($this->status());
  }
}

看完了這篇文章,相信你對(duì)“PHP中如何實(shí)現(xiàn)基于Redis的MessageQueue隊(duì)列封裝”有了一定的了解,如果想了解更多相關(guān)知識(shí),歡迎關(guān)注億速云行業(yè)資訊頻道,感謝各位的閱讀!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI