WebSocketController.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use Hyperf\Amqp\Producer;
  6. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  7. use Hyperf\Contract\OnCloseInterface;
  8. use Hyperf\Contract\OnMessageInterface;
  9. use Hyperf\Contract\OnOpenInterface;
  10. use Hyperf\Engine\WebSocket\Frame;
  11. use Hyperf\Engine\WebSocket\Response;
  12. use Hyperf\WebSocketServer\Annotation\MessageHandler;
  13. use Hyperf\WebSocketServer\Context\WebSocketContext;
  14. use Hyperf\WebSocketServer\Message\Text;
  15. use Hyperf\Di\Annotation\Inject;
  16. use Phper666\JWTAuth\JWT;
  17. use App\JsonRpc\ChatServiceInterface;
  18. use Hyperf\WebSocketServer\Constant\Opcode;
  19. use App\Service\RedisService;
  20. use App\Service\Message\ReceiveHandleService;
  21. use http\Client\Request;
  22. use App\Controller\AbstractController;
  23. use App\JsonRpc\UserServiceInterface;
  /**
   * WebSocket entry point for chat traffic.
   *
   * - onOpen:    authenticates the connection from the `token` query parameter
   *              and records the fd <-> user mapping (server-side and in Redis).
   * - onMessage: stamps the incoming JSON message with the sender's id and
   *              avatar, publishes it to the message queue, then pushes it to
   *              the online recipient(s).
   * - onClose:   removes the fd <-> user mapping from Redis.
   */
  class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  {
      /** `talk_type` value routed as a one-to-one conversation. */
      private const TALK_TYPE_PRIVATE = 1;

      /** `talk_type` value routed as a group conversation. */
      private const TALK_TYPE_GROUP = 2;

      /** Heartbeat interval (seconds) announced to the client on connect. */
      private const PING_INTERVAL = 20;

      #[Inject]
      protected JWT $jwt;

      #[Inject]
      private ChatServiceInterface $chatServiceClient;

      #[Inject]
      private UserServiceInterface $userServiceClient;

      // Injected with the attribute form, like every other dependency of this class.
      #[Inject]
      protected ReceiveHandleService $receiveHandle;

      /**
       * Handle one incoming frame: enrich it, queue it, and fan it out.
       *
       * Frames from connections without a bound user, and frames whose payload
       * is not a JSON object/array, are ignored.
       *
       * @param mixed $server server object exposing `isEstablished()` and `push()`
       * @param mixed $frame  incoming frame exposing `fd` and `data`
       */
      public function onMessage($server, $frame): void
      {
          $redisClient = new RedisService();

          // NOTE(review): assumes findUser() yields a falsy value for an unknown fd — confirm.
          $userId = $redisClient->findUser((string) $frame->fd);
          if (! $userId) {
              return;
          }

          // The payload comes from the client, so it must be validated before use.
          $result = json_decode((string) $frame->data, true);
          if (! is_array($result)) {
              return;
          }

          // Sender identity is always set server-side, overriding anything the client sent.
          $result['user_id'] = $userId;
          $result['user_avatar'] = $this->resolveAvatar($redisClient, $userId);

          // Hand the message to the queue before delivering it to connected clients.
          $producer = ContextApplicationContext::getContainer()->get(Producer::class);
          $producer->produce(new MqProducer($result));

          // Encode once; every recipient receives the same payload.
          $payload = json_encode($result);
          if ($payload === false || ! isset($result['receiver_id'])) {
              return;
          }

          $talkType = filter_var($result['talk_type'] ?? null, FILTER_VALIDATE_INT);

          if ($talkType === self::TALK_TYPE_PRIVATE) {
              // Echo to the sender, then deliver to the friend if they are online.
              $this->pushIfEstablished($server, (int) $frame->fd, $payload);
              $friendFd = $redisClient->findFd((int) $result['receiver_id']);
              $this->pushIfEstablished($server, (int) $friendFd, $payload);

              return;
          }

          if ($talkType === self::TALK_TYPE_GROUP) {
              // Look up the group's members and deliver to each one that is online.
              $response = $this->chatServiceClient->getAllTalkGroupMember([
                  'group_id' => $result['receiver_id'],
              ]);
              $members = $response['data'] ?? null;
              if (! is_iterable($members)) {
                  return;
              }

              foreach ($members as $member) {
                  if (! isset($member['user_id'])) {
                      continue;
                  }
                  $memberFd = $redisClient->findFd((int) $member['user_id']);
                  $this->pushIfEstablished($server, (int) $memberFd, $payload);
              }
          }
      }

      /**
       * Remove the Redis fd <-> user mapping of a closed connection.
       *
       * @param mixed $server    server object (unused)
       * @param int   $fd        descriptor of the connection that closed
       * @param int   $reactorId reactor id reported by the server (unused)
       */
      public function onClose($server, int $fd, int $reactorId): void
      {
          $redisClient = new RedisService();
          $userId = $redisClient->findUser((string) $fd);
          $redisClient->unbind((string) $fd, (int) $userId);
      }

      /**
       * Authenticate a new connection and register it for message delivery.
       *
       * The JWT is read from the `token` query parameter. When the token is
       * missing, cannot be parsed, or carries no `uid` claim, the connection is
       * closed instead of being left open without a bound user.
       *
       * @param mixed $server  server object exposing `bind()`, `push()` and `close()`
       * @param mixed $request handshake request exposing the `get` query parameters
       */
      public function onOpen($server, $request): void
      {
          $fd = (new Response($server))->init($request)->getFd();

          $token = $request->get['token'] ?? null;
          if (! is_string($token) || $token === '') {
              $server->close($fd);

              return;
          }

          try {
              $claims = $this->jwt->getClaimsByToken($token);
          } catch (\Throwable $e) {
              // Drop the unauthenticated socket, then let the framework report the failure.
              $server->close($fd);

              throw $e;
          }

          $uid = $claims['uid'] ?? null;
          if ($uid === null) {
              $server->close($fd);

              return;
          }

          // The server-side binding takes an integer uid (onClose casts the same way).
          $server->bind($fd, (int) $uid);
          (new RedisService())->bind((string) $fd, $uid);

          $server->push($fd, json_encode([
              "event" => "connect",
              "content" => [
                  "ping_interval" => self::PING_INTERVAL,
                  "ping_timeout" => self::PING_INTERVAL * 3,
                  "content" => "连接成功",
              ],
          ]));
      }

      /**
       * Return the sender's avatar, preferring the Redis cache over the user service.
       *
       * On a cache miss (or an unreadable cache entry) the profile is fetched from
       * the user service and, when present, written back to the cache.
       *
       * @param RedisService $redisClient cache accessor
       * @param mixed        $userId      id of the sending user
       *
       * @return mixed the profile's `avatar` value, or null when it is unavailable
       */
      private function resolveAvatar(RedisService $redisClient, $userId): mixed
      {
          $cached = $redisClient->getUserInfo($userId);
          if ($cached) {
              // Decode to an array: the profile is read with array syntax below.
              $profile = is_array($cached) ? $cached : json_decode((string) $cached, true);
              if (is_array($profile)) {
                  return $profile['avatar'] ?? null;
              }
          }

          $response = $this->userServiceClient->getUserInfo((int) $userId);
          $profile = $response['data'] ?? null;
          if (! is_array($profile)) {
              // Nothing usable came back; do not cache an empty profile.
              return null;
          }

          $redisClient->setUserInfo($userId, $profile);

          return $profile['avatar'] ?? null;
      }

      /**
       * Push a payload to a connection only if its WebSocket handshake is complete.
       *
       * @param mixed  $server  server object exposing `isEstablished()` and `push()`
       * @param int    $fd      target descriptor; non-positive values are skipped
       * @param string $payload already-encoded message
       *
       * @return bool true when the payload was pushed
       */
      private function pushIfEstablished($server, int $fd, string $payload): bool
      {
          return $fd > 0 && $server->isEstablished($fd) && $server->push($fd, $payload);
      }
  }