12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- <?php
- declare(strict_types=1);
- namespace App\Job;
- use Hyperf\AsyncQueue\Job;
- use App\JsonRpc\CollectorService;
- use Hyperf\Coroutine\Exception\ParallelExecutionException;
- use Hyperf\Coroutine\Coroutine;
- use Hyperf\Coroutine\Parallel;
- /**
- * @Job(name="import")
- */
- class GatherExampleJob extends Job
- {
- public $params;
- /**
- * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
- */
- protected int $maxAttempts = 2;
- public function __construct($params)
- {
- // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
- $this->params = $params;
- }
- public function handle()
- {
- // var_dump("消费进程");
- // $parallel = new Parallel(15);
- $collector = new CollectorService();
- // $parallel->add(function () use ($collector){
- // sleep(1);
- //
- $collector->goCrawler($this->params);
- //// var_dump("消费消息队列:",$this->params,$result);
- // $id = Coroutine::id();
- // var_dump("开启携程:",$id);
- // return $id;
- // });
- // try {
- // $results = $parallel->wait();
- // var_dump("协程返回数据:",$results);
- // }catch (ParallelExecutionException $e){
- // var_dump($e->getResults());// 获取协程中的返回值。
- // var_dump($e->getThrowables());// 获取协程中出现的异常。
- // }
- }
- public function BeforeHandle()
- {
- var_dump("========BeforeHandle=========");
- }
- public function AfterHandle()
- {
- var_dump("========AfterHandle==========");
- }
- public function FailedHandle()
- {
- var_dump("=======FailedHandle=======");
- }
- public function RetryHandle()
- {
- var_dump("=========RetryHandle=======");
- }
- public function QueueLength()
- {
- var_dump("=====QueueLength==========");
- }
- }
|