溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

think-queue的示例分析

發(fā)布時(shí)間:2021-07-27 10:56:21 來(lái)源:億速云 閱讀:150 作者:小新 欄目:編程語(yǔ)言

這篇文章主要介紹了think-queue的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

前言

分析之前請(qǐng)大家務(wù)必了解消息隊(duì)列的實(shí)現(xiàn)

tp5的消息隊(duì)列是基于database redis 和tp官方自己實(shí)現(xiàn)的 Topthink
本章是圍繞redis來(lái)做分析

存儲(chǔ)key:

key類型描述
queues:queueNamelist要執(zhí)行的任務(wù)
think:queue:restartstring重啟隊(duì)列時(shí)間戳
queues:queueName:delayedzSet延遲任務(wù)
queues:queueName:reservedzSet執(zhí)行失敗,等待重新執(zhí)行

執(zhí)行命令

work和listen的區(qū)別在下面會(huì)解釋
命令描述
php think queue:work監(jiān)聽(tīng)隊(duì)列
php think queue:listen監(jiān)聽(tīng)隊(duì)列
php think queue:restart重啟隊(duì)列
php think queue:subscribe暫無(wú),可能是保留的 官方有什么其他想法但是還沒(méi)實(shí)現(xiàn)

行為標(biāo)簽

標(biāo)簽描述
worker_daemon_start守護(hù)進(jìn)程開(kāi)啟
worker_memory_exceeded內(nèi)存超出
worker_queue_restart重啟守護(hù)進(jìn)程
worker_before_process任務(wù)開(kāi)始執(zhí)行之前
worker_before_sleep任務(wù)延遲執(zhí)行
queue_failed任務(wù)執(zhí)行失敗

命令參數(shù)

參數(shù)默認(rèn)值可以使用的模式描述
queuenullwork,listen要執(zhí)行的任務(wù)名稱
daemonnullwork以守護(hù)進(jìn)程執(zhí)行任務(wù)
delay0work,listen失敗后重新執(zhí)行的時(shí)間
forcenullwork失敗后重新執(zhí)行的時(shí)間
memory128Mwork,listen限制最大內(nèi)存
sleep3work,listen沒(méi)有任務(wù)的時(shí)候等待的時(shí)間
tries0work,listen任務(wù)失敗后最大嘗試次數(shù)

模式區(qū)別

1: 執(zhí)行原理不同
work: 單進(jìn)程的處理模式;
無(wú) daemon 參數(shù) work進(jìn)程在處理完下一個(gè)消息后直接結(jié)束當(dāng)前進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)sleep一段時(shí)間然后退出;
有 daemon 參數(shù) work進(jìn)程會(huì)循環(huán)地處理隊(duì)列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)在每次循環(huán)中sleep一段時(shí)間;

listen: 父進(jìn)程 + 子進(jìn)程 的處理模式;
會(huì)在所在的父進(jìn)程會(huì)創(chuàng)建一個(gè)單次執(zhí)行模式的work子進(jìn)程,并通過(guò)該work子進(jìn)程來(lái)處理隊(duì)列中的下一個(gè)消息,當(dāng)這個(gè)work子進(jìn)程退出之后;
所在的父進(jìn)程會(huì)監(jiān)聽(tīng)到該子進(jìn)程的退出信號(hào),并重新創(chuàng)建一個(gè)新的單次執(zhí)行的work子進(jìn)程;

2: 退出時(shí)機(jī)不同
work: 看上面
listen: 所在的父進(jìn)程正常情況會(huì)一直運(yùn)行,除非遇到下面兩種情況
01: 創(chuàng)建的某個(gè)work子進(jìn)程的執(zhí)行時(shí)間超過(guò)了 listen命令行中的--timeout 參數(shù)配置;此時(shí)work子進(jìn)程會(huì)被強(qiáng)制結(jié)束,listen所在的父進(jìn)程也會(huì)拋出一個(gè) ProcessTimeoutException 異常并退出;

開(kāi)發(fā)者可以選擇捕獲該異常,讓父進(jìn)程繼續(xù)執(zhí)行;
02: 所在的父進(jìn)程因某種原因存在內(nèi)存泄露,則當(dāng)父進(jìn)程本身占用的內(nèi)存超過(guò)了命令行中的 --memory 參數(shù)配置時(shí),父子進(jìn)程均會(huì)退出。正常情況下,listen進(jìn)程本身占用的內(nèi)存是穩(wěn)定不變的。

3: 性能不同
work: 是在腳本內(nèi)部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;

