Pool.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Pool;
  12. use Hyperf\Contract\ConnectionInterface;
  13. use Hyperf\Contract\FrequencyInterface;
  14. use Hyperf\Contract\PoolInterface;
  15. use Hyperf\Contract\PoolOptionInterface;
  16. use Hyperf\Contract\StdoutLoggerInterface;
  17. use Psr\Container\ContainerInterface;
  18. use RuntimeException;
  19. use Throwable;
  20. use function Hyperf\Support\make;
  21. abstract class Pool implements PoolInterface
  22. {
  23. protected Channel $channel;
  24. protected PoolOptionInterface $option;
  25. protected int $currentConnections = 0;
  26. protected null|FrequencyInterface|LowFrequencyInterface $frequency = null;
  27. public function __construct(protected ContainerInterface $container, array $config = [])
  28. {
  29. $this->initOption($config);
  30. $this->channel = make(Channel::class, ['size' => $this->option->getMaxConnections()]);
  31. }
  32. public function get(): ConnectionInterface
  33. {
  34. $connection = $this->getConnection();
  35. try {
  36. if ($this->frequency instanceof FrequencyInterface) {
  37. $this->frequency->hit();
  38. }
  39. if ($this->frequency instanceof LowFrequencyInterface) {
  40. if ($this->frequency->isLowFrequency()) {
  41. $this->flush();
  42. }
  43. }
  44. } catch (Throwable $exception) {
  45. if ($this->container->has(StdoutLoggerInterface::class) && $logger = $this->container->get(StdoutLoggerInterface::class)) {
  46. $logger->error((string) $exception);
  47. }
  48. }
  49. return $connection;
  50. }
  51. public function release(ConnectionInterface $connection): void
  52. {
  53. $this->channel->push($connection);
  54. }
  55. public function flush(): void
  56. {
  57. $num = $this->getConnectionsInChannel();
  58. if ($num > 0) {
  59. while ($this->currentConnections > $this->option->getMinConnections() && $conn = $this->channel->pop(0.001)) {
  60. try {
  61. $conn->close();
  62. } catch (Throwable $exception) {
  63. if ($this->container->has(StdoutLoggerInterface::class) && $logger = $this->container->get(StdoutLoggerInterface::class)) {
  64. $logger->error((string) $exception);
  65. }
  66. } finally {
  67. --$this->currentConnections;
  68. --$num;
  69. }
  70. if ($num <= 0) {
  71. // Ignore connections queued during flushing.
  72. break;
  73. }
  74. }
  75. }
  76. }
  77. public function flushOne(bool $must = false): void
  78. {
  79. $num = $this->getConnectionsInChannel();
  80. if ($num > 0 && $conn = $this->channel->pop(0.001)) {
  81. if ($must || ! $conn->check()) {
  82. try {
  83. $conn->close();
  84. } catch (Throwable $exception) {
  85. if ($this->container->has(StdoutLoggerInterface::class) && $logger = $this->container->get(StdoutLoggerInterface::class)) {
  86. $logger->error((string) $exception);
  87. }
  88. } finally {
  89. --$this->currentConnections;
  90. }
  91. } else {
  92. $this->release($conn);
  93. }
  94. }
  95. }
  96. public function getCurrentConnections(): int
  97. {
  98. return $this->currentConnections;
  99. }
  100. public function getOption(): PoolOptionInterface
  101. {
  102. return $this->option;
  103. }
  104. public function getConnectionsInChannel(): int
  105. {
  106. return $this->channel->length();
  107. }
  108. protected function initOption(array $options = []): void
  109. {
  110. $this->option = make(PoolOption::class, [
  111. 'minConnections' => $options['min_connections'] ?? 1,
  112. 'maxConnections' => $options['max_connections'] ?? 10,
  113. 'connectTimeout' => $options['connect_timeout'] ?? 10.0,
  114. 'waitTimeout' => $options['wait_timeout'] ?? 3.0,
  115. 'heartbeat' => $options['heartbeat'] ?? -1,
  116. 'maxIdleTime' => $options['max_idle_time'] ?? 60.0,
  117. 'events' => $options['events'] ?? [],
  118. ]);
  119. }
  120. abstract protected function createConnection(): ConnectionInterface;
  121. private function getConnection(): ConnectionInterface
  122. {
  123. $num = $this->getConnectionsInChannel();
  124. try {
  125. if ($num === 0 && $this->currentConnections < $this->option->getMaxConnections()) {
  126. ++$this->currentConnections;
  127. return $this->createConnection();
  128. }
  129. } catch (Throwable $throwable) {
  130. --$this->currentConnections;
  131. throw $throwable;
  132. }
  133. $connection = $this->channel->pop($this->option->getWaitTimeout());
  134. if (! $connection instanceof ConnectionInterface) {
  135. throw new RuntimeException('Connection pool exhausted. Cannot establish new connection before wait_timeout.');
  136. }
  137. return $connection;
  138. }
  139. }