setnx(okeys::subcribeLock(SERVER_INT_IP), 1);
        if(!$ret){
            // Wait for the previous round of message handling to finish so this process does not block:
            // drop the lock, back off for 0.5s and return.
            oo::commonOprRedis('server')->delete(okeys::subcribeLock(SERVER_INT_IP)); // unlock
            usleep(500000);
            return;
        }
        // $msg = oo::commonOprRedis('server')->rpop(okeys::pushServer(SERVER_INT_IP));
        // if(empty($msg)){ // nothing left in the queue
        //     oo::commonOprRedis('server')->delete(okeys::subcribeLock(SERVER_INT_IP)); // unlock
        //     return;
        // }
        $i=1000;
        while($i){
            $msg = oo::commonOprRedis('server')->rpop(okeys::pushServer(SERVER_INT_IP));
            if(empty($msg)){ // nothing left in the queue
                break;
            }
            $this->send($msg);
            $i--;
        }
        oo::commonOprRedis('server')->delete(okeys::subcribeLock(SERVER_INT_IP)); // unlock
        return true;
    }

    /**
     * Dispatch one queued push message on the current node.
     *
     * The message must decode to an array carrying "type", "cmd", "data" and "flag";
     * anything else is logged and dropped. "from" is optional and defaults to "php"
     * (messages queued by push() in this class never set it).
     *
     * Supported types:
     *  - user:      unicast to the uid in "flag"
     *  - group:     multicast to the group in "flag"
     *  - broadcast: send to every connection
     *  - function:  call method "flag" on the model named by "cmd" with "data" spread as arguments
     *
     * Any Throwable raised while dispatching is logged and swallowed so that one bad
     * message cannot abort the caller's queue loop.
     *
     * @param string $msg JSON-encoded message
     * @return void
     */
    public function send($msg){
        $data = (array)json_decode($msg, true);
        try{
            if(!isset($data['type'], $data['cmd'], $data['data'], $data['flag'])){
                oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,参数不完整:'.json_encode($data), 'pushmsg.log');
                return;
            }
            // Optional field: avoid an undefined-index warning when the producer omitted it.
            $from = $data['from'] ?? 'php';
            switch($data['type']){
                case 'user': // unicast
                    $this->pushCur($data['flag'], $data['cmd'], $data['data'], $from);
                    break;
                case 'group': // multicast
                    $this->pushCurGroup($data['flag'], $data['cmd'], $data['data']);
                    break;
                case 'broadcast': // broadcast
                    $this->pushCurAll($data['cmd'], $data['data'], $from);
                    break;
                case 'function': // run model->method on this node
                    $fun = $data['flag'];
                    $model = oo::commonOprModel($data['cmd']);
                    // The lookup is expected to yield -1 when the model is unknown. Testing for
                    // "not an object" covers that without the object-to-int comparison notice
                    // that `$model == -1` raises for every valid model.
                    if(!is_object($model)){
                        oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,对象不存在:'.$data['cmd'], 'pushmsg_function.log');
                        return;
                    }
                    if(!method_exists($model, $fun)){
                        oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,方法不存在:'.$fun, 'pushmsg_function.log');
                        return;
                    }
                    // Original note: "too many offline logs, stop logging them".
                    // NOTE(review): this return also skips the call itself, not just the log line —
                    // confirm that noticeOtherOffline is meant to be a no-op here.
                    if($fun == 'noticeOtherOffline'){
                        return;
                    }
                    oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅到了函数方法,参数:'.$msg, 'pushmsg_function.log');
                    $model->$fun(...$data['data']);
                    break;
            }
        }catch(Throwable $e){
            // Throwable (not just Exception) so argument/type errors from the dynamic call are logged too.
            oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常:'.$e->getMessage().'消息:'.$msg, 'pushmsg_function.log');
            return;
        }
    }

    /**
     * A player came online: close every other connection bound to the same uid on this node.
     *
     * Each closed connection has its $_SESSION entry removed and is logged to "UserBindOther".
     *
     * @param int    $uid
     * @param string $curClientId client id of the connection that must be kept
     * @return void
     */
    public function noticeOtherOffline($uid, $curClientId)
    {
        if(!Gateway::isUidOnline($uid)){
            return;
        }
        $clientIds = Gateway::getClientIdByUid($uid);
        if(empty($clientIds)){
            return;
        }
        foreach($clientIds as $row){
            if($row != $curClientId && Gateway::isOnline($row)){
                // Capture the session before it is discarded so it can be logged.
                $session = isset($_SESSION[$row]) ? $_SESSION[$row] : '';
                Gateway::closeClient($row);
                unset($_SESSION[$row]); // clear the session
                oo::logs()->debug3(['uid'=>$uid,'curClientId' => $curClientId,'OnlineClientId'=>$row,'session'=>$session,], 'UserBindOther');
            }
        }
    }

    /**
     * Mark a player as online so other servers can see the state (10 minutes).
     *
     * NOTE(review): assumes the redis wrapper's setex takes (key, value, ttl) — confirm.
     *
     * @param int $uid
     * @return void
     */
    public function online($uid)
    {
        oo::commonOprRedis('common')->setex(okeys::userOnline($uid), true, 60 * 10);
    }

    /**
     * Mark a player as offline: removes the userWsReq key and the online flag.
     *
     * @param int $uid
     * @return void
     */
    public function offline($uid)
    {
        oo::commonOprRedis('user')->delete(okeys::userWsReq($uid));
        oo::commonOprRedis('common')->delete(okeys::userOnline($uid));
    }

    /**
     * Configured websocket worker servers.
     *
     * @return array the 'workerservers' config entry, or [] when it is not set
     */
    public static function getWebsocketServers()
    {
        return oo::$config['workerservers'] ?? [];
    }

    /**
     * Unicast.
     *
     * Pushes directly when the uid is connected to this node or no servers are configured.
     * Otherwise, in debug environments only, the message is queued for every configured
     * server (including this one); non-debug environments no longer queue anything.
     *
     * @param string $uid
     * @param string $cmd
     * @param array  $data
     * @param bool   $isFast true: rpush onto the server queue, false: lpush
     * @return void
     */
    public function push($uid,$cmd,$data,$isFast=false){
        $servers = self::getWebsocketServers();
        if(oo::commonOprModel('Workerman')->isUidCurOnline($uid) || empty($servers)){
            // On the current node: push directly.
            $this->pushCur($uid, $cmd, $data);
            return;
        }
        if(!IS_DEBUF){ // production no longer pushes through the queue
            return;
        }
        // Only the debug environment reaches this point. The test server has a single node
        // (golang is treated as "the other node"), so the local queue is not skipped. The
        // former production-only "skip current node" check was unreachable and is removed.
        $payload = json_encode([
            "type" => "user",
            "flag" => $uid,
            "cmd" => $cmd,
            "data" => $data
        ]);
        foreach($servers as $ip){
            if($isFast){
                oo::commonOprRedis('server')->rpush(okeys::pushServer($ip), $payload);
            }else{
                oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
            }
        }
    }

    /**
     * Queue a "call this model method" message for the configured servers.
     *
     * Does nothing when no servers are configured or outside debug environments.
     * 'noticeOtherOffline' is treated as urgent: it is rpush-ed and never queued for
     * the local server; every other function is lpush-ed to all servers.
     *
     * @param string $method model name, e.g. activitynew
     * @param string $fun    method name, e.g. sendRankRewardFasebasePushNsq
     * @param array  $params method arguments, e.g. array(1, 0)
     * @return void
     */
    public function pushFun($method, $fun, array $params = []){
        $servers = self::getWebsocketServers();
        if(empty($servers)){
            return;
        }
        if(!IS_DEBUF){ // production no longer pushes through the queue
            return;
        }
        $payload = json_encode([
            'type' => 'function',
            'cmd' => $method,
            'flag' => $fun,
            'data' => $params
        ]);
        $urgent = ($fun == 'noticeOtherOffline');
        foreach($servers as $ip){
            if($urgent){
                if($ip == SERVER_INT_IP){
                    continue; // nothing to do for the local server
                }
                // Message that has to be handled right away.
                oo::commonOprRedis('server')->rpush(okeys::pushServer($ip), $payload);
                continue;
            }
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
        }
    }

    /**
     * Multicast.
     *
     * With no servers configured the group is pushed locally. Otherwise (debug
     * environments only) the local node is pushed directly and every other server
     * gets the message queued.
     *
     * @param string $group
     * @param string $cmd
     * @param mixed  $data
     * @return void
     */
    public function pushGroup($group,$cmd,$data){
        $servers = self::getWebsocketServers();
        if(empty($servers)){
            $this->pushCurGroup($group,$cmd,$data);
            return;
        }
        if(!IS_DEBUF){ // production no longer pushes through the queue
            return;
        }
        foreach($servers as $ip){
            if($ip == SERVER_INT_IP){
                // Current node: push directly.
                $this->pushCurGroup($group,$cmd,$data);
                continue;
            }
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), json_encode([
                "type" => "group",
                "flag" => $group,
                "cmd" => $cmd,
                "data" => $data
            ]));
        }
    }

    /**
     * Broadcast.
     *
     * With no servers configured the broadcast is sent locally. Otherwise (debug
     * environments only) the local node is pushed directly and every other server
     * gets the message queued.
     *
     * @param string $cmd
     * @param array  $data
     * @return void
     */
    public function pushAll($cmd,$data){
        $servers = self::getWebsocketServers();
        if(empty($servers)){
            $this->pushCurAll($cmd,$data);
            return;
        }
        if(!IS_DEBUF){ // production no longer pushes through the queue
            return;
        }
        foreach($servers as $ip){
            if($ip == SERVER_INT_IP){
                // Current node: push directly.
                $this->pushCurAll($cmd,$data);
                continue;
            }
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), json_encode([
                "type" => "broadcast",
                "flag" => "",
                "cmd" => $cmd,
                "data" => $data
            ]));
        }
    }

    /**
     * Collect the online uid lists of all configured servers.
     *
     * Original note: after the redesign this is for statistics only; use the queued
     * "broadcast" message for server-wide pushes.
     *
     * Per-server hashes are combined with the array union operator, so for a hash
     * field present on several servers the first server's value wins.
     * NOTE(review): presumably the hash fields are uids — verify against the caller of saveAllUids().
     *
     * @return array
     */
    public function getAllUidList(){
        $servers = self::getWebsocketServers();
        $members = [];
        foreach($servers as $ip){
            $tmp = oo::commonOprRedis('common')->hGetAll($this->getOnlineUidsKey(0, $ip));
            if(empty($tmp)){
                // The read key may have been deleted at this very moment: fall back to the write key.
                $tmp = oo::commonOprRedis('common')->hGetAll($this->getOnlineUidsKey(1, $ip));
            }
            $members = $tmp ? $members+$tmp : $members;
        }
        $members = $members ? array_unique($members) : $members;
        return $members;
    }

    /**
     * Build the online-uids key; the read/write index flips every 30 seconds.
     *
     * In the first half of each minute the read key uses index 0 and the write key
     * index 1; in the second half they swap.
     *
     * @param int    $type 0: read key, 1: write key
     * @param string $ip   server ip, defaults to SERVER_INT_IP when empty
     * @return string
     */
    public function getOnlineUidsKey($type = 0, $ip = '')
    {
        $s = date('s');
        if($type == 1){
            $index = $s < 30 ? 1 : 0;
        }else{
            $index = $s < 30 ? 0 : 1;
        }
        $ip = $ip ? $ip : SERVER_INT_IP;
        return okeys::onUidlines($ip, $index);
    }

    /**
     * Timestamp of the last online-uids save for a server.
     *
     * @param string $ip server ip, defaults to SERVER_INT_IP when empty
     * @return int 0 when nothing is stored
     */
    public function getLastSveTime($ip = '')
    {
        $ip = $ip ? $ip : SERVER_INT_IP;
        return (int)oo::commonOprRedis('common')->get(okeys::onUidlinesSaveTimes($ip));
    }

    /**
     * Whether the current worker should refresh the online list (timer fires every 30 seconds).
     *
     * @return bool true when at least 25 seconds have passed since the last save
     */
    public function checkNeedSave()
    {
        // The timer runs every 30s; the window is relaxed to 25s because test-server processes are slow.
        return time() - $this->getLastSveTime() >= 25;
    }

    /**
     * Replace the stored uid list of the current node.
     *
     * Two redis keys are used so that reads and writes never hit the same key.
     * Before overwriting, the difference to the previous content is logged to
     * "all_uids.log": always in debug environments, otherwise only when the stored
     * count exceeds the new count by more than 15.
     *
     * @param array $uids
     * @return void
     */
    public function saveAllUids($uids = [])
    {
        // Remember when this update happened.
        oo::commonOprRedis('common')->setex(okeys::onUidlinesSaveTimes(SERVER_INT_IP), time(), 60);
        $key = $this->getOnlineUidsKey(1);
        // Short-circuit keeps the original behavior: hLen is only queried outside debug environments.
        if(IS_DEBUF || oo::commonOprRedis('common')->hLen($key) - count($uids) > 15){
            $last = oo::commonOprRedis('common')->hGetAll($key);
            $lost = array_values(array_diff($last, $uids)); // uids lost during the last 30 seconds
            $new = array_values(array_diff($uids, $last)); // uids gained during the last 30 seconds (did they come from another node?)
            oo::logs()->debug3(['date'=> date('Y-m-d H:i:s', time()), 'key' => $key, 'count' => count($uids), 'last_count' => count($last), 'lost' => $lost, 'new' => $new], 'all_uids.log');
        }
        oo::commonOprRedis('common')->delete($key);
        oo::commonOprRedis('common')->hMset($key, $uids);
    }

    /**
     * Whether a player is online anywhere.
     *
     * @param int $uid
     * @return bool
     */
    public function isUidOnline($uid)
    {
        if($this->isUidCurOnline($uid)) { // ask the local worker first
            return true;
        }
        // Fall back to the shared cache flag.
        return (bool)oo::commonOprRedis('common')->get(okeys::userOnline($uid));
    }

    /**
     * Push a message to one uid on the current node.
     *
     * Unless the command is one of the three activity commands or the message came
     * from "go", it is also published to the ocmd::TopicPHPPush NSQ topic. The payload
     * is obfuscated only when encryp($uid) is truthy.
     *
     * @param string $uid
     * @param string $cmd
     * @param array  $data
     * @param string $from origin of the message
     * @return void
     */
    public function pushCur($uid,$cmd,$data,$from="php"){
        // Activity on/offline pushes do not use the generic topic.
        if(!in_array($cmd,[ocmd::$send['ActivityChangeStatus_New'],ocmd::$send['Activity_timeCollect'],ocmd::$send['Activity_timeCollectBet']])&&$from!="go"){
            $nsqData =["type" => "user","uid"=>$uid,"flag" =>$uid, "cmd" => $cmd, "data" => $data];
            funs::SendNsqMsg(ocmd::TopicPHPPush,[],$nsqData);
        }
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $str = json_encode(array('cmd'=>$cmd, 'data'=>$data, 'code' => 1),JSON_NUMERIC_CHECK);
        $out = $this->encryp($uid) ? $this->obfuscatePayload($str) : $str;
        Gateway::sendToUid($uid, $out);
    }

    /**
     * Push an obfuscated message to a group on the current node.
     *
     * @param string $group
     * @param string $cmd
     * @param mixed  $data
     * @return void
     */
    public function pushCurGroup($group,$cmd,$data){
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $str = json_encode(array('cmd'=>$cmd,'data'=>$data),JSON_NUMERIC_CHECK);
        Gateway::sendToGroup($group, $this->obfuscatePayload($str));
    }

    /**
     * Push an obfuscated message to every connection on the current node.
     *
     * Unless the command is one of the three activity commands or the message came
     * from "go", it is also published to the ocmd::TopicPHPPushAll NSQ topic.
     *
     * @param string $cmd
     * @param array  $data
     * @param string $from origin of the message
     * @return void
     */
    public function pushCurAll($cmd,$data,$from="php"){
        // Activity on/offline pushes do not use the generic topic.
        if(!in_array($cmd,[ocmd::$send['ActivityChangeStatus_New'],ocmd::$send['Activity_timeCollect'],ocmd::$send['Activity_timeCollectBet']])&&$from!="go"){
            $nsqData =["type" => "broadcast","flag" =>"", "cmd" => $cmd, "data" => $data];
            funs::SendNsqMsg(ocmd::TopicPHPPushAll,[],$nsqData);
        }
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $str = json_encode(array('cmd'=>$cmd,'data'=>$data),JSON_NUMERIC_CHECK);
        Gateway::sendToAll($this->obfuscatePayload($str));
    }

    /**
     * Base64-encode a payload and splice md5(time()) filler into it.
     *
     * 16 filler characters are inserted at offset floor(len/2)-1 of the base64 text,
     * then one more filler character is inserted after the first character.
     *
     * @param string $json already JSON-encoded message
     * @return string
     */
    private function obfuscatePayload($json){
        $str = base64_encode($json);
        $key = (int) floor(strlen($str)/2) - 1;
        $salt = md5(time());
        $out = substr($str,0,$key).substr($salt,0,16).substr($str,$key,strlen($str));
        return substr($out,0,1).substr($salt,0,1).substr($out,1,strlen($out));
    }

    /**
     * Uid list known to the local Gateway.
     *
     * @return array
     */
    public function getCurAllUidList(){
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        return Gateway::getAllUidList();
    }

    /**
     * Whether the player is online on the current node.
     *
     * @param int $uid
     * @return bool false when the Gateway lookup throws an Exception
     */
    public function isUidCurOnline($uid){
        try{
            Gateway::$registerAddress = WORKER_REGISTER_URL;
            return Gateway::isUidOnline($uid);
        }catch (Exception $e){
            return false;
        }
    }

    /**
     * Notes: number of online client ids
     * User: wsc
     * Time: 2020/12/10 16:53
     * @return int 0 when the Gateway lookup throws an Exception
     */
    public function getOnlineCount(){
        try{
            Gateway::$registerAddress = WORKER_REGISTER_URL;
            return Gateway::getAllClientIdCount();
        }catch (Exception $e){
            return 0;
        }
    }

    /**
     * Send the "spins are full" push notification to users waiting for it.
     *
     * New logic ($new = true) — spins are not refreshed on every run:
     *  1. read the users that are not at full spins from okeys::spinsFirebase()
     *  2. work out when each of them will be full and store that time in
     *     okeys::spinsFirebaseEndTime(), clearing okeys::spinsFirebase()
     *  3. notify and remove every user in okeys::spinsFirebaseEndTime() whose time has come
     *
     * Original logic ($new = false): refresh spins for every pending user on each run
     * and notify those for whom refreshSpins() returns a falsy value.
     *
     * Notifications are only sent outside debug environments.
     *
     * @param bool $new use the new end-time based logic
     * @return void
     */
    public function spinsFull($new = false){
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        if($new){
            $setTime = oo::commonOprModel('readconfig')->getCon('basic','restoreTime'); // seconds between two restores
            $setSpins = oo::commonOprModel('readconfig')->getCon('basic','restoreSpins'); // spins restored each time
            $time = time();
            // A restore amount of 0 (or a missing config) would make the modulo below throw
            // DivisionByZeroError after the pending hash was already deleted. In that case the
            // scheduling step is skipped and the pending users are left untouched.
            if((int)$setSpins > 0){
                $pushArray = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebase());
                oo::commonOprRedis('common')->delete(okeys::spinsFirebase());
                foreach ($pushArray as $uid => $refreSpins){
                    $getUserInfos = oo::commonOprModel('member')->getUserAssetsInfo($uid);
                    $maxSpins = oo::commonOprModel('member')->getLevelUpLimit($getUserInfos['levelId'],'spins') ?? 50;
                    $curSpins = intval($getUserInfos['spins']);
                    if($curSpins < $maxSpins && $refreSpins){
                        $missing = $maxSpins - $curSpins;
                        // Number of full restore intervals after the running countdown ($refreSpins):
                        // one less when the missing amount is an exact multiple of the restore amount.
                        $steps = intval($missing / $setSpins);
                        if($missing % $setSpins == 0){
                            $steps = $steps - 1;
                        }
                        $endTime = $time + $steps * $setTime + $refreSpins;
                        oo::commonOprRedis('common')->hSet(okeys::spinsFirebaseEndTime(),$uid,$endTime);
                    }
                }
            }
            $endTimeRet = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebaseEndTime());
            foreach($endTimeRet as $uid => $endTime){
                if($time >= $endTime){
                    oo::commonOprRedis('common')->hDel(okeys::spinsFirebaseEndTime(),$uid);
                    !IS_DEBUF && oo::commonOprModel('push')->pushNews($uid, 'push.title.spins', 'push.content.spins', [],'spins');
                }
            }
        }else{
            // Original logic: refresh spins on every run.
            $pushArray = oo::commonOprRedis('common')->hKeys(okeys::spinsFirebase());
            oo::commonOprRedis('common')->delete(okeys::spinsFirebase());
            foreach ($pushArray as $row){
                $refreSpins = oo::commonOprModel('user')->refreshSpins($row,1);
                if(!$refreSpins){
                    !IS_DEBUF && oo::commonOprModel('push')->pushNews($row, 'push.title.spins', 'push.content.spins', [],'spins');
                }
            }
        }
    }

    /**
     * Read the "encryp" flag from the first non-empty session of a uid's connections.
     *
     * @param string $uid
     * @return mixed the stored flag; false when the uid has no connection, no session
     *               holds data, or the flag is not set
     */
    public function encryp($uid){
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $clientArr = Gateway::getClientIdByUid($uid);
        if(empty($clientArr)){
            return false;
        }
        foreach ($clientArr as $row){
            $temp = Gateway::getSession($row);
            if(!empty($temp)){
                // The session data is indexed by client id here, as in noticeOtherOffline().
                return $temp[$row]["encryp"] ?? false;
            }
        }
        return false;
    }
}