workerman.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. <?php
  2. require_once __DIR__ . '/../vendor/autoload.php';
  3. use \GatewayWorker\Lib\Gateway;
  4. use \Workerman\Lib\Timer;
  5. /**
  6. * Workerman
  7. */
  8. class ModelWorkerman{
  /**
   * Consume this node's Redis push queue, delivering messages one at a time.
   *
   * A per-node lock key (okeys::subcribeLock) is taken with SETNX so that
   * overlapping invocations do not drain the queue concurrently. At most
   * 1000 messages are processed per call.
   *
   * @return bool|null true after a drain pass; null when the lock was
   *                   already present and this call backed off.
   */
  public function pushMsgSubscribe()
  {
      $redis = oo::commonOprRedis('server');
      $lockKey = okeys::subcribeLock(SERVER_INT_IP);

      if (!$redis->setnx($lockKey, 1)) {
          // The lock already exists: remove it, pause for 0.5s and let the
          // next invocation retry.
          // NOTE(review): this deletes a lock that may still belong to a
          // running consumer — confirm that this is the intended recovery
          // strategy (the lock has no expiry of its own).
          $redis->delete($lockKey);
          usleep(500000);
          return;
      }

      try {
          $queueKey = okeys::pushServer(SERVER_INT_IP);
          for ($remaining = 1000; $remaining > 0; $remaining--) {
              $msg = $redis->rpop($queueKey);
              if (empty($msg)) {
                  // Queue is drained.
                  break;
              }
              $this->send($msg);
          }
      } finally {
          // Always release the lock, even if message handling throws.
          $redis->delete($lockKey);
      }

      return true;
  }
  /**
   * Decode one queued message and dispatch it according to its "type".
   *
   * Supported types:
   *  - user:      unicast to $data['flag'] (a uid) on this node
   *  - group:     multicast to the group named by $data['flag']
   *  - broadcast: send to every client on this node
   *  - function:  call method $data['flag'] on model $data['cmd'] with the
   *               arguments in $data['data']
   *
   * Malformed messages and any failure raised while handling a message are
   * logged and swallowed so that one bad message cannot stop the consumer.
   *
   * @param string $msg JSON-encoded message with keys type, cmd, data, flag
   *                    and optionally from.
   * @return void
   */
  public function send($msg)
  {
      $data = (array)json_decode($msg, true);
      try {
          if (!isset($data['type'], $data['cmd'], $data['data'], $data['flag'])) {
              oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,参数不完整:'.json_encode($data), 'pushmsg.log');
              return;
          }

          // "from" is optional: the messages queued by push()/pushAll() in
          // this class do not carry it, so fall back to the same default
          // that pushCur()/pushCurAll() declare.
          $from = $data['from'] ?? 'php';

          switch ($data['type']) {
              case 'user': // unicast
                  $this->pushCur($data['flag'], $data['cmd'], $data['data'], $from);
                  break;
              case 'group': // multicast
                  $this->pushCurGroup($data['flag'], $data['cmd'], $data['data']);
                  break;
              case 'broadcast':
                  $this->pushCurAll($data['cmd'], $data['data'], $from);
                  break;
              case 'function': // invoke model->method(...args)
                  $fun = $data['flag'];
                  $model = oo::commonOprModel($data['cmd']);
                  // The factory signals "no such model" with -1; anything
                  // that is not an object cannot be called below either.
                  if (!is_object($model)) {
                      oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,对象不存在:'.$data['cmd'], 'pushmsg_function.log');
                      return;
                  }
                  if (!method_exists($model, $fun)) {
                      oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常,方法不存在:'.$fun, 'pushmsg_function.log');
                      return;
                  }
                  // NOTE(review): the original comment here only says that
                  // offline notifications are too numerous to LOG, yet this
                  // return also skips the call itself. Behaviour is kept
                  // as-is; confirm whether the call should still run.
                  if ($fun == 'noticeOtherOffline') {
                      return;
                  }
                  oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅到了函数方法,参数:'.$msg, 'pushmsg_function.log');
                  $model->$fun(...$data['data']);
                  break;
          }
      } catch (Throwable $e) {
          // Throwable (not just Exception) so that engine errors such as a
          // non-array "data" being unpacked, or a wrong argument count, are
          // logged instead of killing the worker.
          oo::logs()->debug3(date('Y-m-d H:i:s').'-订阅异常:'.$e->getMessage().'消息:'.$msg, 'pushmsg_function.log');
          return;
      }
  }
  /**
   * Close every other connection bound to a uid on this gateway.
   *
   * Called when a player comes online: all of the uid's client connections
   * except $curClientId are closed and their session entries removed.
   *
   * @param int    $uid         player id
   * @param string $curClientId client id of the connection to keep
   * @return void
   */
  public function noticeOtherOffline($uid, $curClientId)
  {
      if (!Gateway::isUidOnline($uid)) {
          return;
      }

      $clientIds = Gateway::getClientIdByUid($uid);
      if (count($clientIds) === 0) {
          return;
      }

      foreach ($clientIds as $clientId) {
          // Keep the current connection; ignore ones that are already gone.
          if ($clientId == $curClientId || !Gateway::isOnline($clientId)) {
              continue;
          }

          // Capture the session before it is dropped so it can be logged.
          $session = $_SESSION[$clientId] ?? '';

          Gateway::closeClient($clientId);
          unset($_SESSION[$clientId]);

          oo::logs()->debug3(
              [
                  'uid' => $uid,
                  'curClientId' => $curClientId,
                  'OnlineClientId' => $clientId,
                  'session' => $session,
              ],
              'UserBindOther'
          );
      }
  }
  /**
   * Mark a player as online in the shared cache for 10 minutes, so other
   * nodes can see the player's online state.
   *
   * @param int $uid player id
   * @return void
   */
  public function online($uid)
  {
      $ttlSeconds = 60 * 10;
      $cacheKey = okeys::userOnline($uid);

      // NOTE(review): arguments are passed as (key, value, ttl), which is
      // what this project's Redis wrapper appears to expect — confirm.
      oo::commonOprRedis('common')->setex($cacheKey, true, $ttlSeconds);
  }
  /**
   * Mark a player as offline by removing the player's websocket-request
   * key and online marker from Redis.
   *
   * @param int $uid player id
   * @return void
   */
  public function offline($uid)
  {
      $wsReqKey = okeys::userWsReq($uid);
      oo::commonOprRedis('user')->delete($wsReqKey);

      $onlineKey = okeys::userOnline($uid);
      oo::commonOprRedis('common')->delete($onlineKey);
  }
  /**
   * Return the configured websocket worker servers.
   *
   * @return array the 'workerservers' config entry, or an empty array when
   *               it is not set.
   */
  public static function getWebsocketServers()
  {
      return oo::$config['workerservers'] ?? [];
  }
  /**
   * Unicast a command to a single player.
   *
   * If the player is connected to this node, or no worker servers are
   * configured, the message is pushed directly. Otherwise, in the debug
   * environment only, it is queued for every configured server; outside
   * the debug environment nothing is queued.
   *
   * @param string $uid    player id
   * @param string $cmd    command identifier
   * @param array  $data   command payload
   * @param bool   $isFast queue with RPUSH instead of LPUSH; consumers RPOP,
   *                       so an RPUSHed message is picked up next
   * @return void
   */
  public function push($uid, $cmd, $data, $isFast = false)
  {
      $servers = self::getWebsocketServers();
      if (oo::commonOprModel('Workerman')->isUidCurOnline($uid) || empty($servers)) {
          // Player is on this node (or there is only this node): push now.
          $this->pushCur($uid, $cmd, $data);
          return;
      }

      if (!IS_DEBUF) {
          // Cross-node queueing is disabled outside the debug environment.
          return;
      }

      // From here on IS_DEBUF is always truthy, so every configured server
      // (including this one) receives the message; the former nested
      // "skip current node when !IS_DEBUF" check could never run.
      $payload = json_encode([
          "type" => "user", "flag" => $uid, "cmd" => $cmd, "data" => $data
      ]);
      $redis = oo::commonOprRedis('server');

      foreach ($servers as $ip) {
          $queueKey = okeys::pushServer($ip);
          if ($isFast) {
              $redis->rpush($queueKey, $payload);
          } else {
              $redis->lpush($queueKey, $payload);
          }
      }
  }
  /**
   * Queue a "call this model method" message for the configured servers.
   *
   * Nothing is queued when no servers are configured or outside the debug
   * environment. 'noticeOtherOffline' is treated as urgent: it is RPUSHed
   * and is not sent to the current node. Every other function is LPUSHed
   * to all servers.
   *
   * @param string $method model name, e.g. activitynew
   * @param string $fun    method name, e.g. sendRankRewardFasebasePushNsq
   * @param array  $params positional arguments, e.g. [1, 0]
   * @return void
   */
  public function pushFun($method, $fun, array $params = [])
  {
      $servers = self::getWebsocketServers();
      if (empty($servers) || !IS_DEBUF) {
          return;
      }

      $payload = json_encode([
          'type' => 'function', 'cmd' => $method, 'flag' => $fun, 'data' => $params
      ]);
      $isUrgent = ($fun == 'noticeOtherOffline');

      foreach ($servers as $ip) {
          if (!$isUrgent) {
              oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
              continue;
          }

          if ($ip == SERVER_INT_IP) {
              // The local node does not need the notification.
              continue;
          }

          // Urgent message: RPUSH so it is consumed next.
          oo::commonOprRedis('server')->rpush(okeys::pushServer($ip), $payload);
      }
  }
  /**
   * Multicast a command to a group.
   *
   * With no configured servers the group is pushed on this node only.
   * Otherwise, in the debug environment, the current node is pushed
   * directly and every other server gets the message queued.
   *
   * @param string $group group name
   * @param string $cmd   command identifier
   * @param mixed  $data  command payload
   * @return void
   */
  public function pushGroup($group, $cmd, $data)
  {
      $servers = self::getWebsocketServers();
      if (empty($servers)) {
          $this->pushCurGroup($group, $cmd, $data);
          return;
      }

      if (!IS_DEBUF) {
          // Cross-node queueing is disabled outside the debug environment.
          return;
      }

      foreach ($servers as $ip) {
          if ($ip == SERVER_INT_IP) {
              // Current node: deliver directly.
              $this->pushCurGroup($group, $cmd, $data);
          } else {
              $payload = json_encode([
                  "type" => "group", "flag" => $group, "cmd" => $cmd, "data" => $data
              ]);
              oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
          }
      }
  }
  /**
   * Broadcast a command to all players.
   *
   * With no configured servers the broadcast happens on this node only.
   * Otherwise, in the debug environment, the current node is pushed
   * directly and every other server gets the message queued.
   *
   * @param string $cmd  command identifier
   * @param array  $data command payload
   * @return void
   */
  public function pushAll($cmd, $data)
  {
      $servers = self::getWebsocketServers();
      if (empty($servers)) {
          $this->pushCurAll($cmd, $data);
          return;
      }

      if (!IS_DEBUF) {
          // Cross-node queueing is disabled outside the debug environment.
          return;
      }

      foreach ($servers as $ip) {
          if ($ip == SERVER_INT_IP) {
              // Current node: deliver directly.
              $this->pushCurAll($cmd, $data);
          } else {
              $payload = json_encode([
                  "type" => "broadcast", "flag" => "", "cmd" => $cmd, "data" => $data
              ]);
              oo::commonOprRedis('server')->lpush(okeys::pushServer($ip), $payload);
          }
      }
  }
  /**
   * Collect the online uid lists stored for every configured server.
   *
   * Intended for statistics only; to push to all players use the queued
   * broadcast path instead.
   *
   * @return array de-duplicated union of the per-server hashes
   */
  public function getAllUidList()
  {
      $servers = self::getWebsocketServers();
      $merged = [];

      foreach ($servers as $ip) {
          $redis = oo::commonOprRedis('common');
          $uids = $redis->hGetAll($this->getOnlineUidsKey(0, $ip));
          if (empty($uids)) {
              // The read key may have been deleted at this very moment;
              // fall back to the write key.
              $uids = oo::commonOprRedis('common')->hGetAll($this->getOnlineUidsKey(1, $ip));
          }

          if ($uids) {
              // Array union: entries already present under a key win.
              $merged += $uids;
          }
      }

      return $merged ? array_unique($merged) : $merged;
  }
  /**
   * Build the Redis key of the online-uid hash for a server.
   *
   * Two slots (0 and 1) exist per server and the read/write roles swap
   * every 30 seconds: during seconds 0-29 of a minute the write slot is 1
   * and the read slot is 0; during seconds 30-59 it is the other way round.
   *
   * @param int    $type 0 for the read key, 1 for the write key
   * @param string $ip   server ip; defaults to SERVER_INT_IP when empty
   * @return string
   */
  public function getOnlineUidsKey($type = 0, $ip = '')
  {
      $inFirstHalf = date('s') < 30;
      $writeSlot = $inFirstHalf ? 1 : 0;
      $index = ($type == 1) ? $writeSlot : 1 - $writeSlot;

      return okeys::onUidlines($ip ?: SERVER_INT_IP, $index);
  }
  /**
   * Read the timestamp of the last online-uid save for a server.
   *
   * @param string $ip server ip; defaults to SERVER_INT_IP when empty
   * @return int stored value cast to int (0 when nothing is stored)
   */
  public function getLastSveTime($ip = '')
  {
      $server = $ip ?: SERVER_INT_IP;
      $stored = oo::commonOprRedis('common')->get(okeys::onUidlinesSaveTimes($server));

      return (int)$stored;
  }
  /**
   * Decide whether this worker should refresh the online-uid list now.
   *
   * The timer fires every 30 seconds; the threshold is 25 seconds to leave
   * some slack for slow processes on the test server.
   *
   * @return bool true when at least 25 seconds passed since the last save
   */
  public function checkNeedSave()
  {
      $lastTime = $this->getLastSveTime();
      $elapsed = time() - $lastTime;

      return !($elapsed < 25);
  }
  /**
   * Replace this node's stored online-uid list.
   *
   * Records the save time (60s expiry), then rewrites the current write
   * key (see getOnlineUidsKey) so readers use the other slot meanwhile.
   * Before overwriting, the difference to the previous content is logged:
   * always in the debug environment, otherwise only when the stored count
   * exceeds the new count by more than 15.
   *
   * @param array $uids uids currently online on this node
   * @return void
   */
  public function saveAllUids($uids = [])
  {
      $redis = oo::commonOprRedis('common');

      // Remember when the list was last refreshed.
      $redis->setex(okeys::onUidlinesSaveTimes(SERVER_INT_IP), time(), 60);

      $key = $this->getOnlineUidsKey(1);

      // Short-circuit keeps the hLen round trip out of the debug path.
      if (IS_DEBUF || $redis->hLen($key) - count($uids) > 15) {
          $last = $redis->hGetAll($key);
          $lost = array_values(array_diff($last, $uids)); // uids gone since the last save
          $new = array_values(array_diff($uids, $last));  // uids that appeared since the last save
          oo::logs()->debug3(
              [
                  'date' => date('Y-m-d H:i:s', time()),
                  'key' => $key,
                  'count' => count($uids),
                  'last_count' => count($last),
                  'lost' => $lost,
                  'new' => $new,
              ],
              'all_uids.log'
          );
      }

      $redis->delete($key);
      $redis->hMset($key, $uids);
  }
  /**
   * Tell whether a player is online on any node.
   *
   * The local gateway is asked first; only if the player is not connected
   * here is the shared online marker in Redis consulted.
   *
   * @param int $uid player id
   * @return bool
   */
  public function isUidOnline($uid)
  {
      return $this->isUidCurOnline($uid)
          || (bool)oo::commonOprRedis('common')->get(okeys::userOnline($uid));
  }
  /**
   * Push a command to one player through the local gateway.
   *
   * Unless the command is one of the activity commands or the message
   * originated from "go", a copy is also published to the NSQ unicast
   * topic. The payload is obfuscated only when encryp($uid) is truthy.
   *
   * @param string $uid  player id
   * @param string $cmd  command identifier
   * @param array  $data command payload
   * @param string $from origin of the message ("php" or "go")
   * @return void
   */
  public function pushCur($uid, $cmd, $data, $from = "php")
  {
      if ($this->shouldMirrorToNsq($cmd, $from)) {
          $nsqData = ["type" => "user", "uid" => $uid, "flag" => $uid, "cmd" => $cmd, "data" => $data];
          funs::SendNsqMsg(ocmd::TopicPHPPush, [], $nsqData);
      }

      Gateway::$registerAddress = WORKER_REGISTER_URL;
      $str = json_encode(['cmd' => $cmd, 'data' => $data, 'code' => 1], JSON_NUMERIC_CHECK);
      $out = $this->encryp($uid) ? $this->obfuscatePayload($str) : $str;

      Gateway::sendToUid($uid, $out);
  }

  /**
   * Push a command to every client of a group through the local gateway.
   * The payload is always obfuscated.
   *
   * @param string $group group name
   * @param string $cmd   command identifier
   * @param mixed  $data  command payload
   * @return void
   */
  public function pushCurGroup($group, $cmd, $data)
  {
      Gateway::$registerAddress = WORKER_REGISTER_URL;
      $str = json_encode(['cmd' => $cmd, 'data' => $data], JSON_NUMERIC_CHECK);

      Gateway::sendToGroup($group, $this->obfuscatePayload($str));
  }

  /**
   * Push a command to every client connected to the local gateway.
   *
   * Unless the command is one of the activity commands or the message
   * originated from "go", a copy is also published to the NSQ broadcast
   * topic. The payload is always obfuscated.
   *
   * @param string $cmd  command identifier
   * @param array  $data command payload
   * @param string $from origin of the message ("php" or "go")
   * @return void
   */
  public function pushCurAll($cmd, $data, $from = "php")
  {
      if ($this->shouldMirrorToNsq($cmd, $from)) {
          $nsqData = ["type" => "broadcast", "flag" => "", "cmd" => $cmd, "data" => $data];
          funs::SendNsqMsg(ocmd::TopicPHPPushAll, [], $nsqData);
      }

      Gateway::$registerAddress = WORKER_REGISTER_URL;
      $str = json_encode(['cmd' => $cmd, 'data' => $data], JSON_NUMERIC_CHECK);

      Gateway::sendToAll($this->obfuscatePayload($str));
  }

  /**
   * Whether a push must also be published to NSQ: true for every command
   * except the three activity commands, and only when it did not come
   * from "go".
   */
  private function shouldMirrorToNsq($cmd, $from)
  {
      $activityCmds = [
          ocmd::$send['ActivityChangeStatus_New'],
          ocmd::$send['Activity_timeCollect'],
          ocmd::$send['Activity_timeCollectBet'],
      ];

      return !in_array($cmd, $activityCmds) && $from != "go";
  }

  /**
   * Base64-encode a JSON string and splice noise into it: 16 hex chars of
   * md5(time()) at offset floor(len / 2) - 1, then 1 hex char after the
   * first character of the result.
   */
  private function obfuscatePayload($json)
  {
      $encoded = base64_encode($json);
      $mid = (int)floor(strlen($encoded) / 2) - 1;
      // Hash once so both insertions are taken from the same digest.
      $noise = md5(time());

      $out = substr($encoded, 0, $mid) . substr($noise, 0, 16) . substr($encoded, $mid, strlen($encoded));

      return substr($out, 0, 1) . substr($noise, 0, 1) . substr($out, 1, strlen($out));
  }
  /**
   * Return the uid list reported by the gateway at WORKER_REGISTER_URL.
   *
   * @return array whatever Gateway::getAllUidList() returns
   */
  public function getCurAllUidList()
  {
      Gateway::$registerAddress = WORKER_REGISTER_URL;
      $uids = Gateway::getAllUidList();

      return $uids;
  }
  /**
   * Tell whether a player is connected to the current node.
   *
   * @param int $uid player id
   * @return bool false as well when the gateway query throws an Exception
   */
  public function isUidCurOnline($uid)
  {
      try {
          Gateway::$registerAddress = WORKER_REGISTER_URL;
          $online = Gateway::isUidOnline($uid);
      } catch (Exception $e) {
          $online = false;
      }

      return $online;
  }
  /**
   * Get the number of online client ids.
   *
   * @return int 0 when the gateway query throws an Exception
   */
  public function getOnlineCount()
  {
      try {
          Gateway::$registerAddress = WORKER_REGISTER_URL;
          $count = Gateway::getAllClientIdCount();
      } catch (Exception $e) {
          $count = 0;
      }

      return $count;
  }
  /**
   * Send the "spins are full" push notification to players.
   *
   * Legacy mode ($new = false): every uid in the okeys::spinsFirebase()
   * hash has its spins refreshed, and is notified when refreshSpins()
   * returns a falsy value.
   *
   * New mode ($new = true): spins are not refreshed on every run.
   *  1. Read (and clear) the uids queued in okeys::spinsFirebase().
   *  2. For each one still below its spin cap, compute the time at which
   *     the cap will be reached and store it in
   *     okeys::spinsFirebaseEndTime().
   *  3. Notify, and remove, every stored uid whose time has come.
   *
   * Notifications are only sent outside the debug environment.
   *
   * @param bool $new use the new scheduling logic
   * @return void
   */
  public function spinsFull($new = false)
  {
      Gateway::$registerAddress = WORKER_REGISTER_URL;

      if (!$new) {
          // Legacy logic: refresh spins on every run.
          $uids = oo::commonOprRedis('common')->hKeys(okeys::spinsFirebase());
          oo::commonOprRedis('common')->delete(okeys::spinsFirebase());
          foreach ($uids as $uid) {
              $refreshResult = oo::commonOprModel('user')->refreshSpins($uid, 1);
              if (!$refreshResult && !IS_DEBUF) {
                  oo::commonOprModel('push')->pushNews($uid, 'push.title.spins', 'push.content.spins', [], 'spins');
              }
          }
          return;
      }

      $setTime = oo::commonOprModel('readconfig')->getCon('basic', 'restoreTime');   // interval between restores
      $setSpins = oo::commonOprModel('readconfig')->getCon('basic', 'restoreSpins'); // spins restored per interval
      $time = time();

      // Step 1: take the queued uids and clear the queue.
      $pushArray = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebase());
      oo::commonOprRedis('common')->delete(okeys::spinsFirebase());

      // Step 2: schedule the moment each player reaches the cap.
      foreach ($pushArray as $uid => $refreSpins) {
          $assets = oo::commonOprModel('member')->getUserAssetsInfo($uid);
          $maxSpins = oo::commonOprModel('member')->getLevelUpLimit($assets['levelId'], 'spins') ?? 50;
          $currentSpins = intval($assets['spins']);

          if ($currentSpins < $maxSpins && $refreSpins) {
              $missing = $maxSpins - $currentSpins;
              // When the gap is an exact multiple of the restore amount,
              // one fewer full interval is needed on top of $refreSpins
              // (presumably the countdown to the next restore — confirm).
              $exactMultiple = ($missing % $setSpins == 0);
              $intervals = intval($missing / $setSpins);
              if ($exactMultiple) {
                  $intervals = $intervals - 1;
              }
              $endTime = $time + $intervals * $setTime + $refreSpins;
              oo::commonOprRedis('common')->hSet(okeys::spinsFirebaseEndTime(), $uid, $endTime);
          }
      }

      // Step 3: notify everyone whose scheduled time has passed.
      $scheduled = oo::commonOprRedis('common')->hGetAll(okeys::spinsFirebaseEndTime());
      foreach ($scheduled as $uid => $endTime) {
          if ($time < $endTime) {
              continue;
          }
          oo::commonOprRedis('common')->hDel(okeys::spinsFirebaseEndTime(), $uid);
          if (!IS_DEBUF) {
              oo::commonOprModel('push')->pushNews($uid, 'push.title.spins', 'push.content.spins', [], 'spins');
          }
      }
  }
  /**
   * Look up the "encryp" flag stored in a player's gateway session.
   *
   * The first of the uid's clients that has a non-empty session decides
   * the result; that session is read as $session[$clientId]["encryp"].
   *
   * @param string $uid player id
   * @return mixed the stored flag; false when the uid has no clients, no
   *               client has a session, or the flag is not set
   */
  public function encryp($uid)
  {
      Gateway::$registerAddress = WORKER_REGISTER_URL;
      $clientArr = Gateway::getClientIdByUid($uid);
      if (empty($clientArr)) {
          return false;
      }

      foreach ($clientArr as $clientId) {
          $session = Gateway::getSession($clientId);
          if (!empty($session)) {
              // A missing key yields false instead of an undefined-index
              // notice and an implicit null.
              return $session[$clientId]["encryp"] ?? false;
          }
      }

      // No client had a session: answer explicitly instead of falling off
      // the end of the function.
      return false;
  }
  459. }