ConsulDriver.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\ServiceGovernanceConsul;
  12. use Hyperf\Consul\AgentInterface;
  13. use Hyperf\Consul\Health;
  14. use Hyperf\Consul\HealthInterface;
  15. use Hyperf\Contract\ConfigInterface;
  16. use Hyperf\Contract\StdoutLoggerInterface;
  17. use Hyperf\Guzzle\ClientFactory;
  18. use Hyperf\ServiceGovernance\DriverInterface;
  19. use Hyperf\ServiceGovernance\Exception\ComponentRequiredException;
  20. use Psr\Container\ContainerInterface;
  21. use Psr\Log\LoggerInterface;
  22. use function Hyperf\Support\make;
  /**
   * Service governance driver that registers services on, and discovers nodes from, Consul.
   *
   * Registration goes through the agent client resolved from the container, while node
   * discovery uses a Health client created lazily for the registry base URI.
   */
  class ConsulDriver implements DriverInterface
  {
      protected LoggerInterface $logger;

      protected ConfigInterface $config;

      /**
       * Registrations already confirmed by this process,
       * indexed as [name][protocol][host][port] => true.
       */
      protected array $registeredServices = [];

      /**
       * Lazily created Health client, see createConsulHealth().
       */
      protected ?HealthInterface $health = null;

      /**
       * Base URI the cached Health client was built for.
       * Stays null while createConsulHealth() has not built a client itself.
       */
      protected ?string $healthBaseUri = null;

      public function __construct(protected ContainerInterface $container)
      {
          $this->logger = $container->get(StdoutLoggerInterface::class);
          $this->config = $container->get(ConfigInterface::class);
      }

      /**
       * This driver does not use long polling.
       */
      public function isLongPolling(): bool
      {
          return false;
      }

      /**
       * Fetch the usable nodes of a service from Consul.
       *
       * A node is returned when its protocol (if it declares one) equals the client's protocol,
       * every one of its checks reports "passing", and it has a non-empty address and port.
       *
       * @param string $uri base URI of the Consul HTTP API
       * @param string $name service name to look up
       * @param array $metadata client metadata; the 'protocol' key is used for filtering
       * @return array list of ['host' => string, 'port' => int]
       */
      public function getNodes(string $uri, string $name, array $metadata): array
      {
          $health = $this->createConsulHealth($uri);
          $services = $health->service($name)->json() ?? [];
          // A missing protocol is treated like a mismatch, without raising an undefined-key warning.
          $protocol = $metadata['protocol'] ?? null;

          $nodes = [];
          foreach ($services as $node) {
              $service = $node['Service'] ?? [];
              if (isset($service['Meta']['Protocol']) && $protocol !== $service['Meta']['Protocol']) {
                  // The node is invalid if its protocol differs from the client's protocol.
                  continue;
              }
              if (! $this->allChecksPassing($node['Checks'] ?? [])) {
                  continue;
              }

              $address = $service['Address'] ?? '';
              $port = (int) ($service['Port'] ?? 0);
              // @TODO Get and set the weight property.
              if ($address && $port) {
                  $nodes[] = ['host' => $address, 'port' => $port];
              }
          }

          return $nodes;
      }

      /**
       * Register a service instance on the Consul agent.
       *
       * The service ID is taken from $metadata['id'] when given, otherwise it is derived from
       * the IDs already registered under the same name. A health check definition is attached
       * for the protocols this driver knows; other protocols are registered without a check.
       *
       * @param array $metadata must contain 'protocol'; may contain 'id'
       */
      public function register(string $name, string $host, int $port, array $metadata): void
      {
          $nextId = empty($metadata['id'])
              ? $this->generateId($this->getLastServiceId($name))
              : $metadata['id'];
          $protocol = $metadata['protocol'];
          $deregisterCriticalServiceAfter = $this->config->get('services.drivers.consul.check.deregister_critical_service_after') ?? '90m';
          $interval = $this->config->get('services.drivers.consul.check.interval') ?? '1s';

          $requestBody = [
              'Name' => $name,
              'ID' => $nextId,
              'Address' => $host,
              'Port' => $port,
              'Meta' => [
                  'Protocol' => $protocol,
              ],
          ];

          // Protocol specific part of the check definition; null means "register without a check".
          $probe = match ($protocol) {
              'jsonrpc-http' => ['HTTP' => "http://{$host}:{$port}/"],
              'jsonrpc', 'jsonrpc-tcp-length-check', 'multiplex.default' => ['TCP' => "{$host}:{$port}"],
              'grpc' => ['GRPC' => "{$host}:{$port}", 'GRPCUseTLS' => false],
              default => null,
          };
          if ($probe !== null) {
              // Array union keeps the key order: deregister timeout, probe keys, interval.
              $requestBody['Check'] = ['DeregisterCriticalServiceAfter' => $deregisterCriticalServiceAfter]
                  + $probe
                  + ['Interval' => $interval];
          }

          $response = $this->client()->registerService($requestBody);
          if ($response->getStatusCode() === 200) {
              $this->registeredServices[$name][$protocol][$host][$port] = true;
              $this->logger->info(sprintf('Service %s:%s register to the consul successfully.', $name, $nextId));
          } else {
              $this->logger->warning(sprintf('Service %s register to the consul failed.', $name));
          }
      }

      /**
       * Tell whether the given service instance is already registered.
       *
       * The local cache is consulted first; otherwise the agent's service list is fetched and
       * searched for an entry with the same name, address, port and protocol. A match is cached.
       *
       * @param array $metadata must contain 'protocol'
       * @return bool false when not registered or when the service list could not be fetched
       */
      public function isRegistered(string $name, string $host, int $port, array $metadata): bool
      {
          $protocol = $metadata['protocol'];
          if (isset($this->registeredServices[$name][$protocol][$host][$port])) {
              return true;
          }

          $response = $this->client()->services();
          if ($response->getStatusCode() !== 200) {
              $this->logger->warning(sprintf('Failed to fetch the services from the consul when checking whether service %s is registered.', $name));
              return false;
          }

          // Compare field by field (as strings) so that a glue character inside a value can never
          // make two different services look identical.
          $expected = [$name, $host, (string) $port, (string) $protocol];
          foreach ($response->json() ?? [] as $service) {
              if (! isset($service['Service'], $service['Address'], $service['Port'], $service['Meta']['Protocol'])) {
                  continue;
              }
              $actual = [
                  (string) $service['Service'],
                  (string) $service['Address'],
                  (string) $service['Port'],
                  (string) $service['Meta']['Protocol'],
              ];
              if ($actual === $expected) {
                  $this->registeredServices[$name][$protocol][$host][$port] = true;
                  return true;
              }
          }

          return false;
      }

      /**
       * Resolve the Consul agent client from the container.
       */
      protected function client(): AgentInterface
      {
          return $this->container->get(ConsulAgent::class);
      }

      /**
       * Find the ID of the registered service with the highest numeric "-N" suffix.
       *
       * Only services whose 'Service' field equals $name and whose ID (the key of the agent's
       * service list) ends with a numeric segment after a '-' are considered.
       *
       * @return string the 'ID' of that service, or $name when there is no such service
       */
      protected function getLastServiceId(string $name)
      {
          $maxId = -1;
          $lastId = $name;
          $services = $this->client()->services()->json();
          foreach ($services ?? [] as $id => $service) {
              if (($service['Service'] ?? null) !== $name) {
                  continue;
              }
              $segments = explode('-', (string) $id);
              $suffix = end($segments);
              // Numeric strings are compared numerically, so "10" ranks above "9".
              if (count($segments) > 1 && is_numeric($suffix) && $maxId < $suffix) {
                  $maxId = $suffix;
                  $lastId = $service['ID'] ?? $name;
              }
          }

          return $lastId;
      }

      /**
       * Build the next service ID from the last known one.
       *
       * A trailing numeric "-N" segment is replaced by N + 1; when there is none, "-0" is appended.
       *
       * @param string $name last known service ID, or the bare service name
       * @return string
       */
      protected function generateId(string $name)
      {
          $segments = explode('-', $name);
          $lastIndex = count($segments) - 1;

          $end = -1;
          if ($lastIndex > 0 && is_numeric($segments[$lastIndex])) {
              $end = intval($segments[$lastIndex]);
              unset($segments[$lastIndex]);
          }
          $segments[] = $end + 1;

          return implode('-', $segments);
      }

      /**
       * Return the Health client for the given base URI, creating it on first use.
       *
       * The client built here is cached and reused for the same base URI, and rebuilt when a
       * different base URI is requested. A client that was assigned to $this->health from
       * outside (no base URI recorded) is always returned as is.
       *
       * @throws ComponentRequiredException when the Health class is not available
       */
      protected function createConsulHealth(string $baseUri): HealthInterface
      {
          if (
              $this->health instanceof HealthInterface
              && ($this->healthBaseUri === null || $this->healthBaseUri === $baseUri)
          ) {
              return $this->health;
          }

          if (! class_exists(Health::class)) {
              throw new ComponentRequiredException('Component of \'hyperf/consul\' is required if you want the client fetch the nodes info from consul.');
          }

          $options = [
              'base_uri' => $baseUri,
          ];
          $token = $this->config->get('services.drivers.consul.token', '');
          if (! empty($token)) {
              $options['headers'] = [
                  'X-Consul-Token' => $token,
              ];
          }

          $health = make(Health::class, [
              'clientFactory' => fn () => $this->container->get(ClientFactory::class)->create($options),
          ]);
          $this->healthBaseUri = $baseUri;

          return $this->health = $health;
      }

      /**
       * Tell whether every check in the list reports the "passing" status.
       * An empty list counts as passing.
       */
      private function allChecksPassing(array $checks): bool
      {
          foreach ($checks as $check) {
              if (($check['Status'] ?? false) !== 'passing') {
                  return false;
              }
          }

          return true;
      }
  }