溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

TP5.0.24+Workerman+定時(shí)任務(wù)

發(fā)布時(shí)間:2020-08-05 16:15:40 來(lái)源:網(wǎng)絡(luò) 閱讀:534 作者:qq5b8f70dcdd315 欄目:web開(kāi)發(fā)

1.安裝 Workerman
安裝GatewayWorker內(nèi)核文件(不包含start_gateway.php start_businessworker.php等啟動(dòng)入口文件),直接上composer

composer require workerman/gateway-worker

2.創(chuàng)建 Workerman 啟動(dòng)文件
創(chuàng)建一個(gè)自定義命令類(lèi)文件來(lái)啟動(dòng) Socket 服務(wù)端,新建

application/push/command/Workerman.php
namespace app\push\command;

use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Workerman extends Command
{

    protected function configure()
    {
        $this->setName('workerman')
            ->addArgument('action', Argument::OPTIONAL, "action  start|stop|restart")
            ->addArgument('type', Argument::OPTIONAL, "d -d")
            ->setDescription('workerman chat');
    }

    protected function execute(Input $input, Output $output)
    {
        global $argv;
        $action = trim($input->getArgument('action'));
        $type   = trim($input->getArgument('type')) ? '-d' : '';

        $argv[0] = 'chat';
        $argv[1] = $action;
        $argv[2] = $type ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        // bussinessWorker 進(jìn)程
        $worker = new BusinessWorker();
        // worker名稱(chēng)
        $worker->name = 'YourAppBusinessWorker';
        // bussinessWorker進(jìn)程數(shù)量
        $worker->count = 4;
        //設(shè)置處理業(yè)務(wù)的類(lèi),此處制定Events的命名空間
        $worker->eventHandler= \app\push\controller\Events::class;
        // 服務(wù)注冊(cè)地址
        $worker->registerAddress = '127.0.0.1:1238';
    }

    private function startGateWay()
    {
        // gateway 進(jìn)程,這里使用Text協(xié)議,可以用telnet測(cè)試
        $gateway = new Gateway("websocket://0.0.0.0:8282");
        // gateway名稱(chēng),status方便查看
        $gateway->name = 'YourAppGateway';
        // gateway進(jìn)程數(shù)
        $gateway->count = 4;
        // 本機(jī)ip,分布式部署時(shí)使用內(nèi)網(wǎng)ip
        $gateway->lanIp = '127.0.0.1';
        // 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
        // 則一般會(huì)使用4000 4001 4002 4003 4個(gè)端口作為內(nèi)部通訊端口
        $gateway->startPort = 20003;
        // 服務(wù)注冊(cè)地址
        $gateway->registerAddress = '127.0.0.1:1238';
        // 心跳間隔
        $gateway->pingInterval = 55;
        $gateway->pingNotResponseLimit = 1;
        // 心跳數(shù)據(jù)
        $gateway->pingData = '';
    }

    private function startRegister()
    {
        new Register('text://0.0.0.0:1238');
    }
}

配置 application/command.php 文件

return [
    'app\common\command\Workerman',
];

3.創(chuàng)建事件監(jiān)聽(tīng)文件
創(chuàng)建 application/push/controller/Events.php 文件來(lái)監(jiān)聽(tīng)處理 workerman 的各種事件。

<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;
use think\Hook;
use Workerman\Lib\Timer;

class Events
{

    //定時(shí)器間隔
    protected static $interval = 2;
    //定時(shí)器
    protected static $timer = null;
    //事件處理類(lèi)
    protected static $evevtRunClass = \app\push\controller\EvevtRun::class;
    /*
     * 消息事件回調(diào) class
     *
     * */
    protected static $eventClassName = \app\push\controller\Push::class;
    /**
     * 當(dāng)客戶(hù)端發(fā)來(lái)消息時(shí)觸發(fā)
     * @param int $client_id 連接id
     * @param mixed $message 具體消息
     */
    public static function onMessage($client_id, $message)
    {
        $message_data = json_decode($message,true);
        if (!$message_data) return ;
        try{
            if(!isset($message_data['type'])) throw new \Exception('缺少消息參數(shù)類(lèi)型');
            //消息回調(diào)處理
            $evevtName = self::$eventClassName.'::instance';
            if(is_callable($evevtName))
                $evevtName()->start($message_data['type'],$client_id,$message_data);
            else
                throw new \Exception('消息處理回調(diào)不存在。['+$evevtName+']');
        }catch (\Exception $e){
            var_dump([
                'file'=>$e->getFile(),
                'code'=>$e->getCode(),
                'msg'=>$e->getMessage(),
                'line'=>$e->getLine()
            ]);
        }
    }

