WebSocketController.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use App\JsonRpc\ChatServiceInterface;
  6. use App\JsonRpc\UserServiceInterface;
  7. use App\Service\Message\ReceiveHandleService;
  8. use App\Service\RedisService;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\Contract\OnCloseInterface;
  12. use Hyperf\Contract\OnMessageInterface;
  13. use Hyperf\Contract\OnOpenInterface;
  14. use Hyperf\Di\Annotation\Inject;
  15. use Hyperf\Engine\WebSocket\Response;
  16. use Phper666\JWTAuth\JWT;
  17. use swoole\Server;
  18. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  19. {
  20. #[Inject]
  21. protected JWT $jwt;
  22. /**
  23. * @var ChatServiceInterface
  24. */
  25. #[Inject]
  26. private $chatServiceClient;
  27. /**
  28. * @var UserServiceInterface
  29. */
  30. #[Inject]
  31. private $userServiceClient;
  32. /**
  33. * @Inject
  34. * @var ReceiveHandleService
  35. */
  36. protected $receiveHandle;
  37. protected $server;
  38. public function onMessage($server, $frame): void
  39. {
  40. //把数据推给前端
  41. $redisClient = new RedisService();
  42. $userId = $redisClient->findUser((string) $frame->fd);
  43. //存入队列
  44. $result = json_decode($frame->data, true);
  45. $result['user_id'] = $userId;
  46. // $show_id = $result['show_id'];
  47. var_dump($result, '-------------1----');
  48. $userInfo = $redisClient->getUserInfo((string) $userId);
  49. if ($userInfo) {
  50. $userInfoArr = json_decode($userInfo);
  51. $result['user_avatar'] = $userInfoArr['avatar'];
  52. } else {
  53. $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
  54. $redisClient->setUserInfo($userId, $userInfos['data']);
  55. $result['user_avatar'] = $userInfos['data']['avatar'];
  56. }
  57. $myFriends = $redisClient->getUserFriends((string) $userId);
  58. $myFriendsArr = [];
  59. if ($myFriends) {
  60. $myFriendsArr = json_decode($myFriends);
  61. } else {
  62. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  63. $redisClient->setUserFriends($userId, $myFriends['data']);
  64. $myFriendsArr = $myFriends['data'];
  65. }
  66. //推送给前台
  67. if (isset($result['type']) && in_array($result['type'], ['offer', 'answer', 'ice-candidate'])) {
  68. // 是视频通话信令,走特殊处理逻辑
  69. $this->handleVideoSignaling($server, $frame, $userId, $result);
  70. return;
  71. }
  72. //组装数据+头像
  73. if ($result['talk_type'] == 1) {
  74. //判断$result['receiver_id']是否是好友
  75. $myFriendsID = array_column($myFriendsArr, 'friend_id');
  76. if (!in_array($result['receiver_id'], $myFriendsID)) {
  77. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  78. $redisClient->setUserFriends($userId, $myFriends['data']);
  79. $myFriendsArrdata = $myFriends['data'];
  80. $myFriendsArrID = array_column($myFriendsArrdata, 'friend_id');
  81. if (!in_array($result['receiver_id'], $myFriendsArrID)) {
  82. $result['content'] = '您还不是好友,无法发送消息!';
  83. $server->push((int) $frame->fd, json_encode($result));
  84. return;
  85. }
  86. }
  87. //给自己推一条数据
  88. if ($server->isEstablished($frame->fd)) {
  89. $result['is_read'] = 1;
  90. $server->push((int) $frame->fd, json_encode($result));
  91. // 尝试连接
  92. try {
  93. $saiddata = $result;
  94. $saiddata['action'] = 'said';
  95. $message = new MqProducer($saiddata);
  96. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  97. $re = $producer->produce($message);
  98. var_dump('消息发送成功' . $frame->fd);
  99. } catch (\Exception $e) {
  100. var_dump('消息发送失败: ' . $e->getMessage());
  101. $this->retry($message);
  102. }
  103. } else {
  104. //给自己发一条未读消息
  105. try {
  106. $saiddata = $result;
  107. $saiddata['action'] = 'said';
  108. $saiddata['is_read'] = 0;
  109. $message = new MqProducer($saiddata);
  110. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  111. $re = $producer->produce($message);
  112. var_dump('消息发送成功' . $frame->fd);
  113. } catch (\Exception $e) {
  114. var_dump('消息发送失败: ' . $e->getMessage());
  115. $this->retry($message);
  116. }
  117. }
  118. //给好友推送消息
  119. $fd = $redisClient->findFd((int) $result['receiver_id']);
  120. if ($server->isEstablished((int) $fd)) {
  121. $data = $result;
  122. $data['is_read'] = 0;
  123. $server->push((int) $fd, json_encode(value: $data));
  124. var_dump('消息给好友发送成功' . $fd);
  125. // 尝试连接
  126. try {
  127. $chatdata = $result;
  128. $chatdata['action'] = 'recieved';
  129. $chatdata['receiver_id'] = $result['user_id'];
  130. $chatdata['user_id'] = $result['receiver_id'];
  131. $chatdata['is_read'] = 0;
  132. $message = new MqProducer($chatdata);
  133. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  134. $re = $producer->produce($message);
  135. var_dump('消息发送成功');
  136. } catch (\Exception $e) {
  137. var_dump('消息发送失败: ' . $e->getMessage());
  138. $this->retry($message);
  139. }
  140. } else {
  141. try {
  142. $chatdata = $result;
  143. $chatdata['action'] = 'recieved';
  144. $chatdata['receiver_id'] = $result['user_id'];
  145. $chatdata['user_id'] = $result['receiver_id'];
  146. $chatdata['is_read'] = 0;
  147. $message = new MqProducer($chatdata);
  148. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  149. $re = $producer->produce($message);
  150. var_dump('消息发送成功');
  151. } catch (\Exception $e) {
  152. var_dump('消息发送失败: ' . $e->getMessage());
  153. $this->retry($message);
  154. }
  155. }
  156. } else if ($result['talk_type'] == 2) {
  157. //根据群找到 群用户,群发一遍消息
  158. $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
  159. var_dump($groupUserList['data'], '----------------------------------');
  160. if ($groupUserList['data']) {
  161. $chatdata = $result;
  162. foreach ($groupUserList['data'] as $val) {
  163. $fd = $redisClient->findFd((int) $val['user_id']);
  164. if ($server->isEstablished((int) $fd)) {
  165. var_dump($fd, $val['user_id']);
  166. $server->push((int) $fd, json_encode($result));
  167. try {
  168. //分发说,只记录自己对自己
  169. if ($result['user_id'] == $val['user_id']) {
  170. $chatdata['receiver_id'] = $result['receiver_id'];
  171. $chatdata['user_id'] = $result['user_id'];
  172. $chatdata['group_receiver_id'] = $val['user_id'];
  173. $chatdata['is_read'] = 1;
  174. $chatdata['action'] = 'said';
  175. $message = new MqProducer($chatdata);
  176. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  177. $re = $producer->produce($message);
  178. }
  179. //分发接收,不记录自己
  180. if ($chatdata['user_id'] != $val['user_id']) {
  181. $chatdata['receiver_id'] = $result['receiver_id'];
  182. $chatdata['user_id'] = $val['user_id'];
  183. $chatdata['group_receiver_id'] = $result['user_id'];
  184. $chatdata['is_read'] = 0;
  185. $chatdata['action'] = 'recieved';
  186. $message = new MqProducer($chatdata);
  187. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  188. $re = $producer->produce($message);
  189. var_dump('消息发送成功');
  190. }
  191. } catch (\Exception $e) {
  192. var_dump('消息发送失败: ' . $e->getMessage());
  193. }
  194. } else {
  195. try {
  196. //分发说,只记录自己对自己
  197. if ($result['user_id'] == $val['user_id']) {
  198. $chatdata['receiver_id'] = $result['receiver_id'];
  199. $chatdata['user_id'] = $result['user_id'];
  200. $chatdata['group_receiver_id'] = $val['user_id'];
  201. $chatdata['is_read'] = 0;
  202. $chatdata['action'] = 'said';
  203. $message = new MqProducer($chatdata);
  204. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  205. $re = $producer->produce($message);
  206. }
  207. //分发接收,不记录自己
  208. if ($chatdata['user_id'] != $val['user_id']) {
  209. $chatdata['receiver_id'] = $result['receiver_id'];
  210. $chatdata['user_id'] = $val['user_id'];
  211. $chatdata['group_receiver_id'] = $result['user_id'];
  212. $chatdata['is_read'] = 0;
  213. $chatdata['action'] = 'recieved';
  214. $message = new MqProducer($chatdata);
  215. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  216. $re = $producer->produce($message);
  217. var_dump('消息发送成功');
  218. }
  219. } catch (\Exception $e) {
  220. var_dump('消息发送失败: ' . $e->getMessage());
  221. }
  222. }
  223. }
  224. }
  225. }
  226. }
  227. protected function handleVideoSignaling($server, $frame, $userId, $message)
  228. {
  229. $redisClient = new RedisService();
  230. $receiverId = $message['receiver_id'] ?? null;
  231. if (!$receiverId) {
  232. $server->push($frame->fd, json_encode(['error' => 'Missing receiver_id']));
  233. return;
  234. }
  235. // 获取接收方 fd
  236. $fd = $redisClient->findFd((int)$receiverId);
  237. // 判断接收者是否在线
  238. if ($server->isEstablished((int)$fd)) {
  239. // 转发消息给接收方
  240. $server->push((int)$fd, json_encode($message));
  241. } else {
  242. // 可选:缓存或通知对方不在线
  243. $server->push($frame->fd, json_encode([
  244. 'type' => 'offline',
  245. 'receiver_id' => $receiverId,
  246. 'content' => '对方不在线'
  247. ]));
  248. }
  249. }
  250. public function retry($message): void
  251. {
  252. $maxRetries = 10; // 最大重试次数
  253. $retryCount = 0;
  254. while ($retryCount < $maxRetries) {
  255. try {
  256. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  257. $re = $producer->produce($message);
  258. var_dump('重试消息发送成功');
  259. break;
  260. } catch (\Exception $e) {
  261. $retryCount++;
  262. if ($retryCount >= $maxRetries) {
  263. var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage());
  264. } else {
  265. var_dump('第 ' . $retryCount . ' 次重试失败: ' . $e->getMessage());
  266. }
  267. }
  268. }
  269. }
  270. public function onClose($server, int $fd, int $reactorId): void
  271. {
  272. var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  273. $redisClient = new RedisService();
  274. $userId = $redisClient->findUser((string) $fd);
  275. $redisClient->unbind((string) $fd, (int) $userId);
  276. }
  277. public function onOpen($server, $request): void
  278. {
  279. $token = $request->get['token'];
  280. $userInfo = $this->jwt->getClaimsByToken($token);
  281. $response = (new Response($server))->init($request);
  282. $fd = $response->getFd();
  283. $server->bind($fd, $userInfo['uid']);
  284. $redisClient = new RedisService();
  285. $redisClient->bind((string) $fd, $userInfo['uid']);
  286. $server->push($request->fd, json_encode([
  287. "event" => "connect",
  288. "content" => [
  289. "ping_interval" => 20,
  290. "ping_timeout" => 20 * 3,
  291. "content" => "连接成功",
  292. "fd" => $fd,
  293. "user_id" => $userInfo['uid'],
  294. ],
  295. ]));
  296. }
  297. }