KeepaliveConnection.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Pool;
  12. use Closure;
  13. use Hyperf\Contract\ConnectionInterface;
  14. use Hyperf\Contract\StdoutLoggerInterface;
  15. use Hyperf\Coordinator\Timer;
  16. use Hyperf\Engine\Channel;
  17. use Hyperf\Pool\Exception\InvalidArgumentException;
  18. use Hyperf\Pool\Exception\SocketPopException;
  19. use Psr\Container\ContainerInterface;
  20. use Psr\Log\LoggerInterface;
  21. use Throwable;
  /**
   * Base class for a pooled connection that is kept alive by a periodic timer.
   *
   * The underlying connection object is stored in a one-slot channel. call()
   * pops it, hands it to the caller's closure and pushes it back afterwards.
   * A timer started by addHeartbeat() periodically either closes the connection
   * (when isTimeout() reports it as idle) or invokes the heartbeat() hook.
   */
  abstract class KeepaliveConnection implements ConnectionInterface
  {
      /** Timer used to schedule the periodic heartbeat callback. */
      protected Timer $timer;

      /** One-slot channel holding the active connection; assigned by reconnect(). */
      protected Channel $channel;

      /** microtime(true) of the last reconnect() or refreshing call(). */
      protected float $lastUseTime = 0.0;

      /** Id returned by Timer::tick() for the running heartbeat, or null when none is registered. */
      protected ?int $timerId = null;

      /** Set to true by addHeartbeat() and to false by clear(). */
      protected bool $connected = false;

      /** Name used in exception and log messages. */
      protected string $name = 'keepalive.connection';

      public function __construct(protected ContainerInterface $container, protected Pool $pool)
      {
          $this->timer = new Timer();
      }

      /**
       * Marks the connection as disconnected and stops the heartbeat timer.
       */
      public function __destruct()
      {
          $this->clear();
      }

      /**
       * Hands this connection back to the pool it was created with.
       */
      public function release(): void
      {
          $this->pool->release($this);
      }

      /**
       * Not supported: the connection must be accessed through call().
       *
       * @throws InvalidArgumentException always
       */
      public function getConnection()
      {
          throw new InvalidArgumentException('Please use call instead of getConnection.');
      }

      /**
       * Reports whether the connection is currently marked as connected.
       */
      public function check(): bool
      {
          return $this->isConnected();
      }

      /**
       * Closes the current connection (if any), obtains a new one from
       * getActiveConnection(), stores it in a fresh one-slot channel, resets the
       * last-use time and starts the heartbeat timer.
       *
       * @return bool always true; failures surface as exceptions from the calls above
       */
      public function reconnect(): bool
      {
          $this->close();

          $connection = $this->getActiveConnection();

          $channel = new Channel(1);
          $channel->push($connection);
          $this->channel = $channel;

          $this->lastUseTime = microtime(true);
          $this->addHeartbeat();

          return true;
      }

      /**
       * Runs $closure with the underlying connection, reconnecting first when needed.
       *
       * @param Closure $closure receives the connection popped from the channel
       * @param bool $refresh refresh last use time or not
       * @return mixed whatever $closure returns
       * @throws SocketPopException when no connection can be popped within the pool's wait timeout
       */
      public function call(Closure $closure, bool $refresh = true)
      {
          if (! $this->isConnected()) {
              $this->reconnect();
          }

          // Wait for the single connection slot for at most the pool's wait timeout.
          $connection = $this->channel->pop($this->pool->getOption()->getWaitTimeout());
          if ($connection === false) {
              throw new SocketPopException(sprintf('Socket of %s is exhausted. Cannot establish socket before timeout.', $this->name));
          }

          try {
              $result = $closure($connection);
              if ($refresh) {
                  $this->lastUseTime = microtime(true);
              }
          } finally {
              if ($this->isConnected()) {
                  // Return the connection to the channel so the next call() can pop it.
                  $this->channel->push($connection, 0.001);
              } else {
                  // The closure (or a concurrent clear()) marked us disconnected:
                  // unset and drop the connection instead of returning it.
                  unset($connection);
              }
          }

          return $result;
      }

      /**
       * Returns the connected flag maintained by addHeartbeat() and clear().
       */
      public function isConnected(): bool
      {
          return $this->connected;
      }

      /**
       * Sends the close protocol (via sendClose()) over the live connection and
       * then clears the connected flag and the heartbeat timer.
       *
       * Does nothing when the connection is not marked as connected.
       *
       * @return bool always true
       */
      public function close(): bool
      {
          if ($this->isConnected()) {
              // Go through call() so the connection is popped from the channel
              // before it is closed; $refresh is false so closing does not count as use.
              $this->call(function ($connection) {
                  try {
                      // Re-check inside the closure: call() may have waited on the
                      // channel before running this.
                      if ($this->isConnected()) {
                          $this->sendClose($connection);
                      }
                  } finally {
                      $this->clear();
                  }
              }, false);
          }

          return true;
      }

      /**
       * Reports whether the connection has been idle for longer than the pool's
       * max idle time while its channel still holds the connection.
       *
       * Returns false when no channel has been created yet (reconnect() was
       * never called), instead of reading the uninitialized typed property.
       */
      public function isTimeout(): bool
      {
          if (! isset($this->channel)) {
              return false;
          }

          return $this->lastUseTime < microtime(true) - $this->pool->getOption()->getMaxIdleTime()
              && $this->channel->getLength() > 0;
      }

      /**
       * Marks the connection as connected and registers the periodic heartbeat timer.
       *
       * On each tick the callback closes the connection when it is idle, otherwise
       * it calls heartbeat(). Any Throwable raised there clears the connection
       * state and is logged when a logger is available.
       */
      protected function addHeartbeat()
      {
          $this->connected = true;
          $this->timerId = $this->timer->tick($this->getHeartbeatSeconds(), function () {
              try {
                  if (! $this->isConnected()) {
                      return;
                  }

                  if ($this->isTimeout()) {
                      // The connection has been idle longer than the pool's max idle time: close it.
                      $this->close();
                      return;
                  }

                  $this->heartbeat();
              } catch (Throwable $throwable) {
                  $this->clear();
                  if ($logger = $this->getLogger()) {
                      $message = sprintf('Socket of %s heartbeat failed, %s', $this->name, $throwable);
                      $logger->error($message);
                  }
              }
          });
      }

      /**
       * Returns the heartbeat interval taken from the pool option, or 10 when the
       * option is not a positive number.
       *
       * @return int seconds, never less than 1
       */
      protected function getHeartbeatSeconds(): int
      {
          $heartbeat = $this->pool->getOption()->getHeartbeat();

          if ($heartbeat > 0) {
              // Integer truncation would turn a sub-second value such as 0.5 into a
              // zero-second interval, so clamp the result to at least one second.
              return max(1, intval($heartbeat));
          }

          return 10;
      }

      /**
       * Marks the connection as disconnected and stops the heartbeat timer, if one is registered.
       */
      protected function clear()
      {
          $this->connected = false;

          // Compare against null explicitly so that a timer id of 0 is still cleared.
          if ($this->timerId !== null) {
              $this->timer->clear($this->timerId);
              $this->timerId = null;
          }
      }

      /**
       * Returns the stdout logger from the container, or null when none is registered.
       */
      protected function getLogger(): ?LoggerInterface
      {
          if ($this->container->has(StdoutLoggerInterface::class)) {
              return $this->container->get(StdoutLoggerInterface::class);
          }

          return null;
      }

      /**
       * Hook invoked on every timer tick while the connection is connected and
       * not idle. Empty by default.
       */
      protected function heartbeat(): void
      {
      }

      /**
       * Send close protocol. Empty by default.
       *
       * @param mixed $connection the connection popped from the channel by close()
       */
      protected function sendClose($connection): void
      {
      }

      /**
       * Connect and return the active connection.
       *
       * @return mixed
       */
      abstract protected function getActiveConnection();
  }