listen: 是處理完一個(gè)任務(wù)之后新開(kāi)一個(gè)work進(jìn)程,此時(shí)會(huì)重新加載框架腳本;

因此 work 模式的性能會(huì)比listen模式高。
注意: 當(dāng)代碼有更新時(shí),work 模式下需要手動(dòng)去執(zhí)行 php think queue:restart 命令重啟隊(duì)列來(lái)使改動(dòng)生效;而listen 模式會(huì)自動(dòng)生效,無(wú)需其他操作。

4: 超時(shí)控制能力
work: 本質(zhì)上既不能控制進(jìn)程自身的運(yùn)行時(shí)間,也無(wú)法限制執(zhí)行中的任務(wù)的執(zhí)行時(shí)間;
listen: 可以限制其創(chuàng)建的work子進(jìn)程的超時(shí)時(shí)間;

可通過(guò) timeout 參數(shù)限制work子進(jìn)程允許運(yùn)行的最長(zhǎng)時(shí)間,超過(guò)該時(shí)間限制仍未結(jié)束的子進(jìn)程會(huì)被強(qiáng)制結(jié)束;
expire 和time的區(qū)別

expire 在配置文件中設(shè)置,指任務(wù)的過(guò)期時(shí)間 這個(gè)時(shí)間是全局的,影響到所有的work進(jìn)程
timeout 在命令行參數(shù)中設(shè)置,指work子進(jìn)程的超時(shí)時(shí)間,這個(gè)時(shí)間只對(duì)當(dāng)前執(zhí)行的listen 命令有效,timeout 針對(duì)的對(duì)象是 work 子進(jìn)程;

5: 使用場(chǎng)景不同

work 適用場(chǎng)景是:
01: 任務(wù)數(shù)量較多
02: 性能要求較高
03: 任務(wù)的執(zhí)行時(shí)間較短
04: 消費(fèi)者類中不存在死循環(huán),sleep() ,exit() ,die() 等容易導(dǎo)致bug的邏輯

listen 適用場(chǎng)景是:

01: 任務(wù)數(shù)量較少
02: 任務(wù)的執(zhí)行時(shí)間較長(zhǎng)
03: 任務(wù)的執(zhí)行時(shí)間需要有嚴(yán)格限制

公有操作

由于我們是根據(jù)redis來(lái)做分析 所以只需要分析src/queue/connector/redis.php
01: 首先調(diào)用 src/Queue.php中的魔術(shù)方法 __callStatic
02: 在__callStatic方法中調(diào)用了 buildConnector
03: buildConnector 中首先加載配置文件 如果無(wú)將是同步執(zhí)行
04: 根據(jù)配置文件去創(chuàng)建連接并且傳入配置

在redis.php類的構(gòu)造方法中的操作:
01: 檢測(cè)redis擴(kuò)展是否安裝
02: 合并配置
03: 檢測(cè)是redis擴(kuò)展還是 pRedis
04: 創(chuàng)建連接對(duì)象

發(fā)布過(guò)程

發(fā)布參數(shù)

參數(shù)名默認(rèn)值描述可以使用的方法
$job無(wú)要執(zhí)行任務(wù)的類push,later
$data任務(wù)數(shù)據(jù)push,later
$queuedefault任務(wù)名稱push,later
$delaynull延遲時(shí)間later

立即執(zhí)行

    push($job, $data, $queue)
    Queue::push(Test::class, ['id' => 1], 'test');

一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 rPush到redis中 key為 queue:queueName
數(shù)組結(jié)構(gòu):

[
    'job' => $job, // 要執(zhí)行任務(wù)的類
    'data' => $data, // 任務(wù)數(shù)據(jù)
    'id'=>'xxxxx' //任務(wù)id
]

寫(xiě)入 redis并且返回隊(duì)列id
至于中間的那頓騷操作太長(zhǎng)了就沒(méi)寫(xiě)

延遲發(fā)布

    later($delay, $job, $data, $queue)
    Queue::later(100, Test::class, ['id' => 1], 'test');

跟上面的差不多
一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 zAdd 到redis中 key為 queue:queueName:delayed score為當(dāng)前的時(shí)間戳+$delay

執(zhí)行過(guò)程

