溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Redis 實(shí)現(xiàn)隊(duì)列原理的實(shí)例詳解

發(fā)布時(shí)間:2020-08-25 11:58:41 來(lái)源:腳本之家 閱讀:145 作者:吃嘛嘛香 欄目:數(shù)據(jù)庫(kù)

Redis 實(shí)現(xiàn)隊(duì)列原理的實(shí)例詳解

場(chǎng)景說(shuō)明:

·用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁(yè)觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí)

·高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)求寫(xiě)入到隊(duì)列,后臺(tái)在去處理這些請(qǐng)求

·搶購(gòu)場(chǎng)景,先入先出的模式

命令:

rpush + blpop 或 lpush + brpop

rpush : 往列表右側(cè)推入數(shù)據(jù)

blpop : 客戶端阻塞直到隊(duì)列有值輸出

簡(jiǎn)單隊(duì)列:

simple.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->rPush('goods:task', json_encode($row));} $redis->close();

獲取20000萬(wàn)個(gè)商品,并把json化后的數(shù)據(jù)推入goods:task隊(duì)列

queueBlpop.php

// 出隊(duì)
while (true) {  
// 阻塞設(shè)置超時(shí)時(shí)間為3秒  
$task = $redis->blPop(array('goods:task'), 3); 
  if ($task) { 
    $redis->rPush('goods:success:task', $task[1]);
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL; 
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);  
 }
}

設(shè)置blpop阻塞時(shí)間為3秒,當(dāng)有數(shù)據(jù)出隊(duì)時(shí)保存到goods:success:task表示執(zhí)行成功,當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí),程序睡眠10秒重新檢查goods:task是否有數(shù)據(jù)出隊(duì)

cli 模式執(zhí)行命令:

php simple.phpphp queueBlpop.php

優(yōu)先級(jí)隊(duì)列

思路:

blpop 有多個(gè)鍵時(shí),blpop會(huì)從左至右遍歷鍵,一旦一個(gè)鍵能彈出元素,客戶端立即返回。例如:

blpop key1 key2 key3 key4

從key1到key4遍歷,如果哪個(gè)key有值,則彈出這個(gè)值,若多個(gè)key同時(shí)有值時(shí),優(yōu)先彈出排在左邊的key。

priority.php

// 設(shè)置優(yōu)先級(jí)隊(duì)列
$high = 'goods:high:task';
$mid = 'goods:mid:task';
$low = 'goods:low:task';
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');
$stmt->execute();
while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  
// cid 小于100放在低級(jí)隊(duì)列  
if ($row['cid'] < 100) {
    $redis->rPush($low, json_encode($row));  

// cid 100到600之間放在中級(jí)隊(duì)列  
}elseif ($row['cid'] > 100 && $row['cid'] < 600) {
    $redis->rPush($mid, json_encode($row));
}  
// cid 大于600放在高級(jí)隊(duì)列   else {    $redis->rPush($high, json_encode($row));  }
}$redis->close();

priorityBlop.php

// 優(yōu)先級(jí)隊(duì)列
$high = 'goods:high:task';
$mid = 'goods:mid:task';$low = 'goods:low:task';
// 出隊(duì)
while(true){  // 優(yōu)先級(jí)高的隊(duì)列放在左側(cè)  
  $task = $redis->blPop(array($high, $mid, $low), 3);
  if ($task) {
    $task = json_decode($task[1], true);
    echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';
    echo PHP_EOL;
  } else {
    echo 'nothing' . PHP_EOL;    sleep(5);
  }
}

優(yōu)先級(jí)高的隊(duì)列放在blpop命令左側(cè),依次排序,blpop命令會(huì)依次彈出high, mid, low隊(duì)列的值

cli 模式執(zhí)行命令:

php priority.phpphp priorityBlpop.php

延遲隊(duì)列

思路:

可以用一個(gè)有序集合來(lái)保存延遲任務(wù),member保存任務(wù)內(nèi)容,score保存(當(dāng)前時(shí)間 + 延時(shí)時(shí)間)。用時(shí)間作為score。程序只要用有序集合的第一條任務(wù)的score和當(dāng)前時(shí)間做比較,如果當(dāng)前時(shí)間比score小,說(shuō)明有序集合的所有任務(wù)還沒(méi)到執(zhí)行時(shí)間。

delay.php
$stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) {  $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}

將20萬(wàn)條任務(wù)導(dǎo)入有序集合goods:delay:task,所有任務(wù)延遲到之后的1秒到300秒內(nèi)執(zhí)行

delayHandle.php

while (true) {// 因?yàn)槭怯行蚣?,只要判斷第一條記錄的延時(shí)時(shí)間,例如第一條未到執(zhí)行時(shí)間    // 相對(duì)說(shuō)明集合的其他任務(wù)未到執(zhí)行時(shí)間  

$rs = $redis->zRange('goods:delay:task', 0, 0, true);

// 集合沒(méi)有任務(wù),睡眠時(shí)間設(shè)置為5秒  

  if (empty($rs)) {    
        echo 'no tasks , sleep 5 seconds' . PHP_EOL;sleep(5);continue;}
   $taskJson = key($rs);  
       $delay = $rs[$taskJson];  
       $task = json_decode($taskJson, true);  
   $now = time();// 到時(shí)間執(zhí)行延時(shí)任務(wù)  
   if ($delay <= $now) {    

// 對(duì)當(dāng)前任務(wù)加鎖,避免移動(dòng)移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列時(shí)被其他客戶端修改      

  if (!($identifier = acquireLock($task['id']))) {     
    continue;}    

// 移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列      

$redis->zRem('goods:delay:task', $taskJson);    
$redis->rPush('goods:task', $taskJson);    
echo $task['id'] . ' run ' . PHP_EOL;    

// 釋放鎖      

releaseLock($task['id'], $identifier);  } 
  else {    

// 延時(shí)任務(wù)未到執(zhí)行時(shí)間      

 $sleep = $delay - $now;    

// 最大值設(shè)置為2秒,保證如果有新的任務(wù)(延時(shí)時(shí)間1秒)進(jìn)入集合時(shí)能夠及時(shí)的被處理

  $sleep = $sleep > 2 ? 2 :$sleep;    
  echo 'wait ' . $sleep . ' seconds ' . PHP_EOL;    sleep($sleep);  
  }
}

這個(gè)文件對(duì)有序集合內(nèi)的延遲任務(wù)做處理,如果延遲任務(wù)到了執(zhí)行時(shí)間,則把延遲任務(wù)移動(dòng)到任務(wù)隊(duì)列中

queueBlpop.php
// 出隊(duì)while (true) {  
// 阻塞設(shè)置超時(shí)時(shí)間為3秒  
 $task = $redis->blPop(array('goods:task'), 3);  
if ($task) {    
$redis->rPush('goods:success:task', $task[1]);    
 $task = json_decode($task[1], true);    
echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success';    
echo PHP_EOL;  } else {    
echo 'nothing' . PHP_EOL;sleep(5);  
   }
}



處理任務(wù)隊(duì)列中的任務(wù)

cli模式下執(zhí)行命令:

php delay.phpphp delayHanlde.phpphp queueBlpop.php

如有疑問(wèn)請(qǐng)留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI