params = $params; } public function handle() { // var_dump("消费进程"); // $parallel = new Parallel(15); $collector = new CollectorService(); // $parallel->add(function () use ($collector){ // sleep(1); // $collector->goCrawler($this->params); //// var_dump("消费消息队列:",$this->params,$result); // $id = Coroutine::id(); // var_dump("开启携程:",$id); // return $id; // }); // try { // $results = $parallel->wait(); // var_dump("协程返回数据:",$results); // }catch (ParallelExecutionException $e){ // var_dump($e->getResults());// 获取协程中的返回值。 // var_dump($e->getThrowables());// 获取协程中出现的异常。 // } } public function BeforeHandle() { var_dump("========BeforeHandle========="); } public function AfterHandle() { var_dump("========AfterHandle=========="); } public function FailedHandle() { var_dump("=======FailedHandle======="); } public function RetryHandle() { var_dump("=========RetryHandle======="); } public function QueueLength() { var_dump("=====QueueLength=========="); } }