*/ protected array $configListenContexts = []; /** * @var array */ protected array $configListenHandlers = []; /** * @var array */ protected array $namingListenContexts = []; /** * @var array */ protected array $namingListenHandlers = []; protected int $streamId; public function __construct( protected Application $app, protected Config $config, protected ContainerInterface $container, protected string $namespaceId = '', protected string $module = 'config', ) { if ($this->container->has(StdoutLoggerInterface::class)) { $this->logger = $this->container->get(StdoutLoggerInterface::class); } $this->reconnect(); } public function request(RequestInterface $request, ?Client $client = null): Response { $payload = new Payload([ 'metadata' => new Metadata($this->getMetadata($request)), 'body' => new Any([ 'value' => Json::encode($request->getValue()), ]), ]); $client ??= $this->client; $response = $client->request( new Request('/Request/request', 'POST', Parser::serializeMessage($payload), $this->grpcDefaultHeaders()) ); return Response::jsonDeSerialize($response->getBody()); } public function write(int $streamId, RequestInterface $request, ?Client $client = null): bool { $payload = new Payload([ 'metadata' => new Metadata($this->getMetadata($request)), 'body' => new Any([ 'value' => Json::encode($request->getValue()), ]), ]); $client ??= $this->client; return $client->write($streamId, Parser::serializeMessage($payload)); } public function listenConfig(string $group, string $dataId, ListenHandlerInterface $callback, string $md5 = ''): void { $listenContext = new ListenContext($this->namespaceId, $group, $dataId, $md5); $this->configListenContexts[$listenContext->toKeyString()] = $listenContext; $this->configListenHandlers[$listenContext->toKeyString()] = $callback; } public function listenNaming(string $clusters, string $group, string $service, ListenHandlerInterface $callback): void { $serviceInfo = new ServiceInfo($service, $group, $clusters); if ($context = $this->namingListenContexts[$serviceInfo->toKeyString()] ?? null) { $callback->handle(new NotifySubscriberRequest(['requestId' => '', 'module' => 'naming', 'serviceInfo' => $context])); $this->namingListenHandlers[$serviceInfo->toKeyString()] = $callback; return; } $this->namingListenContexts[$serviceInfo->toKeyString()] = $serviceInfo; $this->namingListenHandlers[$serviceInfo->toKeyString()] = $callback; } public function listen(): void { $request = new ConfigBatchListenRequest(true, array_values($this->configListenContexts)); $response = $this->request($request); if ($response instanceof ConfigChangeBatchListenResponse) { $changedConfigs = $response->changedConfigs; foreach ($changedConfigs as $changedConfig) { $this->handleConfig($changedConfig->tenant, $changedConfig->group, $changedConfig->dataId); } } } protected function reconnect(): void { $this->client && $this->client->close(); $this->client = new Client( $this->config->getHost() . ':' . ($this->config->getPort() + 1000), [ 'heartbeat' => null, ] ); if ($this->logger) { $this->client->setLogger($this->logger); } $this->serverCheck(); $this->streamId = $this->bindStreamCall(); $this->healthCheck(); } protected function healthCheck() { go(function () { $client = $this->client; $heartbeat = $this->config->getGrpc()['heartbeat']; while ($heartbeat > 0 && $client->inLoop()) { if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield($heartbeat)) { break; } $res = $this->request(new HealthCheckRequest(), $client); if ($res->errorCode !== 0) { $this->logger?->error('Health check failed, the result is ' . (string) $res); } } }); } protected function ip(): string { if ($this->container->has(IPReaderInterface::class)) { return $this->container->get(IPReaderInterface::class)->read(); } return Network::ip(); } protected function bindStreamCall(): int { $id = $this->client->send(new Request('/BiRequestStream/requestBiStream', 'POST', '', $this->grpcDefaultHeaders(), true)); go(function () use ($id) { $client = $this->client; while (true) { try { if (! $client->inLoop()) { break; } $response = $client->recv($id, -1); $response = Response::jsonDeSerialize($response->getBody()); match (true) { $response instanceof ConfigChangeNotifyRequest => $this->handleConfig( $response->tenant, $response->group, $response->dataId, $response ), $response instanceof NotifySubscriberRequest => $this->handleNaming($response), }; $this->listen(); } catch (Throwable $e) { ! $this->isWorkerExit() && $this->logger->error((string) $e); } } if (! $this->isWorkerExit()) { $this->reconnect(); $this->listen(); } }); $request = new ConnectionSetupRequest($this->namespaceId, $this->module); $this->write($id, $request); return $id; } protected function handleConfig(string $tenant, string $group, string $dataId, ?ConfigChangeNotifyRequest $request = null): void { $response = $this->request(new ConfigQueryRequest($tenant, $group, $dataId)); $key = ListenContext::getKeyString($tenant, $group, $dataId); if ($response instanceof ConfigQueryResponse) { if (isset($this->configListenContexts[$key])) { $this->configListenContexts[$key]->md5 = $response->getMd5(); $this->configListenHandlers[$key]?->handle($response); } if ($request && $ack = $this->configListenHandlers[$key]?->ack($request)) { $this->write($this->streamId, $ack); } } } protected function handleNaming(NotifySubscriberRequest $response): void { $serviceInfo = $response->serviceInfo; $key = $serviceInfo->toKeyString(); if (! isset($this->namingListenContexts[$key])) { $this->namingListenContexts[$key] = $serviceInfo; } if ($serviceInfo->lastRefTime > $this->namingListenContexts[$key]->lastRefTime) { $this->namingListenContexts[$key] = $serviceInfo; } if ($handler = $this->namingListenHandlers[$key] ?? null) { $handler->handle($response); $this->write($this->streamId, $handler->ack($response)); } else { $this->write($this->streamId, (new NamingPushRequestHandler(fn () => null))->ack($response)); } } /** * @deprecated since 3.1, use handleNaming instead. */ protected function hanldeNaming(NotifySubscriberRequest $response) { $this->handleNaming($response); } protected function serverCheck(): bool { $request = new ServerCheckRequest(); while (true) { try { $response = $this->request($request); if ($response->errorCode !== 0) { $this->logger?->error('Nacos check server failed.'); if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield(5)) { break; } continue; } return true; } catch (Exception $exception) { $this->logger?->error((string) $exception); if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield(5)) { break; } } } throw new ConnectToServerFailedException('the nacos server is not ready to work in 30 seconds, connect to server failed'); } private function isWorkerExit(): bool { return CoordinatorManager::until(Constants::WORKER_EXIT)->isClosing(); } private function getMetadata(RequestInterface $request): array { if ($token = $this->getAccessToken()) { return [ 'type' => $request->getType(), 'clientIp' => $this->ip(), 'headers' => [ 'accessToken' => $token, ], ]; } return [ 'type' => $request->getType(), 'clientIp' => $this->ip(), ]; } private function grpcDefaultHeaders(): array { return [ 'content-type' => 'application/grpc+proto', 'te' => 'trailers', 'user-agent' => 'Nacos-Hyperf-Client:v3.1', ]; } private function handleResponse(ResponseInterface $response): array { $statusCode = $response->getStatusCode(); $contents = (string) $response->getBody(); if ($statusCode !== 200) { throw new RequestException($contents, $statusCode); } return Json::decode($contents); } }