    /**
     * 當(dāng)用戶(hù)連接時(shí)觸發(fā)的方法
     * @param integer $client_id 連接的客戶(hù)端
     * @return void
     */
    public static function onConnect($client_id)
    {
        Gateway::sendToClient($client_id, json_encode(array(
            'type'      => 'init',
            'client_id' => $client_id
        )));
    }

    /**
     * 當(dāng)用戶(hù)斷開(kāi)連接時(shí)觸發(fā)的方法
     * @param integer $client_id 斷開(kāi)連接的客戶(hù)端
     * @return void
     */
    public static function onClose($client_id)
    {
        Gateway::sendToClient($client_id,json_encode([
            'type'=>'logout',
            'message'=>"client[$client_id]"
        ]));
    }

    /**
     * 當(dāng)進(jìn)程啟動(dòng)時(shí)
     * @param integer $businessWorker 進(jìn)程實(shí)例
     */
    public static function onWorkerStart($worker)
    {
        //在進(jìn)程1上開(kāi)啟定時(shí)器 每self::$interval秒執(zhí)行
        if($worker->id === 0){
            $last = time();
            $task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last];
            self::$timer = Timer::add(self::$interval, function() use(&$task) {
                try {
                    $now = time();
                    Hook::exec(self::$evevtRunClass);
                    foreach ($task as $sec => &$time) {
                        if (($now - $time) >= $sec) {
                            $time = $now;
                            Hook::exec(self::$evevtRunClass,'task_'.$sec);
                        }
                    }
                } catch (\Throwable $e) {}

            });
        }

    }

    /**
     * 當(dāng)進(jìn)程關(guān)閉時(shí)
     * @param integer $businessWorker 進(jìn)程實(shí)例
     */
    public static function onWorkerStop($worker)
    {
        if($worker->id === 0) Timer::del(self::$timer);
    }
}

消息事件回調(diào) class 方法里的處理根據(jù)自身情況編寫(xiě)

<?php

namespace app\push\controller;

use app\wap\model\live\LiveUser;
use GatewayWorker\Lib\Gateway;
use app\wap\model\live\LiveHonouredGuest;
use app\wap\model\user\User;
use app\wap\model\live\LiveBarrage;

class Push
{

    /*
     * @var array 消息內(nèi)容
     * */
    protected $message_data = [
        'type' => '',
        'message'=>'',
    ];
    /*
     * @var string 消息類(lèi)型
     * */
    protected $message_type = '';
    /*
     * @var string $client_id
     * */
    protected $client_id    = '';
    /*
     * @var int 當(dāng)前登陸用戶(hù)
     * */
    protected $uid = null;
    /*
     * @var null 本類(lèi)實(shí)例化結(jié)果
     * */
    protected static $instance = null;
    /*
     *
     * */
    protected function __construct($message_data = [])
    {

    }
    /*
     * 實(shí)例化本類(lèi)
     * */
    public static function instance()
    {
        if(is_null(self::$instance)) self::$instance = new static();
        return self::$instance;
    }

