GatherExampleJob.php 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Job;
  4. use Hyperf\AsyncQueue\Job;
  5. use App\JsonRpc\CollectorService;
  6. use Hyperf\Coroutine\Exception\ParallelExecutionException;
  7. use Hyperf\Coroutine\Coroutine;
  8. use Hyperf\Coroutine\Parallel;
  9. /**
  10. * @Job(name="import")
  11. */
  12. class GatherExampleJob extends Job
  13. {
  14. public $params;
  15. /**
  16. * 任务执行失败后的重试次数,即最大执行次数为 $maxAttempts+1 次
  17. */
  18. protected int $maxAttempts = 2;
  19. public function __construct($params)
  20. {
  21. // 这里最好是普通数据,不要使用携带 IO 的对象,比如 PDO 对象
  22. $this->params = $params;
  23. }
  24. public function handle()
  25. {
  26. // var_dump("消费进程");
  27. // $parallel = new Parallel(15);
  28. $collector = new CollectorService();
  29. // $parallel->add(function () use ($collector){
  30. // sleep(1);
  31. //
  32. $collector->goCrawler($this->params);
  33. //// var_dump("消费消息队列:",$this->params,$result);
  34. // $id = Coroutine::id();
  35. // var_dump("开启携程:",$id);
  36. // return $id;
  37. // });
  38. // try {
  39. // $results = $parallel->wait();
  40. // var_dump("协程返回数据:",$results);
  41. // }catch (ParallelExecutionException $e){
  42. // var_dump($e->getResults());// 获取协程中的返回值。
  43. // var_dump($e->getThrowables());// 获取协程中出现的异常。
  44. // }
  45. }
  46. public function BeforeHandle()
  47. {
  48. var_dump("========BeforeHandle=========");
  49. }
  50. public function AfterHandle()
  51. {
  52. var_dump("========AfterHandle==========");
  53. }
  54. public function FailedHandle()
  55. {
  56. var_dump("=======FailedHandle=======");
  57. }
  58. public function RetryHandle()
  59. {
  60. var_dump("=========RetryHandle=======");
  61. }
  62. public function QueueLength()
  63. {
  64. var_dump("=====QueueLength==========");
  65. }
  66. }