Server.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\RpcServer;
  12. use Hyperf\Context\RequestContext;
  13. use Hyperf\Context\ResponseContext;
  14. use Hyperf\Contract\ConfigInterface;
  15. use Hyperf\Contract\DispatcherInterface;
  16. use Hyperf\Contract\MiddlewareInitializerInterface;
  17. use Hyperf\Contract\OnReceiveInterface;
  18. use Hyperf\Coordinator\Constants;
  19. use Hyperf\Coordinator\CoordinatorManager;
  20. use Hyperf\ExceptionHandler\ExceptionHandlerDispatcher;
  21. use Hyperf\HttpMessage\Stream\SwooleStream;
  22. use Hyperf\HttpServer\Contract\CoreMiddlewareInterface;
  23. use Hyperf\HttpServer\Exception\Handler\HttpExceptionHandler;
  24. use Hyperf\Rpc\Context as RpcContext;
  25. use Hyperf\Rpc\Protocol;
  26. use Hyperf\RpcServer\Event\RequestHandled;
  27. use Hyperf\RpcServer\Event\RequestReceived;
  28. use Hyperf\RpcServer\Event\RequestTerminated;
  29. use Hyperf\Server\Option;
  30. use Hyperf\Server\ServerFactory;
  31. use Hyperf\Server\ServerManager;
  32. use Psr\Container\ContainerInterface;
  33. use Psr\EventDispatcher\EventDispatcherInterface;
  34. use Psr\Http\Message\ResponseInterface;
  35. use Psr\Log\LoggerInterface;
  36. use Swoole\Coroutine\Server\Connection;
  37. use Swoole\Server as SwooleServer;
  38. use Swow\Psr7\Message\ResponsePlusInterface;
  39. use Swow\Psr7\Message\ServerRequestPlusInterface;
  40. use Throwable;
  41. use function Hyperf\Coroutine\defer;
  /**
   * Base class for RPC servers.
   *
   * Turns a received data packet into a PSR-7 style request, runs it through the
   * configured middlewares and the core middleware, and sends the body of the
   * resulting response back through the server/connection it arrived on.
   *
   * Subclasses provide the core middleware and the request/response builders.
   */
  abstract class Server implements OnReceiveInterface, MiddlewareInitializerInterface
  {
      /**
       * Middlewares read from the `middlewares.<serverName>` config key.
       */
      protected array $middlewares = [];

      /**
       * Exception handlers read from the `exceptions.handler.<serverName>` config key,
       * falling back to {@see getDefaultExceptionHandler()}.
       */
      protected array $exceptionHandlers = [];

      /**
       * Created by {@see initCoreMiddleware()}; null until then.
       */
      protected ?CoreMiddlewareInterface $coreMiddleware = null;

      /**
       * Name of the server this instance handles; set by {@see initCoreMiddleware()}.
       */
      protected ?string $serverName = null;

      /**
       * Not assigned anywhere in this class; available for subclasses.
       */
      protected ?Protocol $protocol = null;

      /**
       * Event dispatcher, or null when the container does not provide one.
       */
      protected ?EventDispatcherInterface $event = null;

      /**
       * Options of the matching server port; stays null when no server config is available.
       */
      protected ?Option $option = null;

      /**
       * Stores the collaborators and resolves the optional event dispatcher.
       */
      public function __construct(
          protected ContainerInterface $container,
          protected DispatcherInterface $dispatcher,
          protected ExceptionHandlerDispatcher $exceptionHandlerDispatcher,
          protected LoggerInterface $logger
      ) {
          // The event dispatcher is optional: lifecycle events are simply skipped without it.
          if ($this->container->has(EventDispatcherInterface::class)) {
              $this->event = $this->container->get(EventDispatcherInterface::class);
          }
      }

      /**
       * Prepares the server for the given server name: creates the core middleware and
       * loads middlewares, exception handlers and port options from configuration.
       */
      public function initCoreMiddleware(string $serverName): void
      {
          $this->serverName = $serverName;
          $this->coreMiddleware = $this->createCoreMiddleware();

          $config = $this->container->get(ConfigInterface::class);
          $this->middlewares = $config->get('middlewares.' . $serverName, []);
          $this->exceptionHandlers = $config->get(
              'exceptions.handler.' . $serverName,
              $this->getDefaultExceptionHandler()
          );

          // Must run after the middlewares are loaded: initOption() passes them to the option.
          $this->initOption();
      }

      /**
       * Handles one received data packet.
       *
       * Builds the request/response pair, dispatches the request through the middlewares,
       * delegates any Throwable to the configured exception handlers, emits the request
       * lifecycle events when enabled, and finally sends the response back.
       *
       * @param mixed $server server or connection the data arrived on; forwarded to
       *                      buildResponse() and send()
       * @param int $fd connection file descriptor
       * @param int $reactorId forwarded to buildRequest()
       * @param string $data raw packet payload
       */
      public function onReceive($server, int $fd, int $reactorId, string $data): void
      {
          $request = $response = null;

          try {
              // Wait on the WORKER_START coordinator before handling the packet.
              CoordinatorManager::until(Constants::WORKER_START)->yield();

              // Initialize PSR-7 Request and Response objects and publish them to the context.
              RequestContext::set($request = $this->buildRequest($fd, $reactorId, $data));
              ResponseContext::set($response = $this->buildResponse($fd, $server));

              $request = $this->coreMiddleware->dispatch($request);

              if ($this->option?->isEnableRequestLifecycle()) {
                  $this->event?->dispatch(new RequestReceived(
                      request: $request,
                      response: $response,
                      serverName: $this->serverName
                  ));
              }

              $response = $this->dispatcher->dispatch($request, $this->middlewares, $this->coreMiddleware);
          } catch (Throwable $throwable) {
              // Delegate the exception to the exception handlers, using the dispatcher
              // that was injected through the constructor.
              $response = $this->exceptionHandlerDispatcher->dispatch($throwable, $this->exceptionHandlers);
          } finally {
              // $request is still null when the failure happened before it could be built;
              // lifecycle events need a request, so they are skipped in that case.
              if ($request !== null && $this->option?->isEnableRequestLifecycle()) {
                  // The arrow function captures the current values by value, so the
                  // deferred event sees the response as it is at this point.
                  // $throwable is undefined on the success path, hence the `?? null`.
                  defer(fn () => $this->event?->dispatch(new RequestTerminated(
                      request: $request,
                      response: $response,
                      exception: $throwable ?? null,
                      serverName: $this->serverName
                  )));

                  $this->event?->dispatch(new RequestHandled(
                      request: $request,
                      response: $response,
                      exception: $throwable ?? null,
                      serverName: $this->serverName
                  ));
              }

              // Anything that is not already a response is wrapped into one.
              if (! $response instanceof ResponseInterface) {
                  $response = $this->transferToResponse($response);
              }

              // transferToResponse() may return null, in which case nothing is sent.
              if ($response instanceof ResponseInterface) {
                  $this->send($server, $fd, $response);
              }
          }
      }

      /**
       * Logs a debug message with the host and port of this server when a client connects.
       *
       * @param mixed $server the main server object, not the server object this callback is bound to
       * @param int $fd connection file descriptor (unused)
       */
      public function onConnect($server, int $fd)
      {
          // Only the second element returned by ServerManager::get() is needed here.
          /** @var \Swoole\Server\Port $port */
          [, $port] = ServerManager::get($this->serverName);

          $this->logger->debug(sprintf('Connect to %s:%d', $port->host, $port->port));
      }

      /**
       * Logs a debug message with the host and port of this server when a connection closes.
       *
       * @param mixed $server the main server object, not the server object this callback is bound to
       * @param int $fd connection file descriptor (unused)
       */
      public function onClose($server, int $fd)
      {
          // Only the second element returned by ServerManager::get() is needed here.
          /** @var \Swoole\Server\Port $port */
          [, $port] = ServerManager::get($this->serverName);

          $this->logger->debug(sprintf('Close on %s:%d', $port->host, $port->port));
      }

      /**
       * Exception handlers used when none are configured for the server.
       *
       * @return string[] handler class names
       */
      protected function getDefaultExceptionHandler(): array
      {
          return [
              HttpExceptionHandler::class,
          ];
      }

      /**
       * Writes the response body to the client.
       *
       * A SwooleServer needs the file descriptor to address the client, a Connection
       * does not. Any other $server type is silently ignored.
       *
       * @param Connection|SwooleServer $server
       */
      protected function send($server, int $fd, ResponseInterface $response): void
      {
          if ($server instanceof SwooleServer) {
              $server->send($fd, (string) $response->getBody());

              return;
          }

          if ($server instanceof Connection) {
              $server->send((string) $response->getBody());
          }
      }

      /**
       * Creates the core middleware used to dispatch requests for this server.
       */
      abstract protected function createCoreMiddleware(): CoreMiddlewareInterface;

      /**
       * Builds the request object for one received packet.
       */
      abstract protected function buildRequest(int $fd, int $reactorId, string $data): ServerRequestPlusInterface;

      /**
       * Builds the initial response object for one received packet.
       *
       * @param mixed $server the server or connection passed to onReceive()
       */
      abstract protected function buildResponse(int $fd, $server): ResponsePlusInterface;

      /**
       * Wraps a non-response value into the response stored in the ResponseContext.
       *
       * @param mixed $response value used as the content of the new body stream
       * @return null|ResponseInterface null when the context holds no response
       */
      protected function transferToResponse($response): ?ResponseInterface
      {
          // The nullsafe call short-circuits, so the stream is only created when a
          // response exists in the context.
          return ResponseContext::getOrNull()?->setBody(new SwooleStream($response));
      }

      /**
       * Returns the RPC context service from the container.
       */
      protected function getContext()
      {
          return $this->container->get(RpcContext::class);
      }

      /**
       * Resolves the options of the port whose name equals the current server name.
       *
       * Leaves $option untouched (null by default) when the server factory has no config
       * or no servers. Otherwise falls back to an empty Option when no port matches and
       * hands the loaded middlewares to the option.
       */
      protected function initOption(): void
      {
          $ports = $this->container->get(ServerFactory::class)->getConfig()?->getServers();
          if (! $ports) {
              return;
          }

          // No break on purpose: if several ports share the name, the last one wins.
          foreach ($ports as $port) {
              if ($port->getName() === $this->serverName) {
                  $this->option = $port->getOptions();
              }
          }

          $this->option ??= Option::make([]);
          $this->option->setMustSortMiddlewaresByMiddlewares($this->middlewares);
      }
  }