<?php
/**
 * Chrome Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.47 Safari/536.11
 * IE6 Mozilla/5.0 (Windows NT 6.1; rv:9.0.1) Gecko/20100101 Firefox/9.0.1
 * FF Mozilla/5.0 (Windows NT 6.1; WOW64; rv:24.0) Gecko/20100101 Firefox/24.0
 *
 * more useragent:http://www.useragentstring.com/
 *
 * @author admin@phpdr.net
 *
 */
  12. class CurlMulti_Core {
  13. // url
  14. const TASK_ITEM_URL = 0x01;
  15. // file
  16. const TASK_ITEM_FILE = 0x02;
  17. // arguments
  18. const TASK_ITEM_ARGS = 0x03;
  19. // operation, task level
  20. const TASK_ITEM_OPT = 0x04;
  21. // control options
  22. const TASK_ITEM_CTL = 0x05;
  23. // file pointer
  24. const TASK_FP = 0x06;
  25. // success callback
  26. const TASK_PROCESS = 0x07;
  27. // curl fail callback
  28. const TASK_FAIL = 0x08;
  29. // tryed times
  30. const TASK_TRYED = 0x09;
  31. // handler
  32. const TASK_CH = 0x0A;
  33. // global max thread num
  34. public $maxThread = 10;
  35. // Max thread by task type.Task type is specified in $item['ctl'] in add().If task has no type,$this->maxThreadNoType is maxThread-sum(maxThreadType).If less than 0 $this->maxThreadNoType is set to 0.
  36. public $maxThreadType = array ();
  37. // retry time(s) when task failed
  38. public $maxTry = 3;
  39. // operation, class level curl opt
  40. public $opt = array ();
  41. // cache options,dirLevel values is less than 3
  42. public $cache = array (
  43. 'enable' => false,
  44. 'enableDownload' => false,
  45. 'compress' => false,
  46. 'dir' => null,
  47. 'expire' => 86400,
  48. 'dirLevel' => 1
  49. );
  50. // stack or queue
  51. public $taskPoolType = 'queue';
  52. // eliminate duplicate for taskpool, will delete previous task and add new one
  53. public $taskOverride = false;
  54. // task callback,add() should be called in callback, $cbTask[0] is callback, $cbTask[1] is param.
  55. public $cbTask = null;
  56. // status callback
  57. public $cbInfo = null;
  58. // user callback
  59. public $cbUser = null;
  60. // common fail callback, called if no one specified
  61. public $cbFail = null;
  62. // is the loop running
  63. protected $isRunning = false;
  64. // max thread num no type
  65. protected $maxThreadNoType = null;
  66. // all added task was saved here first
  67. protected $taskPool = array ();
  68. // taskPool with high priority
  69. protected $taskPoolAhead = array ();
  70. // running task(s)
  71. protected $taskRunning = array ();
  72. // failed task need to retry
  73. protected $taskFail = array ();
  74. // handle of multi-thread curl
  75. private $mh = null;
  76. // user error
  77. private $userError = null;
  78. // if __construct called
  79. private $isConstructCalled = false;
  80. // running info
  81. private $info = array (
  82. 'all' => array (
  83. // process start time
  84. 'startTime' => null,
  85. // download start time
  86. 'startTimeDownload' => null,
  87. // the real multi-thread num
  88. 'activeNum' => null,
  89. // finished task in the queue
  90. 'queueNum' => null,
  91. // byte
  92. 'downloadSize' => 0,
  93. // finished task number,include failed task and cache
  94. 'finishNum' => 0,
  95. // The number of cache used
  96. 'cacheNum' => 0,
  97. // completely failed task number
  98. 'failNum' => 0,
  99. // task num has added
  100. 'taskNum' => 0,
  101. // $this->taskRunning size
  102. 'taskRunningNum' => 0,
  103. // task running num by type,
  104. 'taskRunningNumType' => array (),
  105. // task ruuning num no type
  106. 'taskRunningNumNoType' => 0,
  107. // $this->taskPool size
  108. 'taskPoolNum' => 0,
  109. // $this->taskFail size
  110. 'taskFailNum' => 0,
  111. // finish percent
  112. 'finishPercent' => 0,
  113. // time cost
  114. 'timeSpent' => 0,
  115. // download time cost
  116. 'timeSpentDownload' => 0,
  117. // curl task speed
  118. 'taskSpeedNoCache' => 0,
  119. // network speed, bytes
  120. 'downloadSpeed' => 0
  121. ),
  122. 'running' => array ()
  123. );
  124. function __construct() {
  125. $this->isConstructCalled = true;
  126. if (version_compare ( PHP_VERSION, '5.1.0' ) < 0) {
  127. throw new CurlMulti_Exception ( 'PHP 5.1.0+ is needed' );
  128. }
  129. }
  130. /**
  131. * add a task to taskPool
  132. *
  133. * @param array $item
  134. * array('url'=>'',['file'=>'',['opt'=>array(),['args'=>array(),['ctl'=>array('type'=>'','ahead'=>false,'cache'=>array('enable'=>bool,'expire'=>0),'close'=>true))]]]])
  135. * @param mixed $process
  136. * success callback,for callback first param array('info'=>,'content'=>), second param $item[args]
  137. * @param mixed $fail
  138. * curl fail callback,for callback first param array('error'=>array(0=>code,1=>msg),'info'=>array),second param $item[args];
  139. * @throws CurlMulti_Exception
  140. * @return CurlMulti_Core
  141. */
  142. function add(array $item, $process = null, $fail = null) {
  143. // check
  144. if (! is_array ( $item )) {
  145. user_error ( 'item must be array, item is ' . gettype ( $item ), E_USER_WARNING );
  146. } else {
  147. if (! isset ( $item ['url'] ) && ! empty ( $item ['opt'] [CURLOPT_URL] )) {
  148. $item ['url'] = $item ['opt'] [CURLOPT_URL];
  149. }
  150. $item ['url'] = trim ( $item ['url'] );
  151. if (empty ( $item ['url'] )) {
  152. user_error ( "url can't be empty, url=$item[url]", E_USER_WARNING );
  153. } else {
  154. // replace space with + to avoid some curl problems
  155. $item ['url'] = str_replace ( ' ', '+', $item ['url'] );
  156. if (array_key_exists ( 'fragment', parse_url ( $item ['url'] ) )) {
  157. $pos = strrpos ( $item ['url'], '#' );
  158. $item ['url'] = substr ( $item ['url'], 0, $pos );
  159. }
  160. // fix
  161. if (empty ( $item ['file'] ))
  162. $item ['file'] = null;
  163. if (empty ( $item ['opt'] ))
  164. $item ['opt'] = array ();
  165. if (! array_key_exists ( 'args', $item ))
  166. $item ['args'] = null;
  167. if (empty ( $item ['ctl'] )) {
  168. $item ['ctl'] = array ();
  169. }
  170. if (! isset ( $item ['ctl'] ['cache'] ) || ! isset ( $item ['ctl'] ['cache'] ['enable'] )) {
  171. $item ['ctl'] ['cache'] = array (
  172. 'enable' => false,
  173. 'expire' => 0
  174. );
  175. }
  176. if (! isset ( $item ['ctl'] ['ahead'] )) {
  177. $item ['ctl'] ['ahead'] = false;
  178. }
  179. if (empty ( $process )) {
  180. $process = null;
  181. }
  182. if (empty ( $fail )) {
  183. $fail = null;
  184. }
  185. $task = array ();
  186. $task [self::TASK_ITEM_URL] = $item ['url'];
  187. $task [self::TASK_ITEM_FILE] = $item ['file'];
  188. $task [self::TASK_ITEM_ARGS] = array (
  189. $item ['args']
  190. );
  191. $task [self::TASK_ITEM_OPT] = $item ['opt'];
  192. $task [self::TASK_ITEM_CTL] = $item ['ctl'];
  193. $task [self::TASK_PROCESS] = $process;
  194. $task [self::TASK_FAIL] = $fail;
  195. $task [self::TASK_TRYED] = 0;
  196. $task [self::TASK_CH] = null;
  197. $this->addTaskPool ( $task );
  198. $this->info ['all'] ['taskNum'] ++;
  199. }
  200. }
  201. return $this;
  202. }
  203. /**
  204. * add task to taskPool
  205. *
  206. * @param unknown $task
  207. */
  208. private function addTaskPool($task) {
  209. // uniq
  210. if ($this->taskOverride) {
  211. foreach ( array (
  212. 'taskPoolAhead',
  213. 'taskPool'
  214. ) as $v ) {
  215. foreach ( $this->$v as $k1 => $v1 ) {
  216. if ($v1 [self::TASK_ITEM_URL] == $task [self::TASK_ITEM_URL]) {
  217. $t = &$this->$v;
  218. unset ( $t [$k1] );
  219. }
  220. }
  221. }
  222. }
  223. // add
  224. if (true == $task [self::TASK_ITEM_CTL] ['ahead']) {
  225. $this->taskPoolAhead [] = $task;
  226. } else {
  227. $this->taskPool [] = $task;
  228. }
  229. }
  230. /**
  231. * Perform the actual task(s).
  232. *
  233. * @param
  234. * mixed callback control the persist
  235. */
  236. function start($persist = null) {
  237. if ($this->isRunning) {
  238. throw new CurlMulti_Exception ( __CLASS__ . ' is running !' );
  239. }
  240. if (false === $this->isConstructCalled) {
  241. throw new CurlMulti_Exception ( __CLASS__ . ' __construct is not called' );
  242. }
  243. $this->mh = curl_multi_init ();
  244. $this->info ['all'] ['startTime'] = time ();
  245. $this->info ['all'] ['timeStartDownload'] = null;
  246. $this->info ['all'] ['downloadSize'] = 0;
  247. $this->info ['all'] ['finishNum'] = 0;
  248. $this->info ['all'] ['cacheNum'] = 0;
  249. $this->info ['all'] ['failNum'] = 0;
  250. $this->info ['all'] ['taskNum'] = 0;
  251. $this->info ['all'] ['taskRunningNumNoType'] = 0;
  252. $this->setThreadData ();
  253. $this->isRunning = true;
  254. $this->addTask ();
  255. do {
  256. $this->exec ();
  257. curl_multi_select ( $this->mh );
  258. $this->callCbInfo ();
  259. if (isset ( $this->cbUser )) {
  260. call_user_func ( $this->cbUser );
  261. }
  262. // useful for persist
  263. $this->addTask ();
  264. while ( false != ($curlInfo = curl_multi_info_read ( $this->mh, $this->info ['all'] ['queueNum'] )) ) {
  265. $ch = $curlInfo ['handle'];
  266. $task = $this->taskRunning [( int ) $ch];
  267. $info = curl_getinfo ( $ch );
  268. $this->info ['all'] ['downloadSize'] += $info ['size_download'];
  269. if (isset ( $task [self::TASK_FP] )) {
  270. fclose ( $task [self::TASK_FP] );
  271. }
  272. if ($curlInfo ['result'] == CURLE_OK) {
  273. $param = array ();
  274. $param ['info'] = $info;
  275. $param ['ext'] = array (
  276. 'ch' => $ch
  277. );
  278. if (! isset ( $task [self::TASK_ITEM_FILE] )) {
  279. $param ['content'] = curl_multi_getcontent ( $ch );
  280. }
  281. }
  282. curl_multi_remove_handle ( $this->mh, $ch );
  283. // must close first,other wise download may be not commpleted in process callback
  284. if (! array_key_exists ( 'close', $task [self::TASK_ITEM_CTL] ) || $task [self::TASK_ITEM_CTL] ['close'] == true) {
  285. curl_close ( $ch );
  286. }
  287. if ($curlInfo ['result'] == CURLE_OK) {
  288. $this->process ( $task, $param, false );
  289. }
  290. // error handle
  291. $callFail = false;
  292. if ($curlInfo ['result'] !== CURLE_OK || isset ( $this->userError )) {
  293. if ($task [self::TASK_TRYED] >= $this->maxTry) {
  294. // user error
  295. if (isset ( $this->userError )) {
  296. $err = array (
  297. 'error' => $this->userError
  298. );
  299. } else {
  300. $err = array (
  301. 'error' => array (
  302. $curlInfo ['result'],
  303. curl_error ( $ch )
  304. )
  305. );
  306. }
  307. $err ['info'] = $info;
  308. if (isset ( $task [self::TASK_FAIL] ) || isset ( $this->cbFail )) {
  309. array_unshift ( $task [self::TASK_ITEM_ARGS], $err );
  310. $callFail = true;
  311. } else {
  312. user_error ( "Error " . implode ( ', ', $err ['error'] ) . ", url=$info[url]", E_USER_WARNING );
  313. }
  314. $this->info ['all'] ['failNum'] ++;
  315. } else {
  316. $task [self::TASK_TRYED] ++;
  317. $task [self::TASK_ITEM_CTL] ['useCache'] = false;
  318. $this->taskFail [] = $task;
  319. $this->info ['all'] ['taskNum'] ++;
  320. }
  321. if (isset ( $this->userError )) {
  322. unset ( $this->userError );
  323. }
  324. }
  325. if ($callFail) {
  326. if (isset ( $task [self::TASK_FAIL] )) {
  327. call_user_func_array ( $task [self::TASK_FAIL], $task [self::TASK_ITEM_ARGS] );
  328. } elseif (isset ( $this->cbFail )) {
  329. call_user_func_array ( $this->cbFail, $task [self::TASK_ITEM_ARGS] );
  330. }
  331. }
  332. unset ( $this->taskRunning [( int ) $ch] );
  333. if (array_key_exists ( 'type', $task [self::TASK_ITEM_CTL] )) {
  334. $this->info ['all'] ['taskRunningNumType'] [$task [self::TASK_ITEM_CTL] ['type']] --;
  335. } else {
  336. $this->info ['all'] ['taskRunningNumNoType'] --;
  337. }
  338. $this->addTask ();
  339. $this->info ['all'] ['finishNum'] ++;
  340. // if $this->info['all']['queueNum'] grow very fast there will be no efficiency lost,because outer $this->exec() won't be executed.
  341. $this->exec ();
  342. $this->callCbInfo ();
  343. if (isset ( $this->cbUser )) {
  344. call_user_func ( $this->cbUser );
  345. }
  346. }
  347. } while ( $this->info ['all'] ['activeNum'] || $this->info ['all'] ['queueNum'] || ! empty ( $this->taskFail ) || ! empty ( $this->taskRunning ) || ! empty ( $this->taskPool ) || (isset ( $persist ) && true == call_user_func ( $persist )) );
  348. $this->callCbInfo ( true );
  349. curl_multi_close ( $this->mh );
  350. unset ( $this->mh );
  351. $this->isRunning = false;
  352. }
  353. /**
  354. * call $this->cbInfo
  355. */
  356. private function callCbInfo($force = false) {
  357. static $lastTime;
  358. if (! isset ( $lastTime )) {
  359. $lastTime = time ();
  360. }
  361. $now = time ();
  362. if (($force || $now - $lastTime > 0) && isset ( $this->cbInfo )) {
  363. $lastTime = $now;
  364. $this->info ['all'] ['taskPoolNum'] = count ( $this->taskPool );
  365. $this->info ['all'] ['taskRunningNum'] = count ( $this->taskRunning );
  366. $this->info ['all'] ['taskFailNum'] = count ( $this->taskFail );
  367. if ($this->info ['all'] ['taskNum'] > 0) {
  368. $this->info ['all'] ['finishPercent'] = round ( $this->info ['all'] ['finishNum'] / $this->info ['all'] ['taskNum'], 4 );
  369. }
  370. $this->info ['all'] ['timeSpent'] = time () - $this->info ['all'] ['startTime'];
  371. if (isset ( $this->info ['all'] ['timeStartDownload'] )) {
  372. $this->info ['all'] ['timeSpentDownload'] = time () - $this->info ['all'] ['timeStartDownload'];
  373. }
  374. if ($this->info ['all'] ['timeSpentDownload'] > 0) {
  375. $this->info ['all'] ['taskSpeedNoCache'] = round ( ($this->info ['all'] ['finishNum'] - $this->info ['all'] ['cacheNum']) / $this->info ['all'] ['timeSpentDownload'], 2 );
  376. $this->info ['all'] ['downloadSpeed'] = round ( $this->info ['all'] ['downloadSize'] / $this->info ['all'] ['timeSpentDownload'], 2 );
  377. }
  378. // running
  379. $this->info ['running'] = array ();
  380. foreach ( $this->taskRunning as $k => $v ) {
  381. $this->info ['running'] [$k] = curl_getinfo ( $v [self::TASK_CH] );
  382. }
  383. call_user_func_array ( $this->cbInfo, array (
  384. $this->info
  385. ) );
  386. }
  387. }
  388. /**
  389. * set $this->maxThreadNoType, $this->info['all']['taskRunningNumType'], $this->info['all']['taskRunningNumNoType'] etc
  390. */
  391. private function setThreadData() {
  392. $this->maxThreadNoType = $this->maxThread - array_sum ( $this->maxThreadType );
  393. if ($this->maxThreadNoType < 0) {
  394. $this->maxThreadNoType = 0;
  395. }
  396. // unset none exitst type num
  397. foreach ( $this->info ['all'] ['taskRunningNumType'] as $k => $v ) {
  398. if ($v == 0 && ! array_key_exists ( $k, $this->maxThreadType )) {
  399. unset ( $this->info ['all'] ['taskRunningNumType'] [$k] );
  400. }
  401. }
  402. // init type num
  403. foreach ( $this->maxThreadType as $k => $v ) {
  404. if ($v == 0) {
  405. user_error ( 'maxThreadType[' . $k . '] is 0, task of this type will never be added!', E_USER_WARNING );
  406. }
  407. if (! array_key_exists ( $k, $this->info ['all'] ['taskRunningNumType'] )) {
  408. $this->info ['all'] ['taskRunningNumType'] [$k] = 0;
  409. }
  410. }
  411. }
  412. /**
  413. * curl_multi_exec()
  414. */
  415. private function exec() {
  416. while ( curl_multi_exec ( $this->mh, $this->info ['all'] ['activeNum'] ) === CURLM_CALL_MULTI_PERFORM ) {
  417. }
  418. }
  419. /**
  420. * add a task to curl, keep $this->maxThread concurrent automatically
  421. */
  422. private function addTask() {
  423. $c = $this->maxThread - count ( $this->taskRunning );
  424. $isTaskPoolAdd = true;
  425. while ( $c > 0 ) {
  426. $task = array ();
  427. // search failed first
  428. if (! empty ( $this->taskFail )) {
  429. $task = array_pop ( $this->taskFail );
  430. } else {
  431. // cbTask
  432. if ($isTaskPoolAdd && ! empty ( $this->cbTask ) && empty ( $this->taskPool )) {
  433. if (! isset ( $this->cbTask [1] )) {
  434. $this->cbTask [1] = array ();
  435. }
  436. call_user_func_array ( $this->cbTask [0], array (
  437. $this->cbTask [1]
  438. ) );
  439. if (empty ( $this->taskPool )) {
  440. $isTaskPoolAdd = false;
  441. }
  442. }
  443. if (! empty ( $this->taskPoolAhead )) {
  444. $task = array_pop ( $this->taskPoolAhead );
  445. } elseif (! empty ( $this->taskPool )) {
  446. if ($this->taskPoolType == 'stack') {
  447. $task = array_pop ( $this->taskPool );
  448. } elseif ($this->taskPoolType == 'queue') {
  449. $task = array_shift ( $this->taskPool );
  450. } else {
  451. throw new CurlMulti_Exception ( 'taskPoolType not found, taskPoolType=' . $this->taskPoolType );
  452. }
  453. }
  454. }
  455. $noAdd = false;
  456. $cache = null;
  457. if (! empty ( $task )) {
  458. if (true == $task [self::TASK_ITEM_CTL] ['cache'] ['enable'] || $this->cache ['enable']) {
  459. $cache = $this->cache ( $task );
  460. if (null !== $cache) {
  461. if (isset ( $task [self::TASK_ITEM_FILE] )) {
  462. file_put_contents ( $task [self::TASK_ITEM_FILE], $cache ['content'], LOCK_EX );
  463. unset ( $cache ['content'] );
  464. }
  465. $this->process ( $task, $cache, true );
  466. $this->info ['all'] ['cacheNum'] ++;
  467. $this->info ['all'] ['finishNum'] ++;
  468. $this->callCbInfo ();
  469. }
  470. }
  471. if (! $cache) {
  472. $this->setThreadData ();
  473. if (array_key_exists ( 'type', $task [self::TASK_ITEM_CTL] ) && ! array_key_exists ( $task [self::TASK_ITEM_CTL] ['type'], $this->maxThreadType )) {
  474. user_error ( 'task was set to notype because type was not set in $this->maxThreadType, type=' . $task [self::TASK_ITEM_CTL] ['type'], E_USER_WARNING );
  475. unset ( $task [self::TASK_ITEM_CTL] ['type'] );
  476. }
  477. if (array_key_exists ( 'type', $task [self::TASK_ITEM_CTL] )) {
  478. $maxThread = $this->maxThreadType [$task [self::TASK_ITEM_CTL] ['type']];
  479. $isNoType = false;
  480. } else {
  481. $maxThread = $this->maxThreadNoType;
  482. $isNoType = true;
  483. }
  484. if ($isNoType && $maxThread == 0) {
  485. user_error ( 'task was disgarded because maxThreadNoType=0, url=' . $task [self::TASK_ITEM_URL], E_USER_WARNING );
  486. }
  487. if (($isNoType && $this->info ['all'] ['taskRunningNumNoType'] < $maxThread) || (! $isNoType && $this->info ['all'] ['taskRunningNumType'] [$task [self::TASK_ITEM_CTL] ['type']] < $maxThread)) {
  488. $task [self::TASK_CH] = $this->curlInit ( $task [self::TASK_ITEM_URL] );
  489. // is a download task?
  490. if (isset ( $task [self::TASK_ITEM_FILE] )) {
  491. // curl can create the last level directory
  492. $dir = dirname ( $task [self::TASK_ITEM_FILE] );
  493. if (! file_exists ( $dir )) {
  494. mkdir ( $dir, 0777 );
  495. }
  496. $task [self::TASK_FP] = fopen ( $task [self::TASK_ITEM_FILE], 'w' );
  497. curl_setopt ( $task [self::TASK_CH], CURLOPT_FILE, $task [self::TASK_FP] );
  498. }
  499. // single task curl option
  500. if (isset ( $task [self::TASK_ITEM_OPT] )) {
  501. foreach ( $task [self::TASK_ITEM_OPT] as $k => $v ) {
  502. curl_setopt ( $task [self::TASK_CH], $k, $v );
  503. }
  504. }
  505. $this->taskRunning [( int ) $task [self::TASK_CH]] = $task;
  506. if (! isset ( $this->info ['all'] ['timeStartDownload'] )) {
  507. $this->info ['all'] ['timeStartDownload'] = time ();
  508. }
  509. if ($isNoType) {
  510. $this->info ['all'] ['taskRunningNumNoType'] ++;
  511. } else {
  512. $this->info ['all'] ['taskRunningNumType'] [$task [self::TASK_ITEM_CTL] ['type']] ++;
  513. }
  514. curl_multi_add_handle ( $this->mh, $task [self::TASK_CH] );
  515. } else {
  516. // rotate task to pool
  517. if ($task [self::TASK_TRYED] > 0) {
  518. array_unshift ( $this->taskFail, $task );
  519. } else {
  520. array_unshift ( $this->taskPool, $task );
  521. }
  522. $noAdd = true;
  523. }
  524. }
  525. }
  526. if (! $cache || $noAdd) {
  527. $c --;
  528. }
  529. }
  530. }
  531. /**
  532. * do process
  533. *
  534. * @param unknown $task
  535. * @param unknown $r
  536. * @param unknown $isCache
  537. */
  538. private function process($task, $r, $isCache) {
  539. array_unshift ( $task [self::TASK_ITEM_ARGS], $r );
  540. if (isset ( $task [self::TASK_PROCESS] )) {
  541. $userRes = call_user_func_array ( $task [self::TASK_PROCESS], $task [self::TASK_ITEM_ARGS] );
  542. }
  543. if (! isset ( $userRes )) {
  544. $userRes = true;
  545. }
  546. array_shift ( $task [self::TASK_ITEM_ARGS] );
  547. // backoff
  548. if (false === $userRes) {
  549. if (false == $this->cache ['enable'] && false == $task [self::TASK_ITEM_CTL] ['cache'] ['enable']) {
  550. $task [self::TASK_ITEM_CTL] ['cache'] = array (
  551. 'enable' => true,
  552. 'expire' => 3600
  553. );
  554. }
  555. $this->addTaskPool ( $task );
  556. }
  557. // write cache
  558. if (false == $isCache && false == isset ( $this->userError ) && (true == $task [self::TASK_ITEM_CTL] ['cache'] ['enable']) || $this->cache ['enable']) {
  559. $this->cache ( $task, $r );
  560. }
  561. }
  562. /**
  563. * set or get file cache
  564. *
  565. * @param string $url
  566. * @param mixed $content
  567. * array('info','content')
  568. * @return return array|null|boolean
  569. */
  570. private function cache($task, $content = null) {
  571. if (! isset ( $this->cache ['dir'] ))
  572. throw new CurlMulti_Exception ( 'Cache dir is not defined' );
  573. $url = $task [self::TASK_ITEM_URL];
  574. $suffix = '';
  575. if (! empty ( $task [self::TASK_ITEM_OPT] [CURLOPT_POSTFIELDS] )) {
  576. $post = $task [self::TASK_ITEM_OPT] [CURLOPT_POSTFIELDS];
  577. if (is_array ( $post )) {
  578. $post = http_build_query ( $post );
  579. }
  580. $suffix .= $post;
  581. }
  582. $key = md5 ( $url . $suffix );
  583. $file = rtrim ( $this->cache ['dir'], '/' ) . '/';
  584. $isDownload = isset ( $task [self::TASK_ITEM_FILE] );
  585. if (isset ( $this->cache ['dirLevel'] ) && $this->cache ['dirLevel'] != 0) {
  586. if ($this->cache ['dirLevel'] == 1) {
  587. $file .= substr ( $key, 0, 3 ) . '/' . substr ( $key, 3 );
  588. } elseif ($this->cache ['dirLevel'] == 2) {
  589. $file .= substr ( $key, 0, 3 ) . '/' . substr ( $key, 3, 3 ) . '/' . substr ( $key, 6 );
  590. } else {
  591. throw new CurlMulti_Exception ( 'cache dirLevel is invalid, dirLevel=' . $this->cache ['dirLevel'] );
  592. }
  593. } else {
  594. $file .= $key;
  595. }
  596. $r = null;
  597. if (! isset ( $content )) {
  598. if (file_exists ( $file )) {
  599. if (true == $task [self::TASK_ITEM_CTL] ['cache'] ['enable']) {
  600. $expire = $task [self::TASK_ITEM_CTL] ['cache'] ['expire'];
  601. } else {
  602. $expire = $this->cache ['expire'];
  603. }
  604. if (time () - filemtime ( $file ) < $expire) {
  605. $r = file_get_contents ( $file );
  606. if ($this->cache ['compress']) {
  607. $r = gzuncompress ( $r );
  608. }
  609. $r = unserialize ( $r );
  610. if ($isDownload) {
  611. $r ['content'] = base64_decode ( $r ['content'] );
  612. }
  613. }
  614. }
  615. } else {
  616. $r = false;
  617. // check main cache directory
  618. if (! is_dir ( $this->cache ['dir'] )) {
  619. throw new CurlMulti_Exception ( "Cache dir doesn't exists" );
  620. } else {
  621. $dir = dirname ( $file );
  622. // level 1 subdir
  623. if (isset ( $this->cache ['dirLevel'] ) && $this->cache ['dirLevel'] > 1) {
  624. $dir1 = dirname ( $dir );
  625. if (! is_dir ( $dir1 ) && ! mkdir ( $dir1 )) {
  626. throw new CurlMulti_Exception ( 'Create dir failed, dir=' . $dir1 );
  627. }
  628. }
  629. if (! is_dir ( $dir ) && ! mkdir ( $dir )) {
  630. throw new CurlMulti_Exception ( 'Create dir failed, dir=' . $dir );
  631. }
  632. if ($isDownload) {
  633. $content ['content'] = base64_encode ( file_get_contents ( $task [self::TASK_ITEM_FILE] ) );
  634. }
  635. $content = serialize ( $content );
  636. if ($this->cache ['compress']) {
  637. $content = gzcompress ( $content );
  638. }
  639. if (file_put_contents ( $file, $content, LOCK_EX )) {
  640. $r = true;
  641. } else {
  642. throw new CurlMulti_Exception ( 'Write cache file failed' );
  643. }
  644. }
  645. }
  646. return $r;
  647. }
  648. /**
  649. * user error for current callback
  650. * not curl error
  651. * must be called in process callback
  652. *
  653. * @param unknown $msg
  654. */
  655. function error($msg) {
  656. $this->userError = array (
  657. CURLE_OK,
  658. $msg
  659. );
  660. }
  661. /**
  662. * return a default $ch initialized with global opt
  663. *
  664. * @param unknown $url
  665. * @return resource
  666. */
  667. function getch($url = null) {
  668. return $this->curlInit ( $url );
  669. }
  670. /**
  671. * get curl handle
  672. *
  673. * @param string $url
  674. * @return resource
  675. */
  676. private function curlInit($url = null) {
  677. $ch = curl_init ();
  678. $opt = array ();
  679. if (isset ( $url )) {
  680. $opt [CURLOPT_URL] = $url;
  681. }
  682. $opt [CURLOPT_HEADER] = false;
  683. $opt [CURLOPT_CONNECTTIMEOUT] = 10;
  684. $opt [CURLOPT_TIMEOUT] = 30;
  685. $opt [CURLOPT_AUTOREFERER] = true;
  686. $opt [CURLOPT_USERAGENT] = 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.47 Safari/536.11';
  687. $opt [CURLOPT_RETURNTRANSFER] = true;
  688. $opt [CURLOPT_FOLLOWLOCATION] = true;
  689. $opt [CURLOPT_MAXREDIRS] = 10;
  690. // user defined opt
  691. if (! empty ( $this->opt )) {
  692. foreach ( $this->opt as $k => $v ) {
  693. $opt [$k] = $v;
  694. }
  695. }
  696. curl_setopt_array ( $ch, $opt );
  697. return $ch;
  698. }
  699. }