您好,登錄后才能下訂單哦!
這篇文章主要介紹了think-queue的示例分析,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
分析之前請(qǐng)大家務(wù)必了解消息隊(duì)列的實(shí)現(xiàn)
tp5的消息隊(duì)列是基于database redis 和tp官方自己實(shí)現(xiàn)的 Topthink
本章是圍繞redis來(lái)做分析
key | 類型 | 描述 |
---|---|---|
queues:queueName | list | 要執(zhí)行的任務(wù) |
think:queue:restart | string | 重啟隊(duì)列時(shí)間戳 |
queues:queueName:delayed | zSet | 延遲任務(wù) |
queues:queueName:reserved | zSet | 執(zhí)行失敗,等待重新執(zhí)行 |
work和listen的區(qū)別在下面會(huì)解釋
命令 | 描述 |
---|---|
php think queue:work | 監(jiān)聽(tīng)隊(duì)列 |
php think queue:listen | 監(jiān)聽(tīng)隊(duì)列 |
php think queue:restart | 重啟隊(duì)列 |
php think queue:subscribe | 暫無(wú),可能是保留的 官方有什么其他想法但是還沒(méi)實(shí)現(xiàn) |
標(biāo)簽 | 描述 |
---|---|
worker_daemon_start | 守護(hù)進(jìn)程開(kāi)啟 |
worker_memory_exceeded | 內(nèi)存超出 |
worker_queue_restart | 重啟守護(hù)進(jìn)程 |
worker_before_process | 任務(wù)開(kāi)始執(zhí)行之前 |
worker_before_sleep | 任務(wù)延遲執(zhí)行 |
queue_failed | 任務(wù)執(zhí)行失敗 |
參數(shù) | 默認(rèn)值 | 可以使用的模式 | 描述 |
---|---|---|---|
queue | null | work,listen | 要執(zhí)行的任務(wù)名稱 |
daemon | null | work | 以守護(hù)進(jìn)程執(zhí)行任務(wù) |
delay | 0 | work,listen | 失敗后重新執(zhí)行的時(shí)間 |
force | null | work | 失敗后重新執(zhí)行的時(shí)間 |
memory | 128M | work,listen | 限制最大內(nèi)存 |
sleep | 3 | work,listen | 沒(méi)有任務(wù)的時(shí)候等待的時(shí)間 |
tries | 0 | work,listen | 任務(wù)失敗后最大嘗試次數(shù) |
1: 執(zhí)行原理不同
work: 單進(jìn)程的處理模式;
無(wú) daemon 參數(shù) work進(jìn)程在處理完下一個(gè)消息后直接結(jié)束當(dāng)前進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)sleep一段時(shí)間然后退出;
有 daemon 參數(shù) work進(jìn)程會(huì)循環(huán)地處理隊(duì)列中的消息,直到內(nèi)存超出參數(shù)配置才結(jié)束進(jìn)程。當(dāng)不存在新消息時(shí),會(huì)在每次循環(huán)中sleep一段時(shí)間;
listen: 父進(jìn)程 + 子進(jìn)程 的處理模式;
會(huì)在所在的父進(jìn)程會(huì)創(chuàng)建一個(gè)單次執(zhí)行模式的work子進(jìn)程,并通過(guò)該work子進(jìn)程來(lái)處理隊(duì)列中的下一個(gè)消息,當(dāng)這個(gè)work子進(jìn)程退出之后;
所在的父進(jìn)程會(huì)監(jiān)聽(tīng)到該子進(jìn)程的退出信號(hào),并重新創(chuàng)建一個(gè)新的單次執(zhí)行的work子進(jìn)程;
2: 退出時(shí)機(jī)不同
work: 看上面
listen: 所在的父進(jìn)程正常情況會(huì)一直運(yùn)行,除非遇到下面兩種情況
01: 創(chuàng)建的某個(gè)work子進(jìn)程的執(zhí)行時(shí)間超過(guò)了 listen命令行中的--timeout 參數(shù)配置;此時(shí)work子進(jìn)程會(huì)被強(qiáng)制結(jié)束,listen所在的父進(jìn)程也會(huì)拋出一個(gè) ProcessTimeoutException 異常并退出;
開(kāi)發(fā)者可以選擇捕獲該異常,讓父進(jìn)程繼續(xù)執(zhí)行;
02: 所在的父進(jìn)程因某種原因存在內(nèi)存泄露,則當(dāng)父進(jìn)程本身占用的內(nèi)存超過(guò)了命令行中的 --memory 參數(shù)配置時(shí),父子進(jìn)程均會(huì)退出。正常情況下,listen進(jìn)程本身占用的內(nèi)存是穩(wěn)定不變的。
3: 性能不同
work: 是在腳本內(nèi)部做循環(huán),框架腳本在命令執(zhí)行的初期就已加載完畢;
listen: 是處理完一個(gè)任務(wù)之后新開(kāi)一個(gè)work進(jìn)程,此時(shí)會(huì)重新加載框架腳本;
因此 work 模式的性能會(huì)比listen模式高。
注意: 當(dāng)代碼有更新時(shí),work 模式下需要手動(dòng)去執(zhí)行 php think queue:restart 命令重啟隊(duì)列來(lái)使改動(dòng)生效;而listen 模式會(huì)自動(dòng)生效,無(wú)需其他操作。
4: 超時(shí)控制能力
work: 本質(zhì)上既不能控制進(jìn)程自身的運(yùn)行時(shí)間,也無(wú)法限制執(zhí)行中的任務(wù)的執(zhí)行時(shí)間;
listen: 可以限制其創(chuàng)建的work子進(jìn)程的超時(shí)時(shí)間;
可通過(guò) timeout 參數(shù)限制work子進(jìn)程允許運(yùn)行的最長(zhǎng)時(shí)間,超過(guò)該時(shí)間限制仍未結(jié)束的子進(jìn)程會(huì)被強(qiáng)制結(jié)束;
expire 和time的區(qū)別
expire 在配置文件中設(shè)置,指任務(wù)的過(guò)期時(shí)間 這個(gè)時(shí)間是全局的,影響到所有的work進(jìn)程
timeout 在命令行參數(shù)中設(shè)置,指work子進(jìn)程的超時(shí)時(shí)間,這個(gè)時(shí)間只對(duì)當(dāng)前執(zhí)行的listen 命令有效,timeout 針對(duì)的對(duì)象是 work 子進(jìn)程;
5: 使用場(chǎng)景不同
work 適用場(chǎng)景是:
01: 任務(wù)數(shù)量較多
02: 性能要求較高
03: 任務(wù)的執(zhí)行時(shí)間較短
04: 消費(fèi)者類中不存在死循環(huán),sleep() ,exit() ,die() 等容易導(dǎo)致bug的邏輯
listen 適用場(chǎng)景是:
01: 任務(wù)數(shù)量較少
02: 任務(wù)的執(zhí)行時(shí)間較長(zhǎng)
03: 任務(wù)的執(zhí)行時(shí)間需要有嚴(yán)格限制
由于我們是根據(jù)redis來(lái)做分析 所以只需要分析src/queue/connector/redis.php
01: 首先調(diào)用src/Queue.php
中的魔術(shù)方法__callStatic
02: 在__callStatic方法中調(diào)用了buildConnector
03: buildConnector 中首先加載配置文件 如果無(wú)將是同步執(zhí)行
04: 根據(jù)配置文件去創(chuàng)建連接并且傳入配置
在redis.php類的構(gòu)造方法中的操作:
01: 檢測(cè)redis擴(kuò)展是否安裝
02: 合并配置
03: 檢測(cè)是redis擴(kuò)展還是 pRedis
04: 創(chuàng)建連接對(duì)象
參數(shù)名 | 默認(rèn)值 | 描述 | 可以使用的方法 |
---|---|---|---|
$job | 無(wú) | 要執(zhí)行任務(wù)的類 | push,later |
$data | 空 | 任務(wù)數(shù)據(jù) | push,later |
$queue | default | 任務(wù)名稱 | push,later |
$delay | null | 延遲時(shí)間 | later |
push($job, $data, $queue) Queue::push(Test::class, ['id' => 1], 'test');
一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 rPush到redis中 key為 queue:queueName
數(shù)組結(jié)構(gòu):
[ 'job' => $job, // 要執(zhí)行任務(wù)的類 'data' => $data, // 任務(wù)數(shù)據(jù) 'id'=>'xxxxx' //任務(wù)id ]
寫(xiě)入 redis并且返回隊(duì)列id
至于中間的那頓騷操作太長(zhǎng)了就沒(méi)寫(xiě)
later($delay, $job, $data, $queue) Queue::later(100, Test::class, ['id' => 1], 'test');
跟上面的差不多
一頓騷操作后返回一個(gè)數(shù)組 并且序列化后 zAdd 到redis中 key為 queue:queueName:delayed
score為當(dāng)前的時(shí)間戳+$delay
執(zhí)行過(guò)程有work模式和listen模式 兩種 區(qū)別上面已經(jīng)說(shuō)了 代碼邏輯由于太多等下回分解;
最后講一下標(biāo)簽的使用
//守護(hù)進(jìn)程開(kāi)啟 'worker_daemon_start' => [ \app\index\behavior\WorkerDaemonStart::class ], //內(nèi)存超出 'worker_memory_exceeded' => [ \app\index\behavior\WorkerMemoryExceeded::class ], //重啟守護(hù)進(jìn)程 'worker_queue_restart' => [ \app\index\behavior\WorkerQueueRestart::class ], //任務(wù)開(kāi)始執(zhí)行之前 'worker_before_process' => [ \app\index\behavior\WorkerBeforeProcess::class ], //任務(wù)延遲執(zhí)行 'worker_before_sleep' => [ \app\index\behavior\WorkerBeforeSleep::class ], //任務(wù)執(zhí)行失敗 'queue_failed' => [ \app\index\behavior\QueueFailed::class ]
public function run(Output $output) { $output->write('<info>任務(wù)執(zhí)行失敗</info>', true); }
控制臺(tái)執(zhí)行 php think queue:work --queue test --daemon
會(huì)在控制臺(tái)一次輸出
守護(hù)進(jìn)程開(kāi)啟 任務(wù)延遲執(zhí)行
失敗的處理 如果有任務(wù)執(zhí)行失敗或者執(zhí)行次數(shù)達(dá)到最大值
會(huì)觸發(fā) queue_failed
在app\index\behavior@run
方法里面寫(xiě)失敗的邏輯 比如郵件通知 寫(xiě)入日志等
最后我們來(lái)說(shuō)一下如何在其他框架或者項(xiàng)目中給tp的項(xiàng)目推送消息隊(duì)列,例如兩個(gè)項(xiàng)目是分開(kāi)的 另一個(gè)使用的卻不是tp5的框架
<?php class Index { private $redis = null; public function __construct() { $this->redis = new Redis(); $this->redis->connect('127.0.0.1', 6379); $this->redis->select(10); } public function push($job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->rPush('queues:' . $queue, $payload); } public function later($delay, $job, $data, $queue) { $payload = $this->createPayload($job, $data); $this->redis->zAdd('queues:' . $queue . ':delayed', time() + $delay, $payload); } private function createPayload($job, $data) { $payload = $this->setMeta(json_encode(['job' => $job, 'data' => $data]), 'id', $this->random(32)); return $this->setMeta($payload, 'attempts', 1); } private function setMeta($payload, $key, $value) { $payload = json_decode($payload, true); $payload[$key] = $value; $payload = json_encode($payload); if (JSON_ERROR_NONE !== json_last_error()) { throw new InvalidArgumentException('Unable to create payload: ' . json_last_error_msg()); } return $payload; } private function random(int $length = 16): string { $str = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; $randomString = ''; for ($i = 0; $i < $length; $i++) { $randomString .= $str[rand(0, strlen($str) - 1)]; } return $randomString; } } (new Index())->later(10, 'app\index\jobs\Test', ['id' => 1], 'test');
package main import ( "encoding/json" "github.com/garyburd/redigo/redis" "math/rand" "time" ) type Payload struct { Id string `json:"id"` Job string `json:"job"` Data interface{} `json:"data"` Attempts int `json:"attempts"` } var RedisClient *redis.Pool func init() { RedisClient = &redis.Pool{ MaxIdle: 20, MaxActive: 500, IdleTimeout: time.Second * 100, Dial: func() (conn redis.Conn, e error) { c, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { return nil, err } _, _ = c.Do("SELECT", 10) return c, nil }, } } func main() { var data = make(map[string]interface{}) data["id"] = "1" later(10, "app\\index\\jobs\\Test", data, "test") } func push(job string, data interface{}, queue string) { payload := createPayload(job, data) queueName := "queues:" + queue _, _ = RedisClient.Get().Do("rPush", queueName, payload) } func later(delay int, job string, data interface{}, queue string) { m, _ := time.ParseDuration("+1s") currentTime := time.Now() op := currentTime.Add(time.Duration(time.Duration(delay) * m)).Unix() createPayload(job, data) payload := createPayload(job, data) queueName := "queues:" + queue + ":delayed" _, _ = RedisClient.Get().Do("zAdd", queueName, op, payload) } // 創(chuàng)建指定格式的數(shù)據(jù) func createPayload(job string, data interface{}) (payload string) { payload1 := &Payload{Job: job, Data: data, Id: random(32), Attempts: 1} jsonStr, _ := json.Marshal(payload1) return string(jsonStr) } // 創(chuàng)建隨機(jī)字符串 func random(n int) string { var str = []rune("0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") b := make([]rune, n) for i := range b { b[i] = str[rand.Intn(len(str))] } return string(b) }
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“think-queue的示例分析”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。