    /*
     * 檢測(cè)參數(shù)并返回
     * @param array || string $keyValue 需要提取的鍵值
     * @param null || bool $value
     * @return array;
     * */
    protected function checkValue($keyValue = null,$value = null)
    {
        if(is_null($keyValue))
            $message_data = $this->message_data;
        if(is_string($keyValue))
            $message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value);
        if(is_array($keyValue))
            $message_data = array_merge($keyValue,$this->message_data);
        if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){
            $newData = [];
            foreach ($keyValue as $key => $item){
                $newData [] = $message_data[$key];
            }
            return $newData;
        }
        return $message_data;
    }

    /*
     * 開(kāi)始設(shè)置回調(diào)
     * @param string $typeFnName 回調(diào)函數(shù)名
     * @param string $client_id
     * @param array $message_data
     *
     * */
    public function start($typeFnName,$client_id,$message_data)
    {

        $this->message_type = $typeFnName;

        $this->message_data = $message_data;

        $this->client_id    = $client_id;

        $this->uid = Gateway::getUidByClientId($client_id);
        //記錄用戶(hù)上線(xiàn)
        if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room')))
        {
            LiveUser::setLiveUserOnline($live_id,$this->uid,1);
        }

        if(method_exists($this,$typeFnName))
            call_user_func([$this,$typeFnName]);
        else
            throw new \Exception('缺少回調(diào)方法');
    }

    /*
     * 心跳檢測(cè)
     *
     * */
    protected  function ping()
    {
        return ;
    }

    /*
     * 綁定用戶(hù)相應(yīng)客戶(hù)端
     * @param string $client_id
     * @param array $message_data
     * @return
     * */
    protected function handshake()
    {
        $message_data = $this->checkValue(['uid'=>0,'room'=>0]);
        if(!$message_data['uid'])  throw new \Exception("缺少用戶(hù)uid,無(wú)法綁定用戶(hù)");
        $new_message    = [
            'type'      => $this->message_type,
            'client_id' => $this->client_id,
            'time'      => date('H:i:s'),
            'msg'       => '綁定成功!'
        ];
        Gateway::bindUid($this->client_id,$message_data['uid']);

        //如果有群組id加入群組
        if($message_data['room']){
            // 加入某個(gè)群組(可調(diào)用多次加入多個(gè)群組) 將clientid加入roomid分組中
            Gateway::joinGroup($this->client_id, $message_data['room']);
        }

        Gateway::sendToClient($this->client_id, json_encode($new_message));
    }

    /*
     * 接受客戶(hù)端發(fā)送的消息
     * @param string $client_id 客戶(hù)端client_id
     * @param array $message_data 發(fā)送的數(shù)據(jù)
     * @return
     *
     * */
    protected function send()
    {
        list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true);
        $client_id      = $this->client_id;
        if(!$this->uid) {
            //認(rèn)證用戶(hù)信息失敗,關(guān)閉用戶(hù)鏈接
            Gateway::closeClient($client_id);
            throw new \Exception("缺少用戶(hù)uid");
        }
        $userInfo = User::get($this->uid);
        if(!$userInfo){
            //認(rèn)證用戶(hù)信息失敗,關(guān)閉用戶(hù)鏈接
            Gateway::closeClient($client_id);
            throw new \Exception("用戶(hù)信息缺少");
        }
        if($room && Gateway::getClientIdCountByGroup($room)){
            $user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type');
            if(is_null($user_type)) $user_type = 2;
            $res = LiveBarrage::set([
                'live_id'=>$room,
                'uid'=>$this->uid,
                'type'=>$type,
                'barrage'=>$message,
                'add_time'=>time(),
                'is_show'=>1
            ]);
            if(!$res) throw new \Exception("寫(xiě)入歷史記錄失敗");
            Gateway::sendToGroup($room,json_encode([
                'message'=>$message,
                'm_type'=>$type,
                'type'=>'message',
                'user_type'=>$user_type,
                'userInfo'=>$userInfo,
                'id'=>$res['id']
            ]));
        }else{
            $new_message    = [
                'type'      => 'reception',
                'content'   => $message,
                'time'      => date('H:i:s'),
                'timestamp' => time(),
            ];
            if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message));
        }
    }

    /*
     * 消息撤回
     * @param string $client_id
     * @param array $message_data
     * */
    protected function recall()
    {
        list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true);

        if(!$id)
            throw new \Exception('缺少撤回消息的id');

        if(!$room)
            throw new \Exception('缺少房間號(hào)');

        if(LiveBarrage::del($id)){
            Gateway::sendToGroup($room,json_encode([
                'type'=>'recall',
                'id'=>$id
            ]),Gateway::getClientIdByUid($this->uid));
        }
    }

}

定時(shí)任務(wù)事件處理類(lèi) 按照自身情況編寫(xiě)方法內(nèi)邏輯

<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;

/*
 * 定時(shí)任務(wù)
 *
 * */

class EvevtRun
{

    /*
     * 默認(rèn)定時(shí)器執(zhí)行事件
     * */
    public function run()
    {

    }

