Channel.php 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Pool;
  12. use Hyperf\Contract\ConnectionInterface;
  13. use Hyperf\Coroutine\Coroutine;
  14. use Hyperf\Engine\Channel as CoChannel;
  15. use SplQueue;
  16. class Channel
  17. {
  18. protected CoChannel $channel;
  19. protected SplQueue $queue;
  20. public function __construct(protected int $size)
  21. {
  22. $this->channel = new CoChannel($size);
  23. $this->queue = new SplQueue();
  24. }
  25. public function pop(float $timeout): ConnectionInterface|false
  26. {
  27. if ($this->isCoroutine()) {
  28. return $this->channel->pop($timeout);
  29. }
  30. return $this->queue->shift();
  31. }
  32. public function push(ConnectionInterface $data): bool
  33. {
  34. if ($this->isCoroutine()) {
  35. return $this->channel->push($data);
  36. }
  37. $this->queue->push($data);
  38. return true;
  39. }
  40. public function length(): int
  41. {
  42. if ($this->isCoroutine()) {
  43. return $this->channel->getLength();
  44. }
  45. return $this->queue->count();
  46. }
  47. protected function isCoroutine(): bool
  48. {
  49. return Coroutine::id() > 0;
  50. }
  51. }