123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
- <?php
- require_once __DIR__ . '/../vendor/autoload.php';
- use \GatewayWorker\Lib\Gateway;
- use \Workerman\Lib\Timer;
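/**
 * ModelWorkerman: push-message helpers built on GatewayWorker and Redis queues
 * (unicast / group / broadcast delivery across nodes, online-state tracking and
 * spin-refill push notifications).
 */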
- class ModelWorkerman{
/**
 * Consume this node's Redis push queue, one message at a time.
 *
 * A per-node lock key (okeys::subcribeLock) is taken with setnx so that only
 * one caller drains the queue at once. Up to 1000 messages are popped from the
 * right end of okeys::pushServer(SERVER_INT_IP) and passed to send(); the loop
 * stops early when the queue is empty. The lock is always released afterwards,
 * even if send() throws.
 *
 * @return bool|null true after a drain pass, null when the lock could not be taken
 */
public function pushMsgSubscribe() {
    $lockKey = okeys::subcribeLock(SERVER_INT_IP);

    if(!oo::commonOprRedis('server')->setnx($lockKey, 1)){
        // Another pass is still running: back off for 0.5s instead of blocking.
        // NOTE(review): this deletes a lock that is held by someone else.
        // Presumably a stale-lock recovery (the lock has no TTL) - confirm
        // before changing, since removing it could leave the queue stuck.
        oo::commonOprRedis('server')->delete($lockKey);
        usleep(500000);
        return;
    }

    try{
        $queueKey = okeys::pushServer(SERVER_INT_IP);
        for($budget = 1000; $budget > 0; $budget--){
            $msg = oo::commonOprRedis('server')->rpop($queueKey);
            if(empty($msg)){ // queue drained
                break;
            }
            $this->send($msg);
        }
    }finally{
        // Release the lock on every exit path so a failing message cannot
        // leave the lock behind.
        oo::commonOprRedis('server')->delete($lockKey);
    }
    return true;
}
/**
 * Decode one queued push message and dispatch it according to its "type".
 *
 * Required JSON fields: type, cmd, data, flag. Optional: from (defaults to
 * "php" when absent, matching the default of pushCur()/pushCurAll()).
 *   - user:      unicast to uid `flag` via pushCur()
 *   - group:     send to group `flag` via pushCurGroup()
 *   - broadcast: send to everyone via pushCurAll()
 *   - function:  call method `flag` on the model named `cmd`, spreading `data`
 *                as the argument list
 *
 * Any failure is logged and swallowed so that one bad message does not stop
 * the caller's loop.
 *
 * @param string $msg JSON-encoded message
 * @return void
 */
public function send($msg){
    $data = (array)json_decode($msg, true);
    try{
        if(!isset($data['type'], $data['cmd'], $data['data'], $data['flag'])){
            oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,参数不完整:'.json_encode($data), 'pushmsg.log');
            return;
        }

        // "from" is optional; the producers in this class never set it.
        $from = $data['from'] ?? 'php';

        switch($data['type']){
            case 'user': // unicast
                $this->pushCur($data['flag'], $data['cmd'], $data['data'], $from);
                break;
            case 'group': // group cast
                $this->pushCurGroup($data['flag'], $data['cmd'], $data['data']);
                break;
            case 'broadcast': // broadcast
                $this->pushCurAll($data['cmd'], $data['data'], $from);
                break;
            case 'function': // run model->method on this node
                $fun = $data['flag'];
                $model = oo::commonOprModel($data['cmd']);
                // The original tested `== -1`; anything that is not an object
                // cannot be called below, so treat it as "model not found".
                if(!is_object($model)){
                    oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,对象不存在:'.$data['cmd'], 'pushmsg_function.log');
                    return;
                }
                if(!method_exists($model, $fun)){
                    oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,方法不存在:'.$fun, 'pushmsg_function.log');
                    return;
                }
                // NOTE(review): the original comment says offline notices are
                // only too noisy to LOG, yet this return also skips the call
                // itself. Behaviour kept as-is - confirm whether intended.
                if($fun == 'noticeOtherOffline'){
                    return;
                }
                oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅到了函数方法,参数:'.$msg, 'pushmsg_function.log');
                $model->$fun(...$data['data']);
                break;
        }
    }catch(\Throwable $e){
        // Throwable rather than Exception: spreading a non-array `data` or a
        // bad argument count raises Error/TypeError, which must be logged too.
        oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常:'.$e->getMessage().'消息:'.$msg, 'pushmsg_function.log');
        return;
    }
}
/**
 * A user came online: close every other online connection bound to that uid,
 * keeping only the connection identified by $curClientId.
 *
 * For each closed connection the matching $_SESSION entry is removed and a
 * record is written to the 'UserBindOther' log.
 *
 * @param int    $uid         user id whose other connections are closed
 * @param string $curClientId client id that must stay connected
 * @return void
 */
public function noticeOtherOffline($uid, $curClientId) {
    if(!Gateway::isUidOnline($uid)){
        return;
    }

    $clientIds = Gateway::getClientIdByUid($uid);
    if(count($clientIds) === 0){
        return;
    }

    foreach($clientIds as $clientId){
        // Skip the connection we are keeping and anything already offline.
        if($clientId == $curClientId || !Gateway::isOnline($clientId)){
            continue;
        }

        // Capture the session before it is removed so it can be logged.
        $session = $_SESSION[$clientId] ?? '';

        Gateway::closeClient($clientId);
        unset($_SESSION[$clientId]);

        oo::logs()->debug3([
            'uid' => $uid,
            'curClientId' => $curClientId,
            'OnlineClientId' => $clientId,
            'session' => $session,
        ], 'UserBindOther');
    }
}
/**
 * Flag a user as online in the 'common' Redis so other servers can read the
 * state. The flag is written with setex() using the value true and 60 * 10
 * (ten minutes, per the original comment).
 *
 * NOTE(review): the arguments are passed as (key, value, seconds), which is
 * not phpredis' native setex(key, ttl, value) order - presumably the project
 * wrapper reorders them; confirm.
 *
 * @param int $uid
 * @return void
 */
public function online($uid) {
    $flagKey = okeys::userOnline($uid);
    $tenMinutes = 60 * 10;

    oo::commonOprRedis('common')->setex($flagKey, true, $tenMinutes);
}
/**
 * Flag a user as offline: removes the user's okeys::userWsReq key from the
 * 'user' Redis, then the okeys::userOnline flag from the 'common' Redis.
 *
 * @param int $uid
 * @return void
 */
public function offline($uid) {
    $wsReqKey = okeys::userWsReq($uid);
    oo::commonOprRedis('user')->delete($wsReqKey);

    $onlineKey = okeys::userOnline($uid);
    oo::commonOprRedis('common')->delete($onlineKey);
}
/**
 * Return the configured websocket worker servers.
 *
 * @return mixed oo::$config['workerservers'] when it is set and not null,
 *               otherwise an empty array
 */
public static function getWebsocketServers() {
    return oo::$config['workerservers'] ?? [];
}
/**
 * Unicast a command to one user.
 *
 * If the user is connected to this node, or no worker servers are configured,
 * the message is delivered directly with pushCur(). Otherwise, and only when
 * IS_DEBUF is truthy (per the original comment, production no longer relays),
 * the message is queued for every configured server, this node included.
 *
 * @param string $uid
 * @param string $cmd
 * @param array  $data
 * @param bool   $isFast when true the message is rpush-ed instead of lpush-ed;
 *                       the consumer (pushMsgSubscribe) pops with rpop, so an
 *                       rpush-ed message is picked up ahead of the backlog
 * @return void
 */
public function push($uid,$cmd,$data,$isFast=false){
    $servers = self::getWebsocketServers();
    if(oo::commonOprModel('Workerman')->isUidCurOnline($uid) || empty($servers)){
        // User is on this node (or there is nowhere else to send): push directly.
        $this->pushCur($uid, $cmd, $data);
        return;
    }
    if(!IS_DEBUF){ // production no longer relays through the queue
        return;
    }

    // From here on IS_DEBUF is truthy, so the original "skip current node when
    // !IS_DEBUF" branch inside the loop could never run and has been removed.
    // The payload does not depend on the target server, so encode it once.
    $payload = json_encode([
        "type" => "user", "flag" => $uid, "cmd" => $cmd, "data" => $data
    ]);

    foreach($servers as $ip){
        $queueKey = okeys::pushServer($ip);
        if($isFast){
            oo::commonOprRedis('server')->rpush($queueKey, $payload);
        }else{
            oo::commonOprRedis('server')->lpush($queueKey, $payload);
        }
    }
}
/**
 * Queue a "run this model method" message for the configured worker servers.
 *
 * Nothing is queued when no servers are configured or when IS_DEBUF is falsy.
 * 'noticeOtherOffline' is treated as urgent: it is rpush-ed and not sent to
 * this node (SERVER_INT_IP). Every other function is lpush-ed to all servers.
 *
 * @param string $method model name, e.g. activitynew
 * @param string $fun    method name, e.g. sendRankRewardFasebasePushNsq
 * @param array  $params argument list for the method, e.g. array(1, 0)
 * @return void
 */
public function pushFun($method, $fun, array $params = []){
    $servers = self::getWebsocketServers();
    if(empty($servers) || !IS_DEBUF){
        return;
    }

    $payload = json_encode([
        'type' => 'function', 'cmd' => $method, 'flag' => $fun, 'data' => $params
    ]);
    $isUrgent = ($fun == 'noticeOtherOffline');

    foreach($servers as $ip){
        if(!$isUrgent){
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
            continue;
        }
        if($ip == SERVER_INT_IP){
            continue; // the local node does not need this message
        }
        // Must be handled right away, so it goes in at the consuming end.
        oo::commonOprRedis('server')->rpush(okeys::pushServer($ip), $payload);
    }
}
/**
 * Send a command to a group.
 *
 * With no worker servers configured the group is pushed locally. Otherwise,
 * only when IS_DEBUF is truthy, the current node is pushed directly and every
 * other server gets the message lpush-ed onto its queue.
 *
 * @param string $group
 * @param string $cmd
 * @param mixed  $data
 * @return void
 */
public function pushGroup($group,$cmd,$data){
    $servers = self::getWebsocketServers();
    if(empty($servers)){
        $this->pushCurGroup($group,$cmd,$data);
        return;
    }
    if(!IS_DEBUF){ // production no longer relays through the queue
        return;
    }

    $payload = json_encode([
        "type" => "group", "flag" => $group, "cmd" => $cmd, "data" => $data
    ]);

    foreach($servers as $ip){
        if($ip == SERVER_INT_IP){
            // Current node: deliver straight away.
            $this->pushCurGroup($group,$cmd,$data);
        }else{
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
        }
    }
}
/**
 * Broadcast a command to everyone.
 *
 * With no worker servers configured the broadcast is done locally. Otherwise,
 * only when IS_DEBUF is truthy, the current node broadcasts directly and every
 * other server gets the message lpush-ed onto its queue.
 *
 * @param string $cmd
 * @param array  $data
 * @return void
 */
public function pushAll($cmd,$data){
    $servers = self::getWebsocketServers();
    if(empty($servers)){
        $this->pushCurAll($cmd,$data);
        return;
    }
    if(!IS_DEBUF){ // production no longer relays through the queue
        return;
    }

    $payload = json_encode([
        "type" => "broadcast", "flag" => "", "cmd" => $cmd, "data" => $data
    ]);

    foreach($servers as $ip){
        if($ip == SERVER_INT_IP){
            // Current node: deliver straight away.
            $this->pushCurAll($cmd,$data);
        }else{
            oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
        }
    }
}
/**
 * Collect the online uid list of every configured server from Redis.
 *
 * Per the original note this is meant for statistics only; use the queued
 * "broadcast" message for server-wide pushes.
 *
 * For each server the "read" hash is loaded; if it is empty (it may have been
 * deleted at that instant) the "write" hash is read instead. Results are
 * combined with the array union operator, so for a hash field that appears on
 * several servers the first value wins, and duplicate values are then removed.
 *
 * NOTE(review): if the hash fields are plain list indexes rather than uids,
 * the union silently drops entries from later servers - confirm what
 * saveAllUids() is given.
 *
 * @return array
 */
public function getAllUidList(){
    $members = [];

    foreach(self::getWebsocketServers() as $ip){
        $uids = oo::commonOprRedis('common')->hGetAll($this->getOnlineUidsKey(0, $ip));
        if(empty($uids)){
            // The read key may have just been deleted; fall back to the write key.
            $uids = oo::commonOprRedis('common')->hGetAll($this->getOnlineUidsKey(1, $ip));
        }
        if($uids){
            $members += $uids;
        }
    }

    return array_unique($members);
}
/**
 * Build the Redis key holding a server's online uids.
 *
 * Two keys (index 0 and 1) exist per server and swap roles every 30 seconds:
 * in seconds 0-29 of a minute readers use index 0 and writers index 1, in
 * seconds 30-59 it is the other way round.
 *
 * @param int    $type 0 = key to read from, 1 = key to write to
 * @param string $ip   server ip; defaults to SERVER_INT_IP when empty
 * @return string
 */
public function getOnlineUidsKey($type = 0, $ip = '') {
    $inFirstHalf = date('s') < 30;
    $isWrite = ($type == 1);

    // Writers take index 1 in the first half-minute, readers in the second.
    $index = ($isWrite === $inFirstHalf) ? 1 : 0;

    return okeys::onUidlines($ip ?: SERVER_INT_IP, $index);
}
/**
 * Read the timestamp of the last online-uid save for a server.
 *
 * @param string $ip server ip; defaults to SERVER_INT_IP when empty
 * @return int the stored value cast to int (0 when nothing is stored)
 */
public function getLastSveTime($ip = '') {
    if(!$ip){
        $ip = SERVER_INT_IP;
    }
    $stored = oo::commonOprRedis('common')->get(okeys::onUidlinesSaveTimes($ip));

    return (int)$stored;
}
/**
 * Decide whether the current worker should refresh the online uid list.
 *
 * The timer fires every 30 seconds; the threshold is loosened to 25 seconds
 * because, per the original comment, processes on the test server run slowly.
 *
 * @return bool true when at least 25 seconds have passed since the last save
 */
public function checkNeedSave() {
    $lastSavedAt = $this->getLastSveTime();
    $elapsed = time() - $lastSavedAt;

    return $elapsed >= 25;
}
/**
 * Replace this node's stored online uid list.
 *
 * Two Redis keys are used per node (see getOnlineUidsKey) so reads and writes
 * hit different keys; this method rewrites the current write key.
 *
 * Before overwriting, the previous content is diffed against $uids and logged
 * to all_uids.log when IS_DEBUF is truthy, or otherwise when the stored hash
 * holds more than 15 entries more than $uids.
 *
 * @param array $uids
 * @return void
 */
public function saveAllUids($uids = []) {
    // Record when this save happened.
    oo::commonOprRedis('common')->setex(okeys::onUidlinesSaveTimes(SERVER_INT_IP), time(), 60);

    $key = $this->getOnlineUidsKey(1);

    // Debug builds always log; otherwise only a drop of more than 15 is logged.
    $shouldLog = IS_DEBUF || (oo::commonOprRedis('common')->hLen($key) - count($uids) > 15);
    if($shouldLog){
        $last = oo::commonOprRedis('common')->hGetAll($key);
        $lost = array_values(array_diff($last, $uids)); // uids gone since the last save
        $new = array_values(array_diff($uids, $last));  // uids that appeared since the last save
        oo::logs()->debug3([
            'date' => date('Y-m-d H:i:s', time()),
            'key' => $key,
            'count' => count($uids),
            'last_count' => count($last),
            'lost' => $lost,
            'new' => $new,
        ], 'all_uids.log');
    }

    oo::commonOprRedis('common')->delete($key);
    oo::commonOprRedis('common')->hMset($key, $uids);
}
/**
 * Tell whether a user is online.
 *
 * The local worker is asked first; if it does not know the user, the
 * okeys::userOnline flag in the 'common' Redis is consulted.
 *
 * @param int $uid
 * @return bool
 */
public function isUidOnline($uid) {
    return $this->isUidCurOnline($uid)
        || (bool)oo::commonOprRedis('common')->get(okeys::userOnline($uid));
}
/**
 * Push a command to one uid through the local Gateway.
 *
 * Unless the command is one of the activity commands or $from is "go", the
 * message is first mirrored to NSQ on ocmd::TopicPHPPush. The payload
 * {cmd, data, code: 1} is JSON-encoded with JSON_NUMERIC_CHECK and sent
 * obfuscated when encryp($uid) is truthy, as plain JSON otherwise.
 *
 * @param mixed  $uid
 * @param mixed  $cmd
 * @param mixed  $data
 * @param string $from origin of the message; "go" suppresses the NSQ mirror
 * @return void
 */
public function pushCur($uid,$cmd,$data,$from="php"){
    if($this->shouldMirrorToNsq($cmd, $from)){
        $nsqData = ["type" => "user","uid"=>$uid,"flag" =>$uid, "cmd" => $cmd, "data" => $data];
        funs::SendNsqMsg(ocmd::TopicPHPPush,[],$nsqData);
    }
    Gateway::$registerAddress = WORKER_REGISTER_URL;
    $json = json_encode(['cmd'=>$cmd, 'data'=>$data, 'code' => 1],JSON_NUMERIC_CHECK);
    $out = $this->encryp($uid) ? $this->obfuscatePayload($json) : $json;
    Gateway::sendToUid($uid, $out);
}

/**
 * Push a command to a Gateway group. The payload {cmd, data} is always sent
 * obfuscated.
 *
 * @param mixed $group
 * @param mixed $cmd
 * @param mixed $data
 * @return void
 */
public function pushCurGroup($group,$cmd,$data){
    Gateway::$registerAddress = WORKER_REGISTER_URL;
    $json = json_encode(['cmd'=>$cmd,'data'=>$data],JSON_NUMERIC_CHECK);
    Gateway::sendToGroup($group, $this->obfuscatePayload($json));
}

/**
 * Push a command to every connection through the local Gateway.
 *
 * Unless the command is one of the activity commands or $from is "go", the
 * message is first mirrored to NSQ on ocmd::TopicPHPPushAll. The payload
 * {cmd, data} is always sent obfuscated.
 *
 * @param mixed  $cmd
 * @param mixed  $data
 * @param string $from origin of the message; "go" suppresses the NSQ mirror
 * @return void
 */
public function pushCurAll($cmd,$data,$from="php"){
    if($this->shouldMirrorToNsq($cmd, $from)){
        $nsqData = ["type" => "broadcast","flag" =>"", "cmd" => $cmd, "data" => $data];
        funs::SendNsqMsg(ocmd::TopicPHPPushAll,[],$nsqData);
    }
    Gateway::$registerAddress = WORKER_REGISTER_URL;
    $json = json_encode(['cmd'=>$cmd,'data'=>$data],JSON_NUMERIC_CHECK);
    Gateway::sendToAll($this->obfuscatePayload($json));
}

/**
 * Whether a push should also be published to NSQ: true unless $cmd is one of
 * the three activity commands (which, per the original comment, do not use
 * the common topic) or the message came from "go".
 *
 * @param mixed $cmd
 * @param mixed $from
 * @return bool
 */
private function shouldMirrorToNsq($cmd, $from){
    $activityCmds = [
        ocmd::$send['ActivityChangeStatus_New'],
        ocmd::$send['Activity_timeCollect'],
        ocmd::$send['Activity_timeCollectBet'],
    ];
    return !in_array($cmd, $activityCmds) && $from != "go";
}

/**
 * Obfuscate a JSON string for the wire: base64-encode it, splice 16 hex
 * characters of md5(time()) in at offset floor(len / 2) - 1, then insert one
 * more of those characters after the first character of the result.
 *
 * @param string $json
 * @return string
 */
private function obfuscatePayload($json){
    $encoded = base64_encode($json);
    $noise = md5(time());
    $splitAt = (int)floor(strlen($encoded) / 2) - 1;

    $spliced = substr($encoded, 0, $splitAt).substr($noise, 0, 16).substr($encoded, $splitAt, strlen($encoded));

    return substr($spliced, 0, 1).substr($noise, 0, 1).substr($spliced, 1, strlen($spliced));
}
/**
 * Return Gateway::getAllUidList() after pointing the Gateway client at
 * WORKER_REGISTER_URL.
 *
 * @return mixed whatever Gateway::getAllUidList() returns
 */
public function getCurAllUidList(){
    Gateway::$registerAddress = WORKER_REGISTER_URL;
    $uidList = Gateway::getAllUidList();

    return $uidList;
}
/**
 * Tell whether a user is online according to the Gateway registered at
 * WORKER_REGISTER_URL.
 *
 * @param mixed $uid
 * @return mixed the result of Gateway::isUidOnline(), or false if it throws
 */
public function isUidCurOnline($uid){
    $online = false;
    try{
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $online = Gateway::isUidOnline($uid);
    }catch (Exception $e){
        // Treat a failing Gateway lookup as "not online".
        $online = false;
    }
    return $online;
}
/**
 * Get the number of online client ids.
 *
 * Original author note: wsc, 2020/12/10 16:53.
 *
 * @return int the result of Gateway::getAllClientIdCount(), or 0 if it throws
 */
public function getOnlineCount(){
    $count = 0;
    try{
        Gateway::$registerAddress = WORKER_REGISTER_URL;
        $count = Gateway::getAllClientIdCount();
    }catch (Exception $e){
        // Treat a failing Gateway lookup as "nobody online".
        $count = 0;
    }
    return $count;
}
/**
 * Send the "spins are full" push notification to users whose spins refilled.
 *
 * Legacy mode ($new = false): every uid in the okeys::spinsFirebase() hash is
 * refreshed with user->refreshSpins($uid, 1) and notified when that call
 * returns a falsy value.
 *
 * New mode ($new = true), which avoids refreshing spins on every run:
 *   1. Read and clear okeys::spinsFirebase() (uid => seconds left on the
 *      current restore countdown).
 *   2. For every user still below the spin cap, compute when the cap will be
 *      reached and store it in okeys::spinsFirebaseEndTime().
 *   3. Notify, and remove from that hash, every user whose time has passed.
 *
 * Notifications are only sent when IS_DEBUF is falsy.
 *
 * @param bool $new selects the new scheduling logic
 * @return void
 */
public function spinsFull($new = false){
    Gateway::$registerAddress = WORKER_REGISTER_URL;

    if(!$new){
        // Legacy logic: refresh spins for every pending user on each run.
        $pendingUids = oo::commonOprRedis('common')->hKeys(okeys::spinsFirebase());
        oo::commonOprRedis('common')->delete(okeys::spinsFirebase());
        foreach($pendingUids as $uid){
            $refreSpins = oo::commonOprModel('user')->refreshSpins($uid,1);
            if(!$refreSpins && !IS_DEBUF){
                oo::commonOprModel('push')->pushNews($uid, 'push.title.spins', 'push.content.spins', [],'spins');
            }
        }
        return;
    }

    $setTime = oo::commonOprModel('readconfig')->getCon('basic','restoreTime');   // seconds per restore tick
    $setSpins = oo::commonOprModel('readconfig')->getCon('basic','restoreSpins'); // spins restored per tick
    $time = time();

    if((int)$setSpins === 0){
        // The schedule below takes "% $setSpins"; with a zero/missing config
        // that is a fatal modulo-by-zero. Bail out of the scheduling step
        // BEFORE the pending hash is read and deleted so nothing is lost.
        oo::logs()->debug3(date('Y-m-d H:i:s').'-spinsFull: basic.restoreSpins evaluates to 0, scheduling skipped', 'spins_full.log');
    }else{
        $pending = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebase());
        oo::commonOprRedis('common')->delete(okeys::spinsFirebase());
        foreach($pending as $uid => $refreSpins){
            $assets = oo::commonOprModel('member')->getUserAssetsInfo($uid);
            $maxSpins = oo::commonOprModel('member')->getLevelUpLimit($assets['levelId'],'spins') ?? 50;
            $currentSpins = intval($assets['spins']);

            if($currentSpins < $maxSpins && $refreSpins){
                $missing = $maxSpins - $currentSpins;
                // Ticks still needed after the one already counting down:
                // one fewer when the missing amount is an exact multiple.
                $isExactMultiple = ($missing % $setSpins == 0);
                $ticks = intval($missing / $setSpins);
                if($isExactMultiple){
                    $ticks -= 1;
                }
                $endTime = $time + $ticks * $setTime + $refreSpins;
                oo::commonOprRedis('common')->hSet(okeys::spinsFirebaseEndTime(),$uid,$endTime);
            }
        }
    }

    // Notify everyone whose scheduled "full" time has been reached.
    $scheduled = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebaseEndTime());
    foreach($scheduled as $uid => $endTime){
        if($time < $endTime){
            continue;
        }
        oo::commonOprRedis('common')->hDel(okeys::spinsFirebaseEndTime(),$uid);
        if(!IS_DEBUF){
            oo::commonOprModel('push')->pushNews($uid, 'push.title.spins', 'push.content.spins', [],'spins');
        }
    }
}
/**
 * Look up the "encryp" flag stored in the session of a user's connection.
 *
 * The first client id of the user that has a non-empty Gateway session
 * decides the result; the flag is read from $session[$clientId]['encryp'].
 *
 * @param mixed $uid
 * @return mixed the stored flag, or false when the user has no client ids,
 *               no non-empty session, or the flag is missing
 */
public function encryp($uid){
    Gateway::$registerAddress = WORKER_REGISTER_URL;

    $clientIds = Gateway::getClientIdByUid($uid);
    if(empty($clientIds)){
        return false;
    }

    foreach($clientIds as $clientId){
        $session = Gateway::getSession($clientId);
        if(!empty($session)){
            // Missing flag means "not encrypted" rather than an undefined-index notice.
            return $session[$clientId]['encryp'] ?? false;
        }
    }

    // No connection had a usable session (the original fell off the end here
    // and returned null implicitly).
    return false;
}
- }
|