    /*
     * 每隔6秒執(zhí)行
     * */
    public function task_6()
    {

    }

    /*
     * 每隔10秒執(zhí)行
     * */
    public function task_10()
    {

    }

    /*
     * 每隔30秒執(zhí)行
     * */
    public function task_30()
    {

    }

    /*
     * 每隔60秒執(zhí)行
     * */
    public function task_60()
    {

    }
    /*
     * 每隔180秒執(zhí)行
     * */
    public function task_180()
    {

    }

    /*
     * 每隔300秒執(zhí)行
     * */
    public function task_300()
    {

    }

}

4.啟動(dòng) Workerman 服務(wù)端
以debug(調(diào)試)方式啟動(dòng)

以debug(調(diào)試)方式啟動(dòng)
php think workerman start
//以daemon(守護(hù)進(jìn)程)方式啟動(dòng)
php think workerman start d
//停止
php think workerman stop
//重啟
php think workerman restart
//平滑重啟
php think workerman reload
//查看狀態(tài)
php think workerman status

//當(dāng)你看到如下結(jié)果的時(shí)候,workerman已經(jīng)啟動(dòng)成功了。
Workerman[chat] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.11          PHP version:7.0.29
------------------------ WORKERS -------------------------------
user          worker          listen                    processes status
tegic         Gateway         websocket://0.0.0.0:8282   4         [OK]
tegic         BusinessWorker  none                       1         [OK]
tegic         Register        text://0.0.0.0:1236        4         [OK]
----------------------------------------------------------------
Press Ctrl+C to stop. Start success.

5.客戶(hù)端連接使用

socket.ws.send()調(diào)用可發(fā)送消息,socket.onmessage 內(nèi)是處理消息類(lèi)型,即可實(shí)現(xiàn)長(zhǎng)鏈接

(function (global) {

    var socketDebug = window.socketDebug == undefined ? false : window.socketDebug;
    var socket = {
        ws:null,
        connect:function () {
            var that= this;
            that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//這里如果使用127.0.0.1或者localhost會(huì)出現(xiàn)連接失敗。當(dāng)時(shí)為了方便以后的維護(hù),這里在php的全局文件里定義了一個(gè)常量來(lái)定義ip,后來(lái)本地開(kāi)發(fā)完提交到linux服務(wù)器環(huán)境之后發(fā)現(xiàn)鏈接失敗!按照此行代碼會(huì)有效連接~
            that.ws.onopen = this.onopen;
            that.ws.onmessage = this.onmessage;
            that.ws.onclose = function(e) {
                socketDebug && console.log("連接關(guān)閉,定時(shí)重連");
                that.connect();
            };
            that.ws.onerror = function(e) {
                socketDebug && console.log("出現(xiàn)錯(cuò)誤");
            };
        },
        onopen:function () {
            var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}';
            socket.ws.send(joint);
            socket.heartCheck.start();
        },
        sendMsg:function(content,type,id){
            socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}")
        },
        onmessage:function (e) {
            try {
                var data = JSON.parse(e.data);
                socketDebug && console.log(data)
                switch(data.type){
                    case 'init':

                        break;
                    // 服務(wù)端ping客戶(hù)端
                    case 'ping':

                        break;
                    // 登錄 更新用戶(hù)列表
                    case 'handshake':

                        break;
                    // 提醒
                    case 'reception':

                        break;
                    //直播進(jìn)行中
                    case 'live_ing':

                        break;
                    //直播結(jié)束
                    case 'live_end':

                        break;
                    //消息提醒
                    case 'message':

                        break;
                    //消息撤回
                    case 'recall':

                        break;
                    case 'ban':

                        break;
                }
            }catch (e) {
                socketDebug && console.info(e);
            }

        },
        heartCheck:{
            timeout: 3000,
            timeoutObj: null,
            start: function(){
                this.timeoutObj = setInterval(function(){
                    socket.ws.send("{'type':'ping'}");
                }, this.timeout);
            }
        }
    };

    window.onload=function () {
        socket.connect();
    };

    global.socket = socket;

    return socket
}(this));

windows 版本無(wú)法啟動(dòng),已經(jīng)在商城項(xiàng)目中使用

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI