123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- <?php
- declare(strict_types=1);
- /**
- * This file is part of Hyperf.
- *
- * @link https://www.hyperf.io
- * @document https://hyperf.wiki
- * @contact group@hyperf.io
- * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
- */
- namespace Hyperf\Pool;
- use Hyperf\Contract\ConnectionInterface;
- use Hyperf\Contract\FrequencyInterface;
- use Hyperf\Contract\PoolInterface;
- use Hyperf\Contract\PoolOptionInterface;
- use Hyperf\Contract\StdoutLoggerInterface;
- use Psr\Container\ContainerInterface;
- use RuntimeException;
- use Throwable;
- use function Hyperf\Support\make;
/**
 * Base connection pool.
 *
 * Idle connections are kept in a Channel whose size is the configured maximum
 * number of connections. New connections are created on demand through
 * createConnection() while the pool is below that maximum, and idle
 * connections can be closed again via flush() / flushOne().
 */
abstract class Pool implements PoolInterface
{
    /**
     * Queue holding the connections that are currently idle.
     */
    protected Channel $channel;

    /**
     * Pool settings built by initOption().
     */
    protected PoolOptionInterface $option;

    /**
     * Number of connections created by this pool and not yet closed by a flush.
     */
    protected int $currentConnections = 0;

    /**
     * Optional frequency tracker. This class never assigns it, so it stays null
     * unless it is set elsewhere (presumably by a subclass).
     */
    protected null|FrequencyInterface|LowFrequencyInterface $frequency = null;

    /**
     * @param ContainerInterface $container used to look up the optional stdout logger
     * @param array $config raw pool settings, see initOption() for the recognised keys
     */
    public function __construct(protected ContainerInterface $container, array $config = [])
    {
        $this->initOption($config);

        $this->channel = make(Channel::class, ['size' => $this->option->getMaxConnections()]);
    }

    /**
     * Obtain a connection from the pool.
     *
     * After the connection has been obtained, the frequency tracker (if any) is
     * updated and, when it reports low frequency, idle connections are flushed.
     * A throwable raised by that bookkeeping is logged instead of being
     * propagated, so it never costs the caller the connection.
     *
     * @throws RuntimeException when no connection becomes available in time
     * @throws Throwable when creating a new connection fails
     */
    public function get(): ConnectionInterface
    {
        $connection = $this->getConnection();

        try {
            if ($this->frequency instanceof FrequencyInterface) {
                $this->frequency->hit();
            }

            if ($this->frequency instanceof LowFrequencyInterface && $this->frequency->isLowFrequency()) {
                $this->flush();
            }
        } catch (Throwable $exception) {
            $this->logThrowable($exception);
        }

        return $connection;
    }

    /**
     * Hand a connection back to the pool by pushing it onto the channel.
     */
    public function release(ConnectionInterface $connection): void
    {
        $this->channel->push($connection);
    }

    /**
     * Close idle connections until the pool is down to its minimum size.
     *
     * At most as many connections are closed as were idle when the call
     * started. Errors raised while closing are logged and the connection is
     * still counted as removed.
     */
    public function flush(): void
    {
        $num = $this->getConnectionsInChannel();

        if ($num > 0) {
            while ($this->currentConnections > $this->option->getMinConnections() && $conn = $this->channel->pop(0.001)) {
                try {
                    $conn->close();
                } catch (Throwable $exception) {
                    $this->logThrowable($exception);
                } finally {
                    --$this->currentConnections;
                    --$num;
                }

                if ($num <= 0) {
                    // Ignore connections queued during flushing.
                    break;
                }
            }
        }
    }

    /**
     * Inspect a single idle connection and close it if required.
     *
     * The connection is closed when $must is true or when its check() fails;
     * otherwise it is released back into the pool. Errors raised while closing
     * are logged and the connection is still counted as removed.
     *
     * @param bool $must close the connection regardless of the result of check()
     */
    public function flushOne(bool $must = false): void
    {
        $num = $this->getConnectionsInChannel();

        if ($num > 0 && $conn = $this->channel->pop(0.001)) {
            if ($must || ! $conn->check()) {
                try {
                    $conn->close();
                } catch (Throwable $exception) {
                    $this->logThrowable($exception);
                } finally {
                    --$this->currentConnections;
                }
            } else {
                $this->release($conn);
            }
        }
    }

    /**
     * Number of connections currently accounted for by the pool.
     */
    public function getCurrentConnections(): int
    {
        return $this->currentConnections;
    }

    /**
     * The option object built from the constructor configuration.
     */
    public function getOption(): PoolOptionInterface
    {
        return $this->option;
    }

    /**
     * Number of connections currently sitting in the channel.
     */
    public function getConnectionsInChannel(): int
    {
        return $this->channel->length();
    }

    /**
     * Build the pool option object from snake_case configuration keys,
     * applying the defaults below for every key that is missing.
     *
     * @param array $options recognised keys: min_connections, max_connections,
     *                       connect_timeout, wait_timeout, heartbeat,
     *                       max_idle_time, events
     */
    protected function initOption(array $options = []): void
    {
        $this->option = make(PoolOption::class, [
            'minConnections' => $options['min_connections'] ?? 1,
            'maxConnections' => $options['max_connections'] ?? 10,
            'connectTimeout' => $options['connect_timeout'] ?? 10.0,
            'waitTimeout' => $options['wait_timeout'] ?? 3.0,
            'heartbeat' => $options['heartbeat'] ?? -1,
            'maxIdleTime' => $options['max_idle_time'] ?? 60.0,
            'events' => $options['events'] ?? [],
        ]);
    }

    /**
     * Create a brand-new connection; called when the pool needs to grow.
     */
    abstract protected function createConnection(): ConnectionInterface;

    /**
     * Create a connection when none is idle and the pool may still grow,
     * otherwise wait on the channel for one to be released.
     *
     * @throws RuntimeException when the channel yields no connection
     * @throws Throwable when createConnection() fails
     */
    private function getConnection(): ConnectionInterface
    {
        $num = $this->getConnectionsInChannel();

        if ($num === 0 && $this->currentConnections < $this->option->getMaxConnections()) {
            // Reserve the slot before creating the connection.
            ++$this->currentConnections;

            try {
                return $this->createConnection();
            } catch (Throwable $throwable) {
                // Only roll back the reservation made just above; the guard is
                // limited to createConnection() so the counter cannot be
                // decremented without having been incremented first.
                --$this->currentConnections;
                throw $throwable;
            }
        }

        $connection = $this->channel->pop($this->option->getWaitTimeout());
        if (! $connection instanceof ConnectionInterface) {
            throw new RuntimeException('Connection pool exhausted. Cannot establish new connection before wait_timeout.');
        }

        return $connection;
    }

    /**
     * Write a throwable to the stdout logger when the container provides one;
     * does nothing otherwise.
     */
    private function logThrowable(Throwable $throwable): void
    {
        if (! $this->container->has(StdoutLoggerInterface::class)) {
            return;
        }

        $logger = $this->container->get(StdoutLoggerInterface::class);
        if ($logger) {
            $logger->error((string) $throwable);
        }
    }
}
|