Concurrent.php 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Coroutine;
  12. use Hyperf\Context\ApplicationContext;
  13. use Hyperf\Contract\StdoutLoggerInterface;
  14. use Hyperf\Coroutine\Exception\InvalidArgumentException;
  15. use Hyperf\Engine\Channel;
  16. use Hyperf\ExceptionHandler\Formatter\FormatterInterface;
  17. use Throwable;
  /**
   * Caps how many callables started through create() are in flight at once.
   *
   * A channel constructed with capacity `$limit` serves as the counter: create()
   * pushes one item before starting a coroutine, and that coroutine pops one
   * item once its callable has finished, whether it returned or threw.
   *
   * `isFull()` and `isEmpty()` are forwarded to the underlying channel via __call().
   *
   * @method bool isFull()
   * @method bool isEmpty()
   */
  class Concurrent
  {
      /**
       * Methods that __call() is allowed to forward to the channel.
       */
      private const FORWARDED_METHODS = ['isFull', 'isEmpty'];

      /**
       * Channel whose current length is the number of occupied slots.
       */
      protected Channel $channel;

      /**
       * @param int $limit capacity handed to the channel constructor
       */
      public function __construct(protected int $limit)
      {
          $this->channel = new Channel($limit);
      }

      /**
       * Forwards the whitelisted methods to the channel and rejects everything else.
       *
       * @param string $name name of the inaccessible method being called
       * @param array $arguments arguments passed to that method
       * @return mixed whatever the forwarded channel method returns
       * @throws InvalidArgumentException when $name is not a forwarded method
       */
      public function __call(string $name, array $arguments)
      {
          // Strict comparison: a loose match would let values such as `true`
          // pass the whitelist and be used as a method name.
          if (in_array($name, self::FORWARDED_METHODS, strict: true)) {
              return $this->channel->{$name}(...$arguments);
          }

          throw new InvalidArgumentException(sprintf('The method %s is not supported.', $name));
      }

      /**
       * Returns the limit this instance was constructed with.
       */
      public function getLimit(): int
      {
          return $this->limit;
      }

      /**
       * Returns the channel's current length; same value as getLength().
       */
      public function length(): int
      {
          return $this->channel->getLength();
      }

      /**
       * Returns the channel's current length.
       */
      public function getLength(): int
      {
          return $this->channel->getLength();
      }

      /**
       * Returns getLength(): one channel item is held per callable started by
       * create() that has not finished yet.
       */
      public function getRunningCoroutineCount(): int
      {
          return $this->getLength();
      }

      /**
       * Returns the underlying channel instance.
       */
      public function getChannel(): Channel
      {
          return $this->channel;
      }

      /**
       * Runs $callable in a new coroutine, holding one channel slot while it runs.
       *
       * Any Throwable raised by $callable is caught inside the coroutine and is
       * never rethrown to the caller of create(); it is logged when a logger and
       * a formatter are available, and dropped otherwise.
       *
       * @param callable $callable invoked with no arguments; its return value is ignored
       */
      public function create(callable $callable): void
      {
          // Take a slot before the coroutine starts.
          // NOTE(review): presumably this push suspends the caller while the
          // channel is full - confirm against the Channel implementation.
          $this->channel->push(true);

          Coroutine::create(function () use ($callable) {
              try {
                  $callable();
              } catch (Throwable $exception) {
                  $this->logThrowable($exception);
              } finally {
                  // Always give the slot back, even when the callable threw.
                  $this->channel->pop();
              }
          });
      }

      /**
       * Best-effort logging of a throwable raised by a callable passed to create().
       *
       * Does nothing unless the application container exists and provides both
       * StdoutLoggerInterface and FormatterInterface.
       */
      private function logThrowable(Throwable $throwable): void
      {
          if (! ApplicationContext::hasContainer()) {
              return;
          }

          $container = ApplicationContext::getContainer();
          if (! $container->has(StdoutLoggerInterface::class) || ! $container->has(FormatterInterface::class)) {
              return;
          }

          $logger = $container->get(StdoutLoggerInterface::class);
          $formatter = $container->get(FormatterInterface::class);
          $logger->error($formatter->format($throwable));
      }
  }