App_Controller_WebSocketController.proxy.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. <?php
  2. declare (strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use Hyperf\Amqp\Producer;
  6. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  7. use Hyperf\Contract\OnCloseInterface;
  8. use Hyperf\Contract\OnMessageInterface;
  9. use Hyperf\Contract\OnOpenInterface;
  10. use Hyperf\Engine\WebSocket\Frame;
  11. use Hyperf\Engine\WebSocket\Response;
  12. use Hyperf\WebSocketServer\Annotation\MessageHandler;
  13. use Hyperf\WebSocketServer\Context\WebSocketContext;
  14. use Hyperf\WebSocketServer\Message\Text;
  15. use Hyperf\Di\Annotation\Inject;
  16. use Phper666\JWTAuth\JWT;
  17. use App\JsonRpc\ChatServiceInterface;
  18. use Hyperf\WebSocketServer\Constant\Opcode;
  19. use App\Service\RedisService;
  20. use App\Service\Message\ReceiveHandleService;
  21. use http\Client\Request;
  22. use App\Controller\AbstractController;
  23. use App\JsonRpc\UserServiceInterface;
  24. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  25. {
  26. use \Hyperf\Di\Aop\ProxyTrait;
  27. use \Hyperf\Di\Aop\PropertyHandlerTrait;
  28. function __construct()
  29. {
  30. $this->__handlePropertyHandler(__CLASS__);
  31. }
  32. #[Inject]
  33. protected JWT $jwt;
  34. /**
  35. * @var ChatServiceInterface
  36. */
  37. #[Inject]
  38. private $chatServiceClient;
  39. /**
  40. * @var UserServiceInterface
  41. */
  42. #[Inject]
  43. private $userServiceClient;
  44. /**
  45. * @Inject
  46. * @var ReceiveHandleService
  47. */
  48. protected $receiveHandle;
  49. public function onMessage($server, $frame) : void
  50. {
  51. //把数据推给前端
  52. $redisClient = new RedisService();
  53. $userId = $redisClient->findUser((string) $frame->fd);
  54. //存入队列
  55. $result = json_decode($frame->data, true);
  56. $result['user_id'] = $userId;
  57. $userInfo = $redisClient->getUserInfo($userId);
  58. if ($userInfo) {
  59. $userInfoArr = json_decode($userInfo);
  60. $result['user_avatar'] = $userInfoArr['avatar'];
  61. } else {
  62. $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
  63. $redisClient->setUserInfo($userId, $userInfos['data']);
  64. $result['user_avatar'] = $userInfos['data']['avatar'];
  65. }
  66. var_dump("接收到的数据:", $result);
  67. $message = new MqProducer($result);
  68. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  69. $producer->produce($message);
  70. //推送给前台
  71. //组装数据+头像
  72. if ($result['talk_type'] == 1) {
  73. //给自己推一条数据
  74. if ($server->isEstablished($frame->fd)) {
  75. $server->push($frame->fd, json_encode($result));
  76. }
  77. //给好友推送消息
  78. $fd = $redisClient->findFd((int) $result['receiver_id']);
  79. // $result['user_id'] = $result['friend_id'];
  80. if ($server->isEstablished((int) $fd)) {
  81. $b = $server->push((int) $fd, json_encode($result));
  82. var_dump("推送成功:", $b);
  83. }
  84. } else {
  85. if ($result['talk_type'] == 2) {
  86. //根据群找到 群用户,群发一遍消息
  87. $groupUserList = $this->chatServiceClient->getAllTalkGroupMember(['group_id' => $result['receiver_id']]);
  88. var_dump($groupUserList['data']);
  89. if ($groupUserList['data']) {
  90. foreach ($groupUserList['data'] as $val) {
  91. $fd = $redisClient->findFd((int) $val['user_id']);
  92. if ($server->isEstablished((int) $fd)) {
  93. $server->push((int) $fd, json_encode($result));
  94. }
  95. }
  96. }
  97. }
  98. }
  99. }
  100. public function onClose($server, int $fd, int $reactorId) : void
  101. {
  102. var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  103. // $data = [
  104. // 'fd'=>$fd
  105. // ];
  106. // $this->chatServiceClient->delChatChannel($data);
  107. $redisClient = new RedisService();
  108. $userId = $redisClient->findUser((string) $fd);
  109. $redisClient->unbind((string) $fd, (int) $userId);
  110. }
  111. public function onOpen($server, $request) : void
  112. {
  113. $token = $request->get['token'];
  114. $userInfo = $this->jwt->getClaimsByToken($token);
  115. $response = (new Response($server))->init($request);
  116. $fd = $response->getFd();
  117. // var_dump("管道ID:",$fd);
  118. // $data = [
  119. // 'user_id'=>$userInfo['uid'],
  120. // 'fd'=>$fd
  121. // ];
  122. // var_dump(SERVER_RUN_ID,"+++++++++++++");
  123. // $this->chatServiceClient->addChatChannel($data);
  124. $server->bind($fd, $userInfo['uid']);
  125. $redisClient = new RedisService();
  126. $redisClient->bind((string) $fd, $userInfo['uid']);
  127. $server->push($request->fd, json_encode(["event" => "connect", "content" => ["ping_interval" => 20, "ping_timeout" => 20 * 3, "content" => "连接成功"]]));
  128. }
  129. }