123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- <?php
- /**
- * Notes:异步操作类
- * User : wsc
- * Date : 2020/1/9 18:14
- * Class async
- */
- class async
- {
- public $length;//队列长度
- public $mkRdsList='MK_REDIS_LIST';
- public function __construct(){
- $this->redis = oo::commonOprRedis('common');
- }
- /**
- * Notes:添加异步调用监听
- * User : wsc
- * Date : 2020/1/9 18:17
- * @param string $obj 调用对象和方法 如 oo::commonOprModel("a_test")->addOneLog
- * @param array $args 调用参数列表,数据组型,如参数为 1, array('type'=>1) 则传参 array(1, array('type'=>1))
- * @return bool
- */
- public function add($obj, $args){
- if( ( !$obj) || ( ! is_array( $args))){
- return false;
- }
- $sData = json_encode( array($obj, $args) );
- // if(false){
- // return $this->runCode( $sData);
- // }
- $this->lpush($sData);//直接添加到redis队列
- return true;
- }
- /**
- * Notes:入队
- * User : wsc
- * Date : 2020/1/9 18:52
- * @param $sData
- * @return int
- */
- public function lpush($sData){
- // oo::logs()->commlog( date('H:i:s').json_encode($sData), 'async/lpush');
- $this->length = $this->redis->lPush( $this->mkRdsList, $sData);
- if($this->length === false){
- $msg = "[lpush error:] line:" . __LINE__;
- oo::logs()->commlog( $msg, 'async/callback_error');
- }
- if($this->length > 1000){
- $msg = "[list length error:] line:" . __LINE__ . 'length:' . $this->length;
- oo::logs()->commlog( $msg, 'async/callback_error' );
- }
- return $this->length;
- }
- /**
- * Notes:取值
- * User : wsc
- * Date : 2020/1/9 18:52
- * @return string
- */
- public function rpop(){
- $res = (string) $this->redis->rPop( $this->mkRdsList );
- if( $res ){
- return $res;
- }
- $this->redis->close();
- return $res;
- }
- /**
- * 执行异步调用
- * @param $proc
- * @return bool
- */
- public function do_async($proc=0){
- $i = 0;
- while(true){
- if($i++ == 50){
- //需清缓存防止内存泄漏
- oo::destroy();
- break;
- }
- $sdata = $this->rpop();
- $sdata = trim( $sdata);
- if(!$sdata){
- //需清缓存防止内存泄漏
- oo::destroy();
- break;
- }
- // if(IS_DEBUF){
- // oo::logs()->commlog( array('data'=>$sdata,'redis'=>$this->redis->version), 'async/rpop' );
- // }
- $this->runCode( $sdata);
- // touch( PATH_DAT . "callbackdo.run.{$proc}");//运行记录
- }
- //关闭数据库,redis连接
- oo::closeDbAndNosql();
- return true;
- }
- /**
- * 实际执行代码
- * @param $sdata
- * @return mixed
- */
- public function runCode($sdata){
- $aData = json_decode($sdata, true);
- $args = '';
- $time1 = microtime(true);
- foreach($aData[1] as $k => $v){
- if( (! is_scalar($v)) || is_bool($v)){
- $param = var_export($v, true);
- $args .= $param . ',';
- }else{
- $param = (substr($v, 0, 5) == 'array') ? $v . ',' : "'{$v}',";
- $args .= $param;
- }
- }
- $args = substr($args, 0, -1);
- $scall = $aData[0] . '(' . $args . ')';
- $evalRet = eval("\$ret = $scall;");
- if($evalRet === false){
- $error = error_get_last();
- oo::logs()->commlog( array($scall, $error), 'async/callback_exec_error' );
- }
- $time2 = microtime(true);
- // if(IS_DEBUF){
- // oo::logs()->commlog( array('eval'=>$scall, 'ts'=>$time2-$time1), 'async/eval_log' );
- // }
- return true;
- }
- }
|