WebSocketController.php 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use Hyperf\Amqp\Producer;
  6. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  7. use Hyperf\Contract\OnCloseInterface;
  8. use Hyperf\Contract\OnMessageInterface;
  9. use Hyperf\Contract\OnOpenInterface;
  10. use Hyperf\Engine\WebSocket\Frame;
  11. use Hyperf\Engine\WebSocket\Response;
  12. use Hyperf\WebSocketServer\Annotation\MessageHandler;
  13. use Hyperf\WebSocketServer\Context\WebSocketContext;
  14. use Hyperf\WebSocketServer\Message\Text;
  15. use Hyperf\Di\Annotation\Inject;
  16. use Phper666\JWTAuth\JWT;
  17. use App\JsonRpc\ChatServiceInterface;
  18. use Hyperf\WebSocketServer\Constant\Opcode;
  19. use App\Service\RedisService;
  20. use App\Service\Message\ReceiveHandleService;
  21. use http\Client\Request;
  22. use App\Controller\AbstractController;
  23. class WebSocketController extends AbstractController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  24. {
  25. #[Inject]
  26. protected JWT $jwt;
  27. /**
  28. * @var ChatServiceInterface
  29. */
  30. #[Inject]
  31. private $chatServiceClient;
  32. /**
  33. * @Inject
  34. * @var ReceiveHandleService
  35. */
  36. protected $receiveHandle;
  37. public function onMessage($server, $frame): void
  38. {
  39. //把数据推给前端
  40. $redisClient = new RedisService();
  41. $userId = $redisClient->findUser((string)$frame->fd);
  42. var_dump("用户ID:::",$userId);
  43. //存入队列
  44. $result = json_decode($frame->data, true);
  45. $result['user_id'] = $userId;
  46. var_dump("接收到的数据:",$result);
  47. $message = new MqProducer($result);
  48. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  49. $producer->produce($message);
  50. foreach ($server->connections as $fd) {
  51. if ($server->isEstablished($fd)) {
  52. var_dump($fd);
  53. $server->push($fd, $frame->data);
  54. }
  55. }
  56. }
  57. public function onClose($server, int $fd, int $reactorId): void
  58. {
  59. var_dump('closed::::::::::::::::::',$fd,"======",$reactorId,"+++++++++++");
  60. // $data = [
  61. // 'fd'=>$fd
  62. // ];
  63. // $this->chatServiceClient->delChatChannel($data);
  64. $redisClient = new RedisService();
  65. $userId = $redisClient->findUser((string)$fd);
  66. $redisClient->unbind((string)$fd,(int)$userId);
  67. }
  68. public function onOpen($server, $request): void
  69. {
  70. $token = $request->get['token'];
  71. $userInfo = $this->jwt->getClaimsByToken($token);
  72. $response = (new Response($server))->init($request);
  73. $fd = $response->getFd();
  74. // var_dump("管道ID:",$fd);
  75. // $data = [
  76. // 'user_id'=>$userInfo['uid'],
  77. // 'fd'=>$fd
  78. // ];
  79. // var_dump(SERVER_RUN_ID,"+++++++++++++");
  80. // $this->chatServiceClient->addChatChannel($data);
  81. $server->bind($fd,$userInfo['uid']);
  82. $redisClient = new RedisService();
  83. $redisClient->bind((string)$fd,$userInfo['uid']);
  84. $server->push($request->fd, json_encode([
  85. "event" => "connect",
  86. "content" => [
  87. "ping_interval" => 20,
  88. "ping_timeout" => 20 * 3,
  89. "content" =>"连接成功"
  90. ],
  91. ]));
  92. }
  93. }