JsonRpcPoolTransporter.php 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\JsonRpc;
  12. use Hyperf\Context\ApplicationContext;
  13. use Hyperf\Context\Context;
  14. use Hyperf\Contract\StdoutLoggerInterface;
  15. use Hyperf\JsonRpc\Exception\ClientException;
  16. use Hyperf\JsonRpc\Pool\PoolFactory;
  17. use Hyperf\JsonRpc\Pool\RpcConnection;
  18. use Hyperf\LoadBalancer\LoadBalancerInterface;
  19. use Hyperf\LoadBalancer\Node;
  20. use Hyperf\Pool\Pool;
  21. use Hyperf\Rpc\Contract\TransporterInterface;
  22. use Hyperf\Rpc\Exception\RecvException;
  23. use Hyperf\Support\Exception\ExceptionThrower;
  24. use Throwable;
  25. use function Hyperf\Coroutine\defer;
  26. use function Hyperf\Support\retry;
  27. class JsonRpcPoolTransporter implements TransporterInterface
  28. {
  29. use RecvTrait;
  30. private ?LoadBalancerInterface $loadBalancer;
  31. /**
  32. * If $loadBalancer is null, will select a node in $nodes to request,
  33. * otherwise, use the nodes in $loadBalancer.
  34. *
  35. * @var Node[]
  36. */
  37. private array $nodes = [];
  38. private float $connectTimeout;
  39. private float $recvTimeout;
  40. private int $retryCount;
  41. /**
  42. * @var int millisecond
  43. */
  44. private int $retryInterval;
  45. private $config = [
  46. 'connect_timeout' => 5.0,
  47. 'settings' => [],
  48. 'pool' => [
  49. 'min_connections' => 1,
  50. 'max_connections' => 32,
  51. 'connect_timeout' => 10.0,
  52. 'wait_timeout' => 3.0,
  53. 'heartbeat' => -1,
  54. 'max_idle_time' => 60.0,
  55. ],
  56. 'recv_timeout' => 5.0,
  57. 'retry_count' => 2,
  58. 'retry_interval' => 100,
  59. ];
  60. public function __construct(protected PoolFactory $factory, array $config = [])
  61. {
  62. $this->config = array_replace_recursive($this->config, $config);
  63. $this->recvTimeout = $this->config['recv_timeout'] ?? 5.0;
  64. $this->connectTimeout = $this->config['connect_timeout'] ?? 5.0;
  65. $this->retryCount = $this->config['retry_count'] ?? 2;
  66. $this->retryInterval = $this->config['retry_interval'] ?? 100;
  67. }
  68. public function send(string $data)
  69. {
  70. $result = retry($this->retryCount, function () use ($data) {
  71. try {
  72. $client = $this->getConnection();
  73. if ($client->send($data) === false) {
  74. throw new ClientException('Send data failed. ' . $client->errMsg, $client->errCode);
  75. }
  76. return $this->recvAndCheck($client, $this->recvTimeout);
  77. } catch (Throwable $throwable) {
  78. if (isset($client)) {
  79. $client->close();
  80. }
  81. if ($throwable instanceof RecvException && $throwable->getCode() === SOCKET_ETIMEDOUT) {
  82. // Don't retry, when recv timeout.
  83. return new ExceptionThrower($throwable);
  84. }
  85. throw $throwable;
  86. }
  87. }, $this->retryInterval);
  88. if ($result instanceof ExceptionThrower) {
  89. throw $result->getThrowable();
  90. }
  91. return $result;
  92. }
  93. public function recv()
  94. {
  95. $client = $this->getConnection();
  96. return $this->recvAndCheck($client, $this->recvTimeout);
  97. }
  98. /**
  99. * Get RpcConnection from Context.
  100. */
  101. public function getConnection(): RpcConnection
  102. {
  103. $class = spl_object_hash($this) . '.Connection';
  104. /** @var RpcConnection $connection */
  105. $connection = Context::get($class);
  106. if (isset($connection)) {
  107. try {
  108. if (! $connection->check()) {
  109. // Try to reconnect the target server.
  110. $connection->reconnect();
  111. }
  112. return $connection;
  113. } catch (Throwable $exception) {
  114. $this->log($exception);
  115. }
  116. }
  117. $connection = $this->getPool()->get();
  118. defer(function () use ($connection) {
  119. $connection->release();
  120. });
  121. return Context::set($class, $connection->getConnection());
  122. }
  123. public function getPool(): Pool
  124. {
  125. $name = spl_object_hash($this) . '.Pool';
  126. $config = [
  127. 'connect_timeout' => $this->config['connect_timeout'],
  128. 'settings' => $this->config['settings'],
  129. 'pool' => $this->config['pool'],
  130. 'node' => function () {
  131. return $this->getNode();
  132. },
  133. ];
  134. return $this->factory->getPool($name, $config);
  135. }
  136. public function getLoadBalancer(): ?LoadBalancerInterface
  137. {
  138. return $this->loadBalancer;
  139. }
  140. public function setLoadBalancer(LoadBalancerInterface $loadBalancer): TransporterInterface
  141. {
  142. $this->loadBalancer = $loadBalancer;
  143. return $this;
  144. }
  145. /**
  146. * @param \Hyperf\LoadBalancer\Node[] $nodes
  147. */
  148. public function setNodes(array $nodes): self
  149. {
  150. $this->nodes = $nodes;
  151. return $this;
  152. }
  153. /**
  154. * @return \Hyperf\LoadBalancer\Node[]
  155. */
  156. public function getNodes(): array
  157. {
  158. return $this->nodes;
  159. }
  160. public function getConfig(): array
  161. {
  162. return $this->config;
  163. }
  164. /**
  165. * If the load balancer is exists, then the node will select by the load balancer,
  166. * otherwise will get a random node.
  167. */
  168. private function getNode(): Node
  169. {
  170. if ($this->loadBalancer instanceof LoadBalancerInterface) {
  171. return $this->loadBalancer->select();
  172. }
  173. return $this->nodes[array_rand($this->nodes)];
  174. }
  175. private function log($message)
  176. {
  177. $container = ApplicationContext::getContainer();
  178. if ($container->has(StdoutLoggerInterface::class) && $logger = $container->get(StdoutLoggerInterface::class)) {
  179. $logger->error((string) $message);
  180. }
  181. }
  182. }