溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

thinkphp6使用think-queue怎么實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列

發(fā)布時(shí)間:2022-04-20 13:37:22 來源:億速云 閱讀:651 作者:zzz 欄目:編程語言

本文小編為大家詳細(xì)介紹“thinkphp6使用think-queue怎么實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列”,內(nèi)容詳細(xì),步驟清晰,細(xì)節(jié)處理妥當(dāng),希望這篇“thinkphp6使用think-queue怎么實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學(xué)習(xí)新知識(shí)吧。

###TP6 隊(duì)列

TP6 中使用 think-queue 可以實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列。

think-queue 是thinkphp 官方提供的一個(gè)消息隊(duì)列服務(wù),它支持消息隊(duì)列的一些基本特性:

  • 消息的發(fā)布,獲取,執(zhí)行,刪除,重發(fā),失敗處理,延遲執(zhí)行,超時(shí)控制等

  • 隊(duì)列的多隊(duì)列, 內(nèi)存限制 ,啟動(dòng),停止,守護(hù)等

  • 消息隊(duì)列可降級(jí)為同步執(zhí)行

消息隊(duì)列實(shí)現(xiàn)過程

1、通過生產(chǎn)者推送消息到消息隊(duì)列服務(wù)中

2、消息隊(duì)列服務(wù)將收到的消息存入redis隊(duì)列中(zset)

3、消費(fèi)者進(jìn)行監(jiān)聽隊(duì)列,當(dāng)監(jiān)聽到隊(duì)列有新的消息時(shí),獲取隊(duì)列第一條

4、處理獲取下來的消息調(diào)用業(yè)務(wù)類進(jìn)行處理相關(guān)業(yè)務(wù)

5、業(yè)務(wù)處理后,需要從隊(duì)列中刪除消息

composer 安裝 think-queue

composer require topthink/think-queue

配置文件

安裝完 think-queue 后會(huì)在 config 目錄中生成 queue.php,這個(gè)文件是隊(duì)列的配置文件。

tp6中提供了多種消息隊(duì)列的實(shí)現(xiàn)方式,默認(rèn)使用sync,我這里選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

創(chuàng)建目錄及隊(duì)列消費(fèi)類文件

在 app 目錄下創(chuàng)建 queue 目錄,然后在該目錄下新建一個(gè)抽象類 Queue.php 文件,作為基礎(chǔ)類

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 隊(duì)列消費(fèi)基礎(chǔ)類
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是消息隊(duì)列默認(rèn)調(diào)用的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 隊(duì)列無消息', __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 隊(duì)列的數(shù)據(jù)庫id或者redis key
        // $jobClassName = $job->getName(); // 隊(duì)列對(duì)象類
        // $queueName = $job->getQueue(); // 隊(duì)列名稱

        // 如果已經(jīng)執(zhí)行中或者執(zhí)行完成就不再執(zhí)行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 執(zhí)行業(yè)務(wù)處理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 隊(duì)列執(zhí)行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任務(wù)執(zhí)行成功后刪除
            Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
        } else {
            // 檢查任務(wù)重試次數(shù)
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 隊(duì)列執(zhí)行重試次數(shù)超過3次,執(zhí)行失敗', __CLASS__, __FUNCTION__));
                 // 第1種處理方式:重新發(fā)布任務(wù),該任務(wù)延遲10秒后再執(zhí)行;也可以不指定秒數(shù)立即執(zhí)行
                //$job->release(10); 
                // 第2種處理方式:原任務(wù)的基礎(chǔ)上1分鐘執(zhí)行一次并增加嘗試次數(shù)
                //$job->failed();   
                // 第3種處理方式:刪除任務(wù)
                $job->delete(); // 任務(wù)執(zhí)行后刪除
                Cache::store('redis')->delete($jobId); // 刪除redis中的緩存
            }
        }
    }

    /**
     * 消息在到達(dá)消費(fèi)者時(shí)可能已經(jīng)不需要執(zhí)行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任務(wù)執(zhí)行的結(jié)果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查詢r(jià)edis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根據(jù)消息中的數(shù)據(jù)進(jìn)行實(shí)際的業(yè)務(wù)處理
     * @param $data 數(shù)據(jù)
     * @return bool 返回結(jié)果
     */
    abstract protected function execute($data): bool;}

