async.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. <?php
  /**
   * Asynchronous call queue backed by a Redis list.
   *
   * Producers call add() to enqueue a JSON payload of the form
   * [call-target-expression, argument-list]; a worker calls do_async() to pop
   * payloads from the other end of the list and execute them via runCode().
   *
   * User : wsc
   * Date : 2020/1/9 18:14
   * Class async
   */
  class async
  {
      /** Maximum number of payloads a single do_async() call will process. */
      const BATCH_LIMIT = 50;

      /** Queue length above which lpush() writes a warning to the error log. */
      const WARN_LENGTH = 1000;

      /** @var int|false Value returned by the most recent lPush (queue length, or false on failure). */
      public $length;

      /** @var string Redis key of the list that holds the queued payloads. */
      public $mkRdsList = 'MK_REDIS_LIST';

      /**
       * @var mixed Redis client returned by oo::commonOprRedis('common').
       *            Declared explicitly (it used to be created dynamically in the
       *            constructor); kept public so existing external access still works.
       */
      public $redis;

      public function __construct()
      {
          $this->redis = oo::commonOprRedis('common');
      }

      /**
       * Queue a call for asynchronous execution.
       *
       * User : wsc
       * Date : 2020/1/9 18:17
       *
       * @param string $obj  Call target written as a PHP expression,
       *                     e.g. 'oo::commonOprModel("a_test")->addOneLog'
       * @param array  $args Positional argument list; to call with 1, array('type'=>1)
       *                     pass array(1, array('type'=>1))
       * @return bool true when the payload was pushed to the queue; false on
       *              invalid input, JSON encoding failure, or a failed push
       */
      public function add($obj, $args)
      {
          if (!$obj || !is_array($args)) {
              return false;
          }

          $sData = json_encode(array($obj, $args));
          if ($sData === false) {
              // Encoding failed (json_encode returned false): nothing meaningful to enqueue.
              return false;
          }

          // lpush() already logs the failure; surface it to the caller instead of
          // unconditionally reporting success.
          return $this->lpush($sData) !== false;
      }

      /**
       * Push one encoded payload onto the head of the queue.
       *
       * User : wsc
       * Date : 2020/1/9 18:52
       *
       * @param string $sData JSON payload produced by add()
       * @return int|false Whatever the client's lPush returned (stored in $this->length)
       */
      public function lpush($sData)
      {
          $this->length = $this->redis->lPush($this->mkRdsList, $sData);

          if ($this->length === false) {
              $msg = "[lpush error:] line:" . __LINE__;
              oo::logs()->commlog($msg, 'async/callback_error');
          } elseif ($this->length > self::WARN_LENGTH) {
              // Backlog is growing: consumers are presumably not keeping up.
              $msg = "[list length error:] line:" . __LINE__ . 'length:' . $this->length;
              oo::logs()->commlog($msg, 'async/callback_error');
          }

          return $this->length;
      }

      /**
       * Pop one payload from the tail of the queue.
       *
       * User : wsc
       * Date : 2020/1/9 18:52
       *
       * @return string The payload, or a falsy string ('' after casting a failed
       *                pop) when nothing usable was returned; in that case the
       *                Redis connection is closed before returning.
       */
      public function rpop()
      {
          $res = (string) $this->redis->rPop($this->mkRdsList);
          if ($res) {
              return $res;
          }

          $this->redis->close();
          return $res;
      }

      /**
       * Worker entry point: execute queued calls until the queue is empty or
       * BATCH_LIMIT payloads have been processed.
       *
       * @param int $proc Worker/process identifier; currently unused, kept for
       *                  interface compatibility
       * @return bool Always true
       */
      public function do_async($proc = 0)
      {
          for ($i = 0; $i < self::BATCH_LIMIT; $i++) {
              $sdata = trim($this->rpop());
              if (!$sdata) {
                  break;
              }
              $this->runCode($sdata);
          }

          // Original author's note: the cache must be cleared to prevent memory
          // leaks. Done once here, covering both the "limit reached" and the
          // "queue empty" exit paths.
          oo::destroy();

          // Close database and Redis connections.
          oo::closeDbAndNosql();
          return true;
      }

      /**
       * Decode one payload, rebuild the call expression and execute it.
       *
       * Argument handling:
       *  - strings starting with "array" are inserted verbatim as PHP code
       *    (legacy convention for passing array literals as text);
       *  - other non-boolean scalars are passed as single-quoted string literals;
       *  - everything else (arrays, booleans, null) is rendered with var_export().
       *
       * SECURITY: the call target and "array..." arguments are executed with
       * eval(), so anything able to write to the Redis list can run arbitrary
       * PHP in the worker. The list must only ever be fed by trusted code.
       *
       * @param string $sdata JSON payload as produced by add()
       * @return bool false when the payload is malformed and nothing was run;
       *              true once execution was attempted (failures are logged to
       *              'async/callback_exec_error')
       */
      public function runCode($sdata)
      {
          $aData = json_decode($sdata, true);
          if (
              !is_array($aData)
              || !isset($aData[0], $aData[1])
              || !is_string($aData[0])
              || $aData[0] === ''
              || !is_array($aData[1])
          ) {
              oo::logs()->commlog(array($sdata, 'malformed payload'), 'async/callback_exec_error');
              return false;
          }

          $parts = array();
          foreach ($aData[1] as $v) {
              if (is_string($v) && substr($v, 0, 5) == 'array') {
                  // Legacy: textual array literal, pasted into the call as code.
                  $parts[] = $v;
              } elseif (is_scalar($v) && !is_bool($v)) {
                  // Still passed as a string (as before), but var_export() escapes
                  // quotes and backslashes so the value cannot break out of the literal.
                  $parts[] = var_export((string) $v, true);
              } else {
                  $parts[] = var_export($v, true);
              }
          }

          $scall = $aData[0] . '(' . implode(',', $parts) . ')';

          try {
              $evalRet = eval("\$ret = $scall;");
              if ($evalRet === false) {
                  // PHP 5 behaviour: a parse error makes eval() return false.
                  oo::logs()->commlog(array($scall, error_get_last()), 'async/callback_exec_error');
              }
          } catch (\Exception $e) {
              $this->logExecFailure($scall, $e);
          } catch (\Throwable $e) {
              // PHP 7+: parse errors and engine errors are thrown instead of returned.
              $this->logExecFailure($scall, $e);
          }

          return true;
      }

      /**
       * Log an exception/error raised while executing a queued call, using the
       * same key layout as error_get_last() so log consumers see one shape.
       *
       * @param string                $scall The call expression that was executed
       * @param \Exception|\Throwable $e     What it raised
       * @return void
       */
      private function logExecFailure($scall, $e)
      {
          $error = array(
              'type'    => get_class($e),
              'message' => $e->getMessage(),
              'file'    => $e->getFile(),
              'line'    => $e->getLine(),
          );
          oo::logs()->commlog(array($scall, $error), 'async/callback_exec_error');
      }
  }