WebSocketController.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use App\JsonRpc\ChatServiceInterface;
  6. use App\JsonRpc\UserServiceInterface;
  7. use App\Service\Message\ReceiveHandleService;
  8. use App\Service\RedisService;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\Contract\OnCloseInterface;
  12. use Hyperf\Contract\OnMessageInterface;
  13. use Hyperf\Contract\OnOpenInterface;
  14. use Hyperf\Di\Annotation\Inject;
  15. use Hyperf\Engine\WebSocket\Response;
  16. use Phper666\JWTAuth\JWT;
  17. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  18. {
  19. #[Inject]
  20. protected JWT $jwt;
  21. /**
  22. * @var ChatServiceInterface
  23. */
  24. #[Inject]
  25. private $chatServiceClient;
  26. /**
  27. * @var UserServiceInterface
  28. */
  29. #[Inject]
  30. private $userServiceClient;
  31. /**
  32. * @Inject
  33. * @var ReceiveHandleService
  34. */
  35. protected $receiveHandle;
  36. public function onMessage($server, $frame): void
  37. {
  38. //把数据推给前端
  39. $redisClient = new RedisService();
  40. $userId = $redisClient->findUser((string) $frame->fd);
  41. //存入队列
  42. $result = json_decode($frame->data, true);
  43. $result['user_id'] = $userId;
  44. $userInfo = $redisClient->getUserInfo((string) $userId);
  45. if ($userInfo) {
  46. $userInfoArr = json_decode($userInfo);
  47. $result['user_avatar'] = $userInfoArr['avatar'];
  48. } else {
  49. $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
  50. $redisClient->setUserInfo($userId, $userInfos['data']);
  51. $result['user_avatar'] = $userInfos['data']['avatar'];
  52. }
  53. $myFriends = $redisClient->getUserFriends((string) $userId);
  54. $myFriendsArr = [];
  55. if ($myFriends) {
  56. $myFriendsArr = json_decode($myFriends);
  57. } else {
  58. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  59. $redisClient->setUserFriends($userId, $myFriends['data']);
  60. $myFriendsArr = $myFriends['data'];
  61. }
  62. //判断$result['receiver_id']是否是好友
  63. $myFriendsID = array_column($myFriendsArr, 'friend_id');
  64. if (!in_array($result['receiver_id'], $myFriendsID)) {
  65. $result['content'] = '您还不是好友,无法发送消息!';
  66. $server->push((int) $frame->fd, json_encode($result));
  67. return;
  68. }
  69. //推送给前台
  70. //组装数据+头像
  71. if ($result['talk_type'] == 1) {
  72. //给自己推一条数据
  73. if ($server->isEstablished($frame->fd)) {
  74. $result['is_read'] = 1;
  75. $server->push((int) $frame->fd, json_encode($result));
  76. // 尝试连接
  77. try {
  78. $saiddata = $result;
  79. $saiddata['action'] = 'said';
  80. $message = new MqProducer($saiddata);
  81. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  82. $re = $producer->produce($message);
  83. var_dump('消息发送成功' . $frame->fd);
  84. } catch (\Exception $e) {
  85. var_dump('消息发送失败: ' . $e->getMessage());
  86. }
  87. } else {
  88. //给自己发一条未读消息
  89. try {
  90. $saiddata = $result;
  91. $saiddata['action'] = 'said';
  92. $saiddata['is_read'] = 0;
  93. $message = new MqProducer($saiddata);
  94. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  95. $re = $producer->produce($message);
  96. var_dump('消息发送成功' . $frame->fd);
  97. } catch (\Exception $e) {
  98. var_dump('消息发送失败: ' . $e->getMessage());
  99. }
  100. }
  101. //给好友推送消息
  102. $fd = $redisClient->findFd((int) $result['receiver_id']);
  103. if ($server->isEstablished((int) $fd)) {
  104. $data = $result;
  105. $data['is_read'] = 1;
  106. $server->push((int) $fd, json_encode(value: $data));
  107. var_dump('消息给好友发送成功' . $fd);
  108. // 尝试连接
  109. try {
  110. $chatdata = $result;
  111. $chatdata['action'] = 'recieved';
  112. $chatdata['receiver_id'] = $result['user_id'];
  113. $chatdata['user_id'] = $result['receiver_id'];
  114. $message = new MqProducer($chatdata);
  115. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  116. $re = $producer->produce($message);
  117. var_dump('消息发送成功');
  118. } catch (\Exception $e) {
  119. var_dump('消息发送失败: ' . $e->getMessage());
  120. }
  121. } else {
  122. try {
  123. $chatdata = $result;
  124. $chatdata['action'] = 'recieved';
  125. $chatdata['receiver_id'] = $result['user_id'];
  126. $chatdata['user_id'] = $result['receiver_id'];
  127. $chatdata['is_read'] = 0;
  128. $message = new MqProducer($chatdata);
  129. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  130. $re = $producer->produce($message);
  131. var_dump('消息发送成功');
  132. } catch (\Exception $e) {
  133. var_dump('消息发送失败: ' . $e->getMessage());
  134. }
  135. }
  136. } else if ($result['talk_type'] == 2) {
  137. //根据群找到 群用户,群发一遍消息
  138. $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
  139. var_dump($groupUserList['data']);
  140. if ($groupUserList['data']) {
  141. $chatdata = $result;
  142. foreach ($groupUserList['data'] as $val) {
  143. $fd = $redisClient->findFd((int) $val['user_id']);
  144. if ($server->isEstablished((int) $fd)) {
  145. var_dump($fd, $val['user_id']);
  146. $server->push((int) $fd, json_encode($result));
  147. try {
  148. //分发说,只记录自己对自己
  149. if ($result['user_id'] == $val['user_id']) {
  150. $chatdata['receiver_id'] = $result['receiver_id'];
  151. $chatdata['user_id'] = $result['user_id'];
  152. $chatdata['group_receiver_id'] = $val['user_id'];
  153. $chatdata['is_read'] = 1;
  154. $chatdata['action'] = 'said';
  155. $message = new MqProducer($chatdata);
  156. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  157. $re = $producer->produce($message);
  158. }
  159. //分发接收,不记录自己
  160. if ($chatdata['user_id'] != $val['user_id']) {
  161. $chatdata['receiver_id'] = $result['receiver_id'];
  162. $chatdata['user_id'] = $val['user_id'];
  163. $chatdata['group_receiver_id'] = $result['user_id'];
  164. $chatdata['is_read'] = 1;
  165. $chatdata['action'] = 'recieved';
  166. $message = new MqProducer($chatdata);
  167. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  168. $re = $producer->produce($message);
  169. var_dump('消息发送成功');
  170. }
  171. } catch (\Exception $e) {
  172. var_dump('消息发送失败: ' . $e->getMessage());
  173. }
  174. } else {
  175. try {
  176. //分发说,只记录自己对自己
  177. if ($result['user_id'] == $val['user_id']) {
  178. $chatdata['receiver_id'] = $result['receiver_id'];
  179. $chatdata['user_id'] = $result['user_id'];
  180. $chatdata['group_receiver_id'] = $val['user_id'];
  181. $chatdata['is_read'] = 0;
  182. $chatdata['action'] = 'said';
  183. $message = new MqProducer($chatdata);
  184. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  185. $re = $producer->produce($message);
  186. }
  187. //分发接收,不记录自己
  188. if ($chatdata['user_id'] != $val['user_id']) {
  189. $chatdata['receiver_id'] = $result['receiver_id'];
  190. $chatdata['user_id'] = $val['user_id'];
  191. $chatdata['group_receiver_id'] = $result['user_id'];
  192. $chatdata['is_read'] = 0;
  193. $chatdata['action'] = 'recieved';
  194. $message = new MqProducer($chatdata);
  195. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  196. $re = $producer->produce($message);
  197. var_dump('消息发送成功');
  198. }
  199. } catch (\Exception $e) {
  200. var_dump('消息发送失败: ' . $e->getMessage());
  201. }
  202. }
  203. }
  204. }
  205. }
  206. }
  207. public function onClose($server, int $fd, int $reactorId): void
  208. {
  209. var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  210. $redisClient = new RedisService();
  211. $userId = $redisClient->findUser((string) $fd);
  212. $redisClient->unbind((string) $fd, (int) $userId);
  213. }
  214. public function onOpen($server, $request): void
  215. {
  216. $token = $request->get['token'];
  217. $userInfo = $this->jwt->getClaimsByToken($token);
  218. $response = (new Response($server))->init($request);
  219. $fd = $response->getFd();
  220. $server->bind($fd, $userInfo['uid']);
  221. $redisClient = new RedisService();
  222. $redisClient->bind((string) $fd, $userInfo['uid']);
  223. $server->push($request->fd, json_encode([
  224. "event" => "connect",
  225. "content" => [
  226. "ping_interval" => 20,
  227. "ping_timeout" => 20 * 3,
  228. "content" => "连接成功",
  229. "fd" => $fd,
  230. "user_id" => $userInfo['uid'],
  231. ],
  232. ]));
  233. }
  234. }