執(zhí)行過(guò)程有work模式和listen模式 兩種 區(qū)別上面已經(jīng)說(shuō)了 代碼邏輯由于太多等下回分解;
最后講一下標(biāo)簽的使用

    //守護(hù)進(jìn)程開(kāi)啟
    'worker_daemon_start' => [
        \app\index\behavior\WorkerDaemonStart::class
    ],
    //內(nèi)存超出
    'worker_memory_exceeded' => [
        \app\index\behavior\WorkerMemoryExceeded::class
    ],
    //重啟守護(hù)進(jìn)程
    'worker_queue_restart' => [
        \app\index\behavior\WorkerQueueRestart::class
    ],
    //任務(wù)開(kāi)始執(zhí)行之前
    'worker_before_process' => [
        \app\index\behavior\WorkerBeforeProcess::class
    ],
    //任務(wù)延遲執(zhí)行
    'worker_before_sleep' => [
        \app\index\behavior\WorkerBeforeSleep::class
    ],
    //任務(wù)執(zhí)行失敗
    'queue_failed' => [
        \app\index\behavior\QueueFailed::class
    ]

think-queue的示例分析

public function run(Output $output)
    {
        $output->write('<info>任務(wù)執(zhí)行失敗</info>', true);
    }

控制臺(tái)執(zhí)行 php think queue:work --queue test --daemon
會(huì)在控制臺(tái)一次輸出

守護(hù)進(jìn)程開(kāi)啟
任務(wù)延遲執(zhí)行

失敗的處理 如果有任務(wù)執(zhí)行失敗或者執(zhí)行次數(shù)達(dá)到最大值
會(huì)觸發(fā) queue_failed

app\index\behavior@run方法里面寫(xiě)失敗的邏輯 比如郵件通知 寫(xiě)入日志等

最后我們來(lái)說(shuō)一下如何在其他框架或者項(xiàng)目中給tp的項(xiàng)目推送消息隊(duì)列,例如兩個(gè)項(xiàng)目是分開(kāi)的 另一個(gè)使用的卻不是tp5的框架

在其他項(xiàng)目中推任務(wù)

php版本

<?php

class Index
{
    private $redis = null;

    public function __construct()
    {
        $this->redis = new Redis();
        $this->redis->connect('127.0.0.1', 6379);
        $this->redis->select(10);
    }

    public function push($job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->rPush('queues:' . $queue, $payload);
    }

    public function later($delay, $job, $data, $queue)
    {
        $payload = $this->createPayload($job, $data);
        $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload);
    }

    private function createPayload($job, $data)
    {
        $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32));
        return $this->setMeta($payload, 'attempts', 1);
    }

    private function setMeta($payload, $key, $value)
    {
        $payload = json_decode($payload, true);
        $payload[$key] = $value;
        $payload = json_encode($payload);

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg());
        }

        return $payload;
    }

    private function random(int $length = 16): string
    {
        $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
        $randomString = '';
        for ($i = 0; $i < $length; $i++) {
            $randomString .= $str[rand(0, strlen($str) - 1)];
        }
        return $randomString;
    }
}

(new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');

go版本

package main

import (
    "encoding/json"
    "github.com/garyburd/redigo/redis"
    "math/rand"
    "time"
)

type Payload struct {
    Id       string      `json:"id"`
    Job      string      `json:"job"`
    Data     interface{} `json:"data"`
    Attempts int         `json:"attempts"`
}

var RedisClient *redis.Pool

func init() {
    RedisClient = &redis.Pool{
        MaxIdle:     20,
        MaxActive:   500,
        IdleTimeout: time.Second * 100,
        Dial: func() (conn redis.Conn, e error) {
            c, err := redis.Dial("tcp", "127.0.0.1:6379")

            if err != nil {
                return nil, err
            }

            _, _ = c.Do("SELECT", 10)

            return c, nil
        },
    }

}

func main() {

    var data = make(map[string]interface{})
    data["id"] = "1"

    later(10, "app\\index\\jobs\\Test", data, "test")
}

func push(job string, data interface{}, queue string) {
    payload := createPayload(job, data)
    queueName := "queues:" + queue

    _, _ = RedisClient.Get().Do("rPush", queueName, payload)
}

func later(delay int, job string, data interface{}, queue string) {

    m, _ := time.ParseDuration("+1s")
    currentTime := time.Now()
    op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix()
    createPayload(job, data)
    payload := createPayload(job, data)
    queueName := "queues:" + queue + ":delayed"

    _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload)
}

// 創(chuàng)建指定格式的數(shù)據(jù)
func createPayload(job string, data interface{}) (payload string) {
    payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1}

    jsonStr, _ := json.Marshal(payload1)

    return string(jsonStr)
}

// 創(chuàng)建隨機(jī)字符串
func random(n int) string {

    var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

    b := make([]rune, n)
    for i := range b {
        b[i] = str[rand.Intn(len(str))]
    }
    return string(b)
}

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“think-queue的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI