Parallel.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Coroutine;
  12. use Hyperf\Coroutine\Exception\ParallelExecutionException;
  13. use Hyperf\Engine\Channel;
  14. use Throwable;
  15. use function sprintf;
  /**
   * Runs a set of callables, each in its own coroutine, and collects what they
   * return (or throw) under the key each callable was registered with.
   *
   * Callables are registered with add() and launched by wait(), which blocks
   * until every one of them has finished.
   */
  class Parallel
  {
      /**
       * Registered callables, keyed by the key passed to add() or auto-indexed.
       *
       * @var callable[]
       */
      protected array $callbacks = [];

      /**
       * Channel sized to the requested concurrency; null when no limit was requested.
       */
      protected ?Channel $concurrentChannel = null;

      /**
       * Return values of the callables that finished without throwing, by key.
       */
      protected array $results = [];

      /**
       * Throwables raised by callables, by key.
       *
       * @var Throwable[]
       */
      protected array $throwables = [];

      /**
       * @param int $concurrent capacity of the concurrency channel; 0 (or any
       *                        non-positive value) means unlimited
       */
      public function __construct(int $concurrent = 0)
      {
          if ($concurrent > 0) {
              $this->concurrentChannel = new Channel($concurrent);
          }
      }

      /**
       * Register a callable to be executed by wait().
       *
       * @param callable $callable the work to run
       * @param null|int|string $key key under which the result or throwable is
       *                             stored; null appends with the next integer
       *                             index, an existing key replaces the callable
       *                             previously registered under it
       */
      public function add(callable $callable, $key = null)
      {
          if ($key === null) {
              $this->callbacks[] = $callable;
              return;
          }

          $this->callbacks[$key] = $callable;
      }

      /**
       * Run every registered callable in its own coroutine and wait for all of them.
       *
       * The instance stays usable after this method throws: the stored results
       * and throwables are reset to empty arrays, and the registered callables
       * are kept, so wait() can be called again.
       *
       * @param bool $throw when true and at least one callable threw, throw a
       *                    ParallelExecutionException carrying the partial results
       *                    and the throwables instead of returning
       * @return array results by key; keys whose callable threw are absent
       * @throws ParallelExecutionException when $throw is true and a callable threw
       */
      public function wait(bool $throw = true): array
      {
          $wg = new WaitGroup();
          $wg->add(count($this->callbacks));

          foreach ($this->callbacks as $key => $callback) {
              // One slot is taken per launched coroutine and released in its
              // `finally`; push() is presumably what holds the loop back once
              // the channel is full (Channel semantics are defined elsewhere).
              $this->concurrentChannel && $this->concurrentChannel->push(true);
              $this->results[$key] = null;
              // Forget a throwable recorded for this key by an earlier
              // wait(false); otherwise a callable that succeeds on this run
              // would still be reported as failed.
              unset($this->throwables[$key]);

              Coroutine::create(function () use ($callback, $key, $wg) {
                  try {
                      $this->results[$key] = $callback();
                  } catch (Throwable $throwable) {
                      $this->throwables[$key] = $throwable;
                      // A failed callable has no result; drop the placeholder.
                      unset($this->results[$key]);
                  } finally {
                      $this->concurrentChannel && $this->concurrentChannel->pop();
                      $wg->done();
                  }
              });
          }

          $wg->wait();

          if ($throw && ($throwableCount = count($this->throwables)) > 0) {
              $message = 'Detecting ' . $throwableCount . ' throwable occurred during parallel execution:' . PHP_EOL . $this->formatThrowables($this->throwables);
              $executionException = new ParallelExecutionException($message);
              $executionException->setResults($this->results);
              $executionException->setThrowables($this->throwables);
              // Reset instead of unset(): unsetting a typed property returns it
              // to the uninitialized state, and the next read of it (e.g. the
              // count() above on a later wait()) would raise an Error.
              $this->results = [];
              $this->throwables = [];
              throw $executionException;
          }

          return $this->results;
      }

      /**
       * Number of callables currently registered.
       */
      public function count(): int
      {
          return count($this->callbacks);
      }

      /**
       * Remove all registered callables together with any stored results and throwables.
       */
      public function clear(): void
      {
          $this->callbacks = [];
          $this->results = [];
          $this->throwables = [];
      }

      /**
       * Format throwables into a nice list: for each one, a "(key) class: message"
       * line followed by its stack trace.
       *
       * @param Throwable[] $throwables
       */
      private function formatThrowables(array $throwables): string
      {
          $output = '';
          foreach ($throwables as $key => $value) {
              $output .= sprintf('(%s) %s: %s' . PHP_EOL . '%s' . PHP_EOL, $key, get_class($value), $value->getMessage(), $value->getTraceAsString());
          }

          return $output;
      }
  }