所有真正的消費(fèi)類繼承基礎(chǔ)抽象類

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具體消費(fèi)業(yè)務(wù)邏輯
    }}

生產(chǎn)者邏輯

use think\facade\Queue;

// 普通隊(duì)列生成調(diào)用方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延時(shí)隊(duì)列生成調(diào)用方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延時(shí)隊(duì)列 10 秒后執(zhí)行:
Queue::later(10 , Test::class, $data, $queueName);

開啟進(jìn)程監(jiān)聽任務(wù)并執(zhí)行

php think queue:listen
php think queue:work

命令模式介紹

命令模式
  • queue:work 命令

    work 命令: 該命令將啟動(dòng)一個(gè) work 進(jìn)程來處理消息隊(duì)列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 該命令將會(huì)創(chuàng)建一個(gè) listen 父進(jìn)程 ,然后由父進(jìn)程通過 proc_open(‘php think queue:work’) 的方式來創(chuàng)建一個(gè)work 子 進(jìn)程來處理消息隊(duì)列,且限制該work進(jìn)程的執(zhí)行時(shí)間。

    php think queue:listen --queue TestQueue
命令行參數(shù)
  • Work 模式

    php think queue:work \
    --daemon            //是否循環(huán)執(zhí)行,如果不加該參數(shù),則該命令處理完下一個(gè)消息就退出
    --queue  helloJobQueue  //要處理的隊(duì)列的名稱
    --delay  0 \        //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
    --force  \          //系統(tǒng)處于維護(hù)狀態(tài)時(shí)是否仍然處理任務(wù),并未找到相關(guān)說明
    --memory 128 \      //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
    --sleep  3 \        //如果隊(duì)列中無任務(wù),則sleep多少秒后重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任務(wù)已經(jīng)超過嘗試次數(shù)上限,則觸發(fā)‘任務(wù)嘗試次數(shù)超限’事件,默認(rèn)為0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //監(jiān)聽的隊(duì)列的名稱
    --delay  0 \         //如果本次任務(wù)執(zhí)行拋出異常且任務(wù)未被刪除時(shí),設(shè)置其下次執(zhí)行前延遲多少秒,默認(rèn)為0
    --memory 128 \       //該進(jìn)程允許使用的內(nèi)存上限,以 M 為單位
    --sleep  3 \         //如果隊(duì)列中無任務(wù),則多長(zhǎng)時(shí)間后重新檢查,daemon模式下有效
    --tries  0 \         //如果任務(wù)已經(jīng)超過重發(fā)次數(shù)上限,則進(jìn)入失敗處理邏輯,默認(rèn)為0
    --timeout 60         //創(chuàng)建的work子進(jìn)程的允許執(zhí)行的最長(zhǎng)時(shí)間,以秒為單位

    可以看到 listen 模式下,不包含 --deamon 參數(shù),原因下面會(huì)說明

  • 消息隊(duì)列的開始,停止與重啟

    • 開始一個(gè)消息隊(duì)列:

      php think queue:work
    • 停止所有的消息隊(duì)列:

      php think queue:restart
    • 重啟所有的消息隊(duì)列:

      php think queue:restart 
      php think queue:work

讀到這里,這篇“thinkphp6使用think-queue怎么實(shí)現(xiàn)普通隊(duì)列和延遲隊(duì)列”文章已經(jīng)介紹完畢,想要掌握這篇文章的知識(shí)點(diǎn)還需要大家自己動(dòng)手實(shí)踐使用過才能領(lǐng)會(huì),如果想了解更多相關(guān)內(nèi)容的文章,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI