NacosDriver.php 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\ServiceGovernanceNacos;
  12. use Hyperf\Codec\Json;
  13. use Hyperf\Contract\ConfigInterface;
  14. use Hyperf\Contract\StdoutLoggerInterface;
  15. use Hyperf\Coordinator\Constants;
  16. use Hyperf\Coordinator\CoordinatorManager;
  17. use Hyperf\Coroutine\Coroutine;
  18. use Hyperf\Nacos\Exception\RequestException;
  19. use Hyperf\ServiceGovernance\DriverInterface;
  20. use Psr\Container\ContainerInterface;
  21. use Psr\Http\Message\ResponseInterface;
  22. use Psr\Log\LoggerInterface;
  23. use Throwable;
  24. use function Hyperf\Support\retry;
  25. class NacosDriver implements DriverInterface
  26. {
  27. protected Client $client;
  28. protected LoggerInterface $logger;
  29. protected ConfigInterface $config;
  30. protected array $serviceRegistered = [];
  31. protected array $serviceCreated = [];
  32. protected array $registerHeartbeat = [];
  33. private array $metadata = [];
  34. public function __construct(protected ContainerInterface $container)
  35. {
  36. $this->client = $container->get(Client::class);
  37. $this->logger = $container->get(StdoutLoggerInterface::class);
  38. $this->config = $container->get(ConfigInterface::class);
  39. }
  40. public function isLongPolling(): bool
  41. {
  42. return false;
  43. }
  44. public function getNodes(string $uri, string $name, array $metadata): array
  45. {
  46. $response = $this->client->instance->list($name, [
  47. 'groupName' => $this->config->get('services.drivers.nacos.group_name'),
  48. 'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
  49. ]);
  50. if ($response->getStatusCode() !== 200) {
  51. throw new RequestException((string) $response->getBody(), $response->getStatusCode());
  52. }
  53. $data = Json::decode((string) $response->getBody());
  54. $hosts = $data['hosts'] ?? [];
  55. $nodes = [];
  56. foreach ($hosts as $node) {
  57. if (isset($node['ip'], $node['port']) && ($node['healthy'] ?? false)) {
  58. $nodes[] = [
  59. 'host' => $node['ip'],
  60. 'port' => $node['port'],
  61. 'weight' => $this->getWeight($node['weight'] ?? 1),
  62. ];
  63. }
  64. }
  65. return $nodes;
  66. }
  67. public function register(string $name, string $host, int $port, array $metadata): void
  68. {
  69. $this->setMetadata($name, $metadata);
  70. $ephemeral = (bool) $this->config->get('services.drivers.nacos.ephemeral');
  71. if (! $ephemeral && ! array_key_exists($name, $this->serviceCreated)) {
  72. $response = $this->client->service->create($name, [
  73. 'groupName' => $this->config->get('services.drivers.nacos.group_name'),
  74. 'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
  75. 'metadata' => $this->getMetadata($name),
  76. 'protectThreshold' => (float) $this->config->get('services.drivers.nacos.protect_threshold', 0),
  77. ]);
  78. if ($response->getStatusCode() !== 200 || (string) $response->getBody() !== 'ok') {
  79. throw new RequestException(sprintf('Failed to create nacos service %s , %s !', $name, $response->getBody()));
  80. }
  81. $this->serviceCreated[$name] = true;
  82. }
  83. $response = $this->client->instance->register($host, $port, $name, [
  84. 'groupName' => $this->config->get('services.drivers.nacos.group_name'),
  85. 'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
  86. 'metadata' => $this->getMetadata($name),
  87. 'ephemeral' => $ephemeral ? 'true' : 'false',
  88. ]);
  89. if ($response->getStatusCode() !== 200 || (string) $response->getBody() !== 'ok') {
  90. throw new RequestException(sprintf('Failed to create nacos instance %s:%d! for %s , %s ', $host, $port, $name, $response->getBody()));
  91. }
  92. $this->serviceRegistered[$name] = true;
  93. if ($ephemeral) {
  94. $this->registerHeartbeat($name, $host, $port);
  95. }
  96. }
  97. public function isRegistered(string $name, string $host, int $port, array $metadata): bool
  98. {
  99. if (array_key_exists($name, $this->serviceRegistered)) {
  100. return true;
  101. }
  102. $this->setMetadata($name, $metadata);
  103. $response = $this->client->service->detail(
  104. $name,
  105. $this->config->get('services.drivers.nacos.group_name'),
  106. $this->config->get('services.drivers.nacos.namespace_id')
  107. );
  108. if ($response->getStatusCode() === 404) {
  109. return false;
  110. }
  111. if (in_array($response->getStatusCode(), [400, 500], true) && strpos((string) $response->getBody(), 'not found') > 0) {
  112. return false;
  113. }
  114. if ($response->getStatusCode() !== 200) {
  115. throw new RequestException(sprintf('Failed to get nacos service %s!', $name), $response->getStatusCode());
  116. }
  117. $this->serviceCreated[$name] = true;
  118. $response = $this->client->instance->detail($host, $port, $name, [
  119. 'groupName' => $this->config->get('services.drivers.nacos.group_name'),
  120. 'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
  121. ]);
  122. if ($this->isNoIpsFound($response)) {
  123. return false;
  124. }
  125. if ($response->getStatusCode() !== 200) {
  126. throw new RequestException(sprintf('Failed to get nacos instance %s:%d for %s!', $host, $port, $name));
  127. }
  128. $this->serviceRegistered[$name] = true;
  129. if ($this->config->get('services.drivers.nacos.ephemeral')) {
  130. $this->registerHeartbeat($name, $host, $port);
  131. }
  132. return true;
  133. }
  134. protected function isNoIpsFound(ResponseInterface $response): bool
  135. {
  136. if ($response->getStatusCode() === 404) {
  137. return true;
  138. }
  139. if ($response->getStatusCode() === 500) {
  140. $messages = [
  141. 'no ips found',
  142. 'no matched ip',
  143. ];
  144. $body = (string) $response->getBody();
  145. foreach ($messages as $message) {
  146. if (str_contains($body, $message)) {
  147. return true;
  148. }
  149. }
  150. }
  151. return false;
  152. }
  153. protected function setMetadata(string $name, array $metadata)
  154. {
  155. $this->metadata[$name] = $metadata;
  156. }
  157. protected function getMetadata(string $name): ?string
  158. {
  159. if (empty($this->metadata[$name])) {
  160. return null;
  161. }
  162. unset($this->metadata[$name]['methodName']);
  163. return Json::encode($this->metadata[$name]);
  164. }
  165. protected function registerHeartbeat(string $name, string $host, int $port): void
  166. {
  167. $key = $name . $host . $port;
  168. if (isset($this->registerHeartbeat[$key])) {
  169. return;
  170. }
  171. $this->registerHeartbeat[$key] = true;
  172. Coroutine::create(function () use ($name, $host, $port) {
  173. retry(INF, function () use ($name, $host, $port) {
  174. $lightBeatEnabled = false;
  175. while (true) {
  176. try {
  177. $heartbeat = $this->config->get('services.drivers.nacos.heartbeat', 5);
  178. if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($heartbeat)) {
  179. break;
  180. }
  181. $groupName = $this->config->get('services.drivers.nacos.group_name');
  182. $response = $this->client->instance->beat(
  183. $name,
  184. [
  185. 'ip' => $host,
  186. 'port' => $port,
  187. 'serviceName' => $groupName . '@@' . $name,
  188. ],
  189. $groupName,
  190. $this->config->get('services.drivers.nacos.namespace_id'),
  191. null,
  192. $lightBeatEnabled
  193. );
  194. $result = Json::decode((string) $response->getBody());
  195. if ($response->getStatusCode() === 200) {
  196. $this->logger->debug(sprintf('Instance %s:%d heartbeat successfully, result code:%s', $host, $port, $result['code']));
  197. } else {
  198. $this->logger->error(sprintf('Instance %s:%d heartbeat failed! %s', $host, $port, (string) $response->getBody()));
  199. continue;
  200. }
  201. $lightBeatEnabled = false;
  202. if (isset($result['lightBeatEnabled'])) {
  203. $lightBeatEnabled = $result['lightBeatEnabled'];
  204. }
  205. if ($result['code'] == 20404) {
  206. $this->client->instance->register($host, $port, $name, [
  207. 'groupName' => $this->config->get('services.drivers.nacos.group_name'),
  208. 'namespaceId' => $this->config->get('services.drivers.nacos.namespace_id'),
  209. 'metadata' => $this->getMetadata($name),
  210. ]);
  211. }
  212. } catch (Throwable $exception) {
  213. $this->logger->error('The nacos heartbeat failed, caused by ' . $exception);
  214. throw $exception;
  215. }
  216. }
  217. });
  218. });
  219. }
  220. private function getWeight($weight): int
  221. {
  222. return intval(100 * $weight);
  223. }
  224. }