tp5緩存設(shè)置為redis做消息隊列的示例:
1.根據(jù)選擇的存儲方式,在\application\extra\queue.php這個配置文件中,添加消息隊列對應(yīng)的驅(qū)動配置,例如:
return [
'connector' => 'Redis', // Redis 驅(qū)動
'expire' => 60, // 任務(wù)的過期時間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
'default' => 'default', // 默認(rèn)的隊列名稱
'host' => '127.0.0.1', // redis 主機ip
'port' => 6379, // redis 端口
'password' => '', // redis 密碼
'select' => 1, // 使用哪一個 db,默認(rèn)為 db0
'timeout' => 0, // redis連接的超時時間
'persistent' => false, // 是否是長連接
// 'connector' => 'Database', // 數(shù)據(jù)庫驅(qū)動
// 'expire' => 60, // 任務(wù)的過期時間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
// 'default' => 'default', // 默認(rèn)的隊列名稱
// 'table' => 'jobs', // 存儲消息的表名,不帶前綴
// 'dsn' => [],
// 'connector' => 'Topthink', // ThinkPHP內(nèi)部的隊列通知服務(wù)平臺 ,本文不作介紹
// 'token' => '',
// 'project_id' => '',
// 'protocol' => 'https',
// 'host' => 'qns.topthink.com',
// 'port' => 443,
// 'api_version' => 1,
// 'max_retries' => 3,
// 'default' => 'default',
// 'connector' => 'Sync', // Sync 驅(qū)動,該驅(qū)動的實際作用是取消消息隊列,還原為同步執(zhí)行
];
2.在業(yè)務(wù)控制器中創(chuàng)建一個新的消息,并推送到helloJobQueue隊列。
新增\application\index\controller\JobTest.php控制器,在該控制器中添加actionWithHelloJob方法,代碼:
namespace app\index\controller;
use think\Queue;
class JobTest
{
/*
* 測試隊列action
* */
public function actionWithHelloJob(){
// 1.當(dāng)前任務(wù)將由哪個類來負(fù)責(zé)處理。
// 當(dāng)輪到該任務(wù)時,系統(tǒng)將生成一個該類的實例,并調(diào)用其 fire 方法
$jobHandlerClassName = 'app\index\job\Hello@fire';
// 2.當(dāng)前任務(wù)歸屬的隊列名稱,如果為新隊列,會自動創(chuàng)建
$jobQueueName = "helloJobQueue";
// 3.當(dāng)前任務(wù)所需的業(yè)務(wù)數(shù)據(jù) . 不能為 resource 類型,其他類型最終將轉(zhuǎn)化為json形式的字符串
// ( jobData 為對象時,需要在先在此處手動序列化,否則只存儲其public屬性的鍵值對)
$jobData = [ 'name' => 'test'.rand(), 'password'=>rand()] ;
// 4.將該任務(wù)推送到消息隊列,等待對應(yīng)的消費者去執(zhí)行
$time2wait = strtotime('2018-09-08 11:15:00') - strtotime('now'); // 定時執(zhí)行
$isPushed = Queue::later($time2wait, $jobHandlerClassName , $jobData , $jobQueueName );
// database 驅(qū)動時,返回值為 1|false ; redis 驅(qū)動時,返回值為 隨機字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."
";
}else{
echo 'Oops, something went wrong.';
}
}
}
3.編寫Hello消費者類,用來處理helloJobQueue隊列中的任務(wù),新增\application\index\job\Hello.php消費者類,并編寫其fire()方法,代碼:
/**
* 文件路徑: \application\index\job\Hello.php
* 這是一個消費者類,用于處理 helloJobQueue 隊列中的任務(wù)
*/
namespace app\index\job;
use think\queue\Job;
use think\Db;
class Hello {
/**
* fire方法是消息隊列默認(rèn)調(diào)用的方法
* @param Job $job 當(dāng)前的任務(wù)對象
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
*/
public function fire(Job $job,$data){
// 如有必要,可以根據(jù)業(yè)務(wù)需求和數(shù)據(jù)庫中的最新數(shù)據(jù),判斷該任務(wù)是否仍有必要執(zhí)行.
$isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
if(!$isJobStillNeedToBeDone){
$job->delete();
return;
}
$isJobDone = $this->doHelloJob($data);
if ($isJobDone) {
//如果任務(wù)執(zhí)行成功, 記得刪除任務(wù)
$job->delete();
}else{
if ($job->attempts() > 3) {
//通過這個方法可以檢查這個任務(wù)已經(jīng)重試了幾次了
$job->delete();
// 也可以重新發(fā)布這個任務(wù)
//$job->release(2); //$delay為延遲時間,表示該任務(wù)延遲2秒后再執(zhí)行
}
}
}
/**
* 有些消息在到達(dá)消費者時,可能已經(jīng)不再需要執(zhí)行了
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function checkDatabaseToSeeIfJobNeedToBeDone($data){
return true;
}
/**
* 根據(jù)消息中的數(shù)據(jù)進(jìn)行實際的業(yè)務(wù)處理
* @param array|mixed $data 發(fā)布任務(wù)時自定義的數(shù)據(jù)
* @return boolean 任務(wù)執(zhí)行的結(jié)果
*/
private function doHelloJob($data) {
// 根據(jù)消息中的數(shù)據(jù)進(jìn)行實際的業(yè)務(wù)處理...
// test
Db::name('admin')->insert([
'name'=>$data['name'],
'password'=>$data['password']
]);
return true;
}
}