您好,登錄后才能下訂單哦!
1.安裝 Workerman
安裝GatewayWorker內(nèi)核文件(不包含start_gateway.php start_businessworker.php等啟動(dòng)入口文件),直接上composer
composer require workerman/gateway-worker
2.創(chuàng)建 Workerman 啟動(dòng)文件
創(chuàng)建一個(gè)自定義命令類(lèi)文件來(lái)啟動(dòng) Socket 服務(wù)端,新建
application/push/command/Workerman.php
namespace app\push\command;
use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
class Workerman extends Command
{
protected function configure()
{
$this->setName('workerman')
->addArgument('action', Argument::OPTIONAL, "action start|stop|restart")
->addArgument('type', Argument::OPTIONAL, "d -d")
->setDescription('workerman chat');
}
protected function execute(Input $input, Output $output)
{
global $argv;
$action = trim($input->getArgument('action'));
$type = trim($input->getArgument('type')) ? '-d' : '';
$argv[0] = 'chat';
$argv[1] = $action;
$argv[2] = $type ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
Worker::runAll();
}
private function startBusinessWorker()
{
// bussinessWorker 進(jìn)程
$worker = new BusinessWorker();
// worker名稱(chēng)
$worker->name = 'YourAppBusinessWorker';
// bussinessWorker進(jìn)程數(shù)量
$worker->count = 4;
//設(shè)置處理業(yè)務(wù)的類(lèi),此處制定Events的命名空間
$worker->eventHandler= \app\push\controller\Events::class;
// 服務(wù)注冊(cè)地址
$worker->registerAddress = '127.0.0.1:1238';
}
private function startGateWay()
{
// gateway 進(jìn)程,這里使用Text協(xié)議,可以用telnet測(cè)試
$gateway = new Gateway("websocket://0.0.0.0:8282");
// gateway名稱(chēng),status方便查看
$gateway->name = 'YourAppGateway';
// gateway進(jìn)程數(shù)
$gateway->count = 4;
// 本機(jī)ip,分布式部署時(shí)使用內(nèi)網(wǎng)ip
$gateway->lanIp = '127.0.0.1';
// 內(nèi)部通訊起始端口,假如$gateway->count=4,起始端口為4000
// 則一般會(huì)使用4000 4001 4002 4003 4個(gè)端口作為內(nèi)部通訊端口
$gateway->startPort = 20003;
// 服務(wù)注冊(cè)地址
$gateway->registerAddress = '127.0.0.1:1238';
// 心跳間隔
$gateway->pingInterval = 55;
$gateway->pingNotResponseLimit = 1;
// 心跳數(shù)據(jù)
$gateway->pingData = '';
}
private function startRegister()
{
new Register('text://0.0.0.0:1238');
}
}
配置 application/command.php 文件
return [
'app\common\command\Workerman',
];
3.創(chuàng)建事件監(jiān)聽(tīng)文件
創(chuàng)建 application/push/controller/Events.php 文件來(lái)監(jiān)聽(tīng)處理 workerman 的各種事件。
<?php
namespace app\push\controller;
use GatewayWorker\Lib\Gateway;
use think\Hook;
use Workerman\Lib\Timer;
class Events
{
//定時(shí)器間隔
protected static $interval = 2;
//定時(shí)器
protected static $timer = null;
//事件處理類(lèi)
protected static $evevtRunClass = \app\push\controller\EvevtRun::class;
/*
* 消息事件回調(diào) class
*
* */
protected static $eventClassName = \app\push\controller\Push::class;
/**
* 當(dāng)客戶(hù)端發(fā)來(lái)消息時(shí)觸發(fā)
* @param int $client_id 連接id
* @param mixed $message 具體消息
*/
public static function onMessage($client_id, $message)
{
$message_data = json_decode($message,true);
if (!$message_data) return ;
try{
if(!isset($message_data['type'])) throw new \Exception('缺少消息參數(shù)類(lèi)型');
//消息回調(diào)處理
$evevtName = self::$eventClassName.'::instance';
if(is_callable($evevtName))
$evevtName()->start($message_data['type'],$client_id,$message_data);
else
throw new \Exception('消息處理回調(diào)不存在。['+$evevtName+']');
}catch (\Exception $e){
var_dump([
'file'=>$e->getFile(),
'code'=>$e->getCode(),
'msg'=>$e->getMessage(),
'line'=>$e->getLine()
]);
}
}
/**
* 當(dāng)用戶(hù)連接時(shí)觸發(fā)的方法
* @param integer $client_id 連接的客戶(hù)端
* @return void
*/
public static function onConnect($client_id)
{
Gateway::sendToClient($client_id, json_encode(array(
'type' => 'init',
'client_id' => $client_id
)));
}
/**
* 當(dāng)用戶(hù)斷開(kāi)連接時(shí)觸發(fā)的方法
* @param integer $client_id 斷開(kāi)連接的客戶(hù)端
* @return void
*/
public static function onClose($client_id)
{
Gateway::sendToClient($client_id,json_encode([
'type'=>'logout',
'message'=>"client[$client_id]"
]));
}
/**
* 當(dāng)進(jìn)程啟動(dòng)時(shí)
* @param integer $businessWorker 進(jìn)程實(shí)例
*/
public static function onWorkerStart($worker)
{
//在進(jìn)程1上開(kāi)啟定時(shí)器 每self::$interval秒執(zhí)行
if($worker->id === 0){
$last = time();
$task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last];
self::$timer = Timer::add(self::$interval, function() use(&$task) {
try {
$now = time();
Hook::exec(self::$evevtRunClass);
foreach ($task as $sec => &$time) {
if (($now - $time) >= $sec) {
$time = $now;
Hook::exec(self::$evevtRunClass,'task_'.$sec);
}
}
} catch (\Throwable $e) {}
});
}
}
/**
* 當(dāng)進(jìn)程關(guān)閉時(shí)
* @param integer $businessWorker 進(jìn)程實(shí)例
*/
public static function onWorkerStop($worker)
{
if($worker->id === 0) Timer::del(self::$timer);
}
}
消息事件回調(diào) class 方法里的處理根據(jù)自身情況編寫(xiě)
<?php
namespace app\push\controller;
use app\wap\model\live\LiveUser;
use GatewayWorker\Lib\Gateway;
use app\wap\model\live\LiveHonouredGuest;
use app\wap\model\user\User;
use app\wap\model\live\LiveBarrage;
class Push
{
/*
* @var array 消息內(nèi)容
* */
protected $message_data = [
'type' => '',
'message'=>'',
];
/*
* @var string 消息類(lèi)型
* */
protected $message_type = '';
/*
* @var string $client_id
* */
protected $client_id = '';
/*
* @var int 當(dāng)前登陸用戶(hù)
* */
protected $uid = null;
/*
* @var null 本類(lèi)實(shí)例化結(jié)果
* */
protected static $instance = null;
/*
*
* */
protected function __construct($message_data = [])
{
}
/*
* 實(shí)例化本類(lèi)
* */
public static function instance()
{
if(is_null(self::$instance)) self::$instance = new static();
return self::$instance;
}
/*
* 檢測(cè)參數(shù)并返回
* @param array || string $keyValue 需要提取的鍵值
* @param null || bool $value
* @return array;
* */
protected function checkValue($keyValue = null,$value = null)
{
if(is_null($keyValue))
$message_data = $this->message_data;
if(is_string($keyValue))
$message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value);
if(is_array($keyValue))
$message_data = array_merge($keyValue,$this->message_data);
if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){
$newData = [];
foreach ($keyValue as $key => $item){
$newData [] = $message_data[$key];
}
return $newData;
}
return $message_data;
}
/*
* 開(kāi)始設(shè)置回調(diào)
* @param string $typeFnName 回調(diào)函數(shù)名
* @param string $client_id
* @param array $message_data
*
* */
public function start($typeFnName,$client_id,$message_data)
{
$this->message_type = $typeFnName;
$this->message_data = $message_data;
$this->client_id = $client_id;
$this->uid = Gateway::getUidByClientId($client_id);
//記錄用戶(hù)上線(xiàn)
if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room')))
{
LiveUser::setLiveUserOnline($live_id,$this->uid,1);
}
if(method_exists($this,$typeFnName))
call_user_func([$this,$typeFnName]);
else
throw new \Exception('缺少回調(diào)方法');
}
/*
* 心跳檢測(cè)
*
* */
protected function ping()
{
return ;
}
/*
* 綁定用戶(hù)相應(yīng)客戶(hù)端
* @param string $client_id
* @param array $message_data
* @return
* */
protected function handshake()
{
$message_data = $this->checkValue(['uid'=>0,'room'=>0]);
if(!$message_data['uid']) throw new \Exception("缺少用戶(hù)uid,無(wú)法綁定用戶(hù)");
$new_message = [
'type' => $this->message_type,
'client_id' => $this->client_id,
'time' => date('H:i:s'),
'msg' => '綁定成功!'
];
Gateway::bindUid($this->client_id,$message_data['uid']);
//如果有群組id加入群組
if($message_data['room']){
// 加入某個(gè)群組(可調(diào)用多次加入多個(gè)群組) 將clientid加入roomid分組中
Gateway::joinGroup($this->client_id, $message_data['room']);
}
Gateway::sendToClient($this->client_id, json_encode($new_message));
}
/*
* 接受客戶(hù)端發(fā)送的消息
* @param string $client_id 客戶(hù)端client_id
* @param array $message_data 發(fā)送的數(shù)據(jù)
* @return
*
* */
protected function send()
{
list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true);
$client_id = $this->client_id;
if(!$this->uid) {
//認(rèn)證用戶(hù)信息失敗,關(guān)閉用戶(hù)鏈接
Gateway::closeClient($client_id);
throw new \Exception("缺少用戶(hù)uid");
}
$userInfo = User::get($this->uid);
if(!$userInfo){
//認(rèn)證用戶(hù)信息失敗,關(guān)閉用戶(hù)鏈接
Gateway::closeClient($client_id);
throw new \Exception("用戶(hù)信息缺少");
}
if($room && Gateway::getClientIdCountByGroup($room)){
$user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type');
if(is_null($user_type)) $user_type = 2;
$res = LiveBarrage::set([
'live_id'=>$room,
'uid'=>$this->uid,
'type'=>$type,
'barrage'=>$message,
'add_time'=>time(),
'is_show'=>1
]);
if(!$res) throw new \Exception("寫(xiě)入歷史記錄失敗");
Gateway::sendToGroup($room,json_encode([
'message'=>$message,
'm_type'=>$type,
'type'=>'message',
'user_type'=>$user_type,
'userInfo'=>$userInfo,
'id'=>$res['id']
]));
}else{
$new_message = [
'type' => 'reception',
'content' => $message,
'time' => date('H:i:s'),
'timestamp' => time(),
];
if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message));
}
}
/*
* 消息撤回
* @param string $client_id
* @param array $message_data
* */
protected function recall()
{
list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true);
if(!$id)
throw new \Exception('缺少撤回消息的id');
if(!$room)
throw new \Exception('缺少房間號(hào)');
if(LiveBarrage::del($id)){
Gateway::sendToGroup($room,json_encode([
'type'=>'recall',
'id'=>$id
]),Gateway::getClientIdByUid($this->uid));
}
}
}
定時(shí)任務(wù)事件處理類(lèi) 按照自身情況編寫(xiě)方法內(nèi)邏輯
<?php
namespace app\push\controller;
use GatewayWorker\Lib\Gateway;
/*
* 定時(shí)任務(wù)
*
* */
class EvevtRun
{
/*
* 默認(rèn)定時(shí)器執(zhí)行事件
* */
public function run()
{
}
/*
* 每隔6秒執(zhí)行
* */
public function task_6()
{
}
/*
* 每隔10秒執(zhí)行
* */
public function task_10()
{
}
/*
* 每隔30秒執(zhí)行
* */
public function task_30()
{
}
/*
* 每隔60秒執(zhí)行
* */
public function task_60()
{
}
/*
* 每隔180秒執(zhí)行
* */
public function task_180()
{
}
/*
* 每隔300秒執(zhí)行
* */
public function task_300()
{
}
}
4.啟動(dòng) Workerman 服務(wù)端
以debug(調(diào)試)方式啟動(dòng)
以debug(調(diào)試)方式啟動(dòng)
php think workerman start
//以daemon(守護(hù)進(jìn)程)方式啟動(dòng)
php think workerman start d
//停止
php think workerman stop
//重啟
php think workerman restart
//平滑重啟
php think workerman reload
//查看狀態(tài)
php think workerman status
//當(dāng)你看到如下結(jié)果的時(shí)候,workerman已經(jīng)啟動(dòng)成功了。
Workerman[chat] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.11 PHP version:7.0.29
------------------------ WORKERS -------------------------------
user worker listen processes status
tegic Gateway websocket://0.0.0.0:8282 4 [OK]
tegic BusinessWorker none 1 [OK]
tegic Register text://0.0.0.0:1236 4 [OK]
----------------------------------------------------------------
Press Ctrl+C to stop. Start success.
5.客戶(hù)端連接使用
socket.ws.send()調(diào)用可發(fā)送消息,socket.onmessage 內(nèi)是處理消息類(lèi)型,即可實(shí)現(xiàn)長(zhǎng)鏈接
(function (global) {
var socketDebug = window.socketDebug == undefined ? false : window.socketDebug;
var socket = {
ws:null,
connect:function () {
var that= this;
that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//這里如果使用127.0.0.1或者localhost會(huì)出現(xiàn)連接失敗。當(dāng)時(shí)為了方便以后的維護(hù),這里在php的全局文件里定義了一個(gè)常量來(lái)定義ip,后來(lái)本地開(kāi)發(fā)完提交到linux服務(wù)器環(huán)境之后發(fā)現(xiàn)鏈接失敗!按照此行代碼會(huì)有效連接~
that.ws.onopen = this.onopen;
that.ws.onmessage = this.onmessage;
that.ws.onclose = function(e) {
socketDebug && console.log("連接關(guān)閉,定時(shí)重連");
that.connect();
};
that.ws.onerror = function(e) {
socketDebug && console.log("出現(xiàn)錯(cuò)誤");
};
},
onopen:function () {
var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}';
socket.ws.send(joint);
socket.heartCheck.start();
},
sendMsg:function(content,type,id){
socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}")
},
onmessage:function (e) {
try {
var data = JSON.parse(e.data);
socketDebug && console.log(data)
switch(data.type){
case 'init':
break;
// 服務(wù)端ping客戶(hù)端
case 'ping':
break;
// 登錄 更新用戶(hù)列表
case 'handshake':
break;
// 提醒
case 'reception':
break;
//直播進(jìn)行中
case 'live_ing':
break;
//直播結(jié)束
case 'live_end':
break;
//消息提醒
case 'message':
break;
//消息撤回
case 'recall':
break;
case 'ban':
break;
}
}catch (e) {
socketDebug && console.info(e);
}
},
heartCheck:{
timeout: 3000,
timeoutObj: null,
start: function(){
this.timeoutObj = setInterval(function(){
socket.ws.send("{'type':'ping'}");
}, this.timeout);
}
}
};
window.onload=function () {
socket.connect();
};
global.socket = socket;
return socket
}(this));
windows 版本無(wú)法啟動(dòng),已經(jīng)在商城項(xiàng)目中使用
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。