App_Controller_WebSocketController.proxy.php 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. <?php
  2. declare (strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use Hyperf\Amqp\Producer;
  6. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  7. use Hyperf\Contract\OnCloseInterface;
  8. use Hyperf\Contract\OnMessageInterface;
  9. use Hyperf\Contract\OnOpenInterface;
  10. use Hyperf\Engine\WebSocket\Frame;
  11. use Hyperf\Engine\WebSocket\Response;
  12. use Hyperf\WebSocketServer\Annotation\MessageHandler;
  13. use Hyperf\WebSocketServer\Context\WebSocketContext;
  14. use Hyperf\WebSocketServer\Message\Text;
  15. use Hyperf\Di\Annotation\Inject;
  16. use Phper666\JWTAuth\JWT;
  17. use App\JsonRpc\ChatServiceInterface;
  18. use Hyperf\WebSocketServer\Constant\Opcode;
  19. use App\Service\RedisService;
  20. use App\Service\Message\ReceiveHandleService;
  21. use http\Client\Request;
  22. use App\Controller\AbstractController;
  23. class WebSocketController extends AbstractController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  24. {
  25. use \Hyperf\Di\Aop\ProxyTrait;
  26. use \Hyperf\Di\Aop\PropertyHandlerTrait;
  27. function __construct(\Hyperf\Logger\LoggerFactory $loggerFactory)
  28. {
  29. if (method_exists(parent::class, '__construct')) {
  30. parent::__construct(...func_get_args());
  31. }
  32. $this->__handlePropertyHandler(__CLASS__);
  33. }
  34. #[Inject]
  35. protected JWT $jwt;
  36. /**
  37. * @var ChatServiceInterface
  38. */
  39. #[Inject]
  40. private $chatServiceClient;
  41. /**
  42. * @Inject
  43. * @var ReceiveHandleService
  44. */
  45. protected $receiveHandle;
  46. public function onMessage($server, $frame) : void
  47. {
  48. //把数据推给前端
  49. $redisClient = new RedisService();
  50. $userId = $redisClient->findUser((string) $frame->fd);
  51. var_dump("用户ID:::", $userId);
  52. //存入队列
  53. $result = json_decode($frame->data, true);
  54. $result['user_id'] = $userId;
  55. var_dump("接收到的数据:", $result);
  56. $message = new MqProducer($result);
  57. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  58. $producer->produce($message);
  59. foreach ($server->connections as $fd) {
  60. if ($server->isEstablished($fd)) {
  61. var_dump($fd);
  62. $server->push($fd, $frame->data);
  63. }
  64. }
  65. }
  66. public function onClose($server, int $fd, int $reactorId) : void
  67. {
  68. var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  69. // $data = [
  70. // 'fd'=>$fd
  71. // ];
  72. // $this->chatServiceClient->delChatChannel($data);
  73. $redisClient = new RedisService();
  74. $userId = $redisClient->findUser((string) $fd);
  75. $redisClient->unbind((string) $fd, (int) $userId);
  76. }
  77. public function onOpen($server, $request) : void
  78. {
  79. $token = $request->get['token'];
  80. $userInfo = $this->jwt->getClaimsByToken($token);
  81. $response = (new Response($server))->init($request);
  82. $fd = $response->getFd();
  83. // var_dump("管道ID:",$fd);
  84. // $data = [
  85. // 'user_id'=>$userInfo['uid'],
  86. // 'fd'=>$fd
  87. // ];
  88. // var_dump(SERVER_RUN_ID,"+++++++++++++");
  89. // $this->chatServiceClient->addChatChannel($data);
  90. $server->bind($fd, $userInfo['uid']);
  91. $redisClient = new RedisService();
  92. $redisClient->bind((string) $fd, $userInfo['uid']);
  93. $server->push($request->fd, json_encode(["event" => "connect", "content" => ["ping_interval" => 20, "ping_timeout" => 20 * 3, "content" => "连接成功"]]));
  94. }
  95. }