AbstractServiceClient.php 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\RpcClient;
  12. use Hyperf\Contract\ConfigInterface;
  13. use Hyperf\Contract\IdGeneratorInterface;
  14. use Hyperf\LoadBalancer\LoadBalancerInterface;
  15. use Hyperf\LoadBalancer\LoadBalancerManager;
  16. use Hyperf\LoadBalancer\Node;
  17. use Hyperf\Rpc\Contract\DataFormatterInterface;
  18. use Hyperf\Rpc\Contract\PathGeneratorInterface;
  19. use Hyperf\Rpc\IdGenerator;
  20. use Hyperf\Rpc\Protocol;
  21. use Hyperf\Rpc\ProtocolManager;
  22. use Hyperf\Rpc\Request;
  23. use Hyperf\RpcClient\Exception\RequestException;
  24. use Hyperf\ServiceGovernance\DriverInterface;
  25. use Hyperf\ServiceGovernance\DriverManager;
  26. use InvalidArgumentException;
  27. use Psr\Container\ContainerInterface;
  28. use function Hyperf\Support\make;
  29. abstract class AbstractServiceClient
  30. {
  31. /**
  32. * The service name of the target service.
  33. */
  34. protected string $serviceName = '';
  35. /**
  36. * The protocol of the target service, this protocol name
  37. * needs to register into \Hyperf\Rpc\ProtocolManager.
  38. */
  39. protected string $protocol = 'jsonrpc-http';
  40. /**
  41. * The load balancer of the client, this name of the load balancer
  42. * needs to register into \Hyperf\LoadBalancer\LoadBalancerManager.
  43. */
  44. protected string $loadBalancer = 'random';
  45. protected Client $client;
  46. protected LoadBalancerManager $loadBalancerManager;
  47. protected ?IdGeneratorInterface $idGenerator;
  48. protected PathGeneratorInterface $pathGenerator;
  49. protected DataFormatterInterface $dataFormatter;
  50. protected ConfigInterface $config;
  51. public function __construct(protected ContainerInterface $container)
  52. {
  53. $this->config = $this->container->get(ConfigInterface::class);
  54. $this->loadBalancerManager = $container->get(LoadBalancerManager::class);
  55. $protocol = new Protocol($container, $container->get(ProtocolManager::class), $this->protocol, $this->getOptions());
  56. $loadBalancer = $this->createLoadBalancer(...$this->createNodes());
  57. $transporter = $protocol->getTransporter()->setLoadBalancer($loadBalancer);
  58. $this->client = make(Client::class)
  59. ->setPacker($protocol->getPacker())
  60. ->setTransporter($transporter)
  61. ->setNormalizer($protocol->getNormalizer());
  62. $this->idGenerator = $this->getIdGenerator();
  63. $this->pathGenerator = $protocol->getPathGenerator();
  64. $this->dataFormatter = $protocol->getDataFormatter();
  65. }
  66. protected function __request(string $method, array $params, ?string $id = null)
  67. {
  68. if (! $id && $this->idGenerator instanceof IdGeneratorInterface) {
  69. $id = $this->idGenerator->generate();
  70. }
  71. $response = $this->client->send($this->__generateData($method, $params, $id));
  72. if (is_array($response)) {
  73. $response = $this->checkRequestIdAndTryAgain($response, $id);
  74. if (array_key_exists('result', $response)) {
  75. return $response['result'];
  76. }
  77. if (array_key_exists('error', $response)) {
  78. return $response['error'];
  79. }
  80. }
  81. throw new RequestException('Invalid response.');
  82. }
  83. protected function __generateRpcPath(string $methodName): string
  84. {
  85. if (! $this->serviceName) {
  86. throw new InvalidArgumentException('Parameter $serviceName missing.');
  87. }
  88. return $this->pathGenerator->generate($this->serviceName, $methodName);
  89. }
  90. protected function __generateData(string $methodName, array $params, null|int|string $id)
  91. {
  92. return $this->dataFormatter->formatRequest(new Request(
  93. $this->__generateRpcPath($methodName),
  94. $params,
  95. $id,
  96. [
  97. 'from' => $this->config->get('app_name'),
  98. ]
  99. ));
  100. }
  101. public function getServiceName(): string
  102. {
  103. return $this->serviceName;
  104. }
  105. protected function getIdGenerator(): IdGeneratorInterface
  106. {
  107. if ($this->container->has(IdGenerator\IdGeneratorInterface::class)) {
  108. return $this->container->get(IdGenerator\IdGeneratorInterface::class);
  109. }
  110. if ($this->container->has(IdGeneratorInterface::class)) {
  111. return $this->container->get(IdGeneratorInterface::class);
  112. }
  113. return $this->container->get(IdGenerator\UniqidIdGenerator::class);
  114. }
  115. protected function createLoadBalancer(array $nodes, ?callable $refresh = null, bool $isLongPolling = false): LoadBalancerInterface
  116. {
  117. $loadBalancer = $this->loadBalancerManager->getInstance($this->serviceName, $this->loadBalancer)->setNodes($nodes);
  118. $refresh && $loadBalancer->refresh($refresh, $isLongPolling ? 1 : 5000);
  119. return $loadBalancer;
  120. }
  121. protected function getOptions(): array
  122. {
  123. $consumer = $this->getConsumerConfig();
  124. return $consumer['options'] ?? [];
  125. }
  126. protected function getConsumerConfig(): array
  127. {
  128. // According to the registry config of the consumer, retrieve the nodes.
  129. $consumers = $this->config->get('services.consumers', []);
  130. $config = [];
  131. foreach ($consumers as $consumer) {
  132. if (isset($consumer['name']) && $consumer['name'] === $this->serviceName) {
  133. $config = $consumer;
  134. break;
  135. }
  136. }
  137. return $config;
  138. }
  139. /**
  140. * Create nodes the first time.
  141. *
  142. * @return array [array, callable]
  143. */
  144. protected function createNodes(): array
  145. {
  146. $consumer = $this->getConsumerConfig();
  147. $registryProtocol = $consumer['registry']['protocol'] ?? null;
  148. $registryAddress = $consumer['registry']['address'] ?? null;
  149. // Current $consumer is the config of the specified consumer.
  150. if (! empty($registryProtocol) && $this->container->has(DriverManager::class)) {
  151. $governance = $this->container->get(DriverManager::class)->get($registryProtocol);
  152. if (! $governance) {
  153. throw new InvalidArgumentException(sprintf('Invalid protocol of registry %s', $registryProtocol));
  154. }
  155. $nodes = $this->getNodes($governance, $registryAddress, []);
  156. $refreshCallback = function (array $beforeNodes = []) use ($governance, $registryAddress) {
  157. return $this->getNodes($governance, $registryAddress, $beforeNodes);
  158. };
  159. return [$nodes, $refreshCallback, $governance->isLongPolling()];
  160. }
  161. // Not exists the registry config, then looking for the 'nodes' property.
  162. if (isset($consumer['nodes'])) {
  163. $nodes = [];
  164. foreach ($consumer['nodes'] as $item) {
  165. if (isset($item['host'], $item['port'])) {
  166. if (! is_int($item['port'])) {
  167. throw new InvalidArgumentException(sprintf('Invalid node config [%s], the port option has to a integer.', implode(':', $item)));
  168. }
  169. $nodes[] = new Node($item['host'], $item['port'], $item['weight'] ?? 0, $item['path_prefix'] ?? '');
  170. }
  171. }
  172. return [$nodes, null, false];
  173. }
  174. throw new InvalidArgumentException('Config of registry or nodes missing.');
  175. }
  176. protected function getNodes(DriverInterface $governance, string $address, array $nodes = []): array
  177. {
  178. $nodeArray = $governance->getNodes($address, $this->serviceName, [
  179. 'protocol' => $this->protocol,
  180. 'nodes' => $nodes,
  181. ]);
  182. $nodes = [];
  183. foreach ($nodeArray as $node) {
  184. $nodes[] = new Node($node['host'], $node['port'], $node['weight'] ?? 0, $node['path_prefix'] ?? '');
  185. }
  186. return $nodes;
  187. }
  188. protected function checkRequestIdAndTryAgain(array $response, $id, int $again = 1): array
  189. {
  190. if (is_null($id)) {
  191. // If the request id is null then do not check.
  192. return $response;
  193. }
  194. if (isset($response['id']) && $response['id'] === $id) {
  195. return $response;
  196. }
  197. if ($again <= 0) {
  198. throw new RequestException(sprintf(
  199. 'Invalid response. Request id[%s] is not equal to response id[%s].',
  200. $id,
  201. $response['id'] ?? null
  202. ));
  203. }
  204. $response = $this->client->recv();
  205. --$again;
  206. return $this->checkRequestIdAndTryAgain($response, $id, $again);
  207. }
  208. }