WebSocketController.php 15 KB


  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use App\JsonRpc\ChatServiceInterface;
  6. use App\JsonRpc\UserServiceInterface;
  7. use App\Service\Message\ReceiveHandleService;
  8. use App\Service\RedisService;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\Contract\OnCloseInterface;
  12. use Hyperf\Contract\OnMessageInterface;
  13. use Hyperf\Contract\OnOpenInterface;
  14. use Hyperf\Di\Annotation\Inject;
  15. use Hyperf\Engine\WebSocket\Response;
  16. use Phper666\JWTAuth\JWT;
  17. use swoole\Server;
  18. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  19. {
  20. #[Inject]
  21. protected JWT $jwt;
  22. /**
  23. * @var ChatServiceInterface
  24. */
  25. #[Inject]
  26. private $chatServiceClient;
  27. /**
  28. * @var UserServiceInterface
  29. */
  30. #[Inject]
  31. private $userServiceClient;
  32. /**
  33. * @Inject
  34. * @var ReceiveHandleService
  35. */
  36. protected $receiveHandle;
  37. protected $server;
  38. public function onMessage($server, $frame): void
  39. {
  40. //把数据推给前端
  41. $redisClient = new RedisService();
  42. $userId = $redisClient->findUser((string) $frame->fd);
  43. //存入队列
  44. $result = json_decode($frame->data, true);
  45. $result['user_id'] = $userId;
  46. // $show_id = $result['show_id'];
  47. var_dump($result, '-------------1----');
  48. $userInfo = $redisClient->getUserInfo((string) $userId);
  49. var_dump($userInfo, '-------------22----');
  50. if ($userInfo) {
  51. $userInfoArr = json_decode($userInfo);
  52. $result['user_avatar'] = $userInfoArr['avatar'];
  53. $result['user_name'] = $userInfoArr['user_name'];
  54. } else {
  55. $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
  56. var_dump($userInfos, '-------------33----');
  57. $redisClient->setUserInfo($userId, $userInfos['data']);
  58. $result['user_avatar'] = $userInfos['data']['avatar'];
  59. $result['user_name'] = $userInfos['data']['user_name'];
  60. }
  61. $myFriends = $redisClient->getUserFriends((string) $userId);
  62. $myFriendsArr = [];
  63. if ($myFriends) {
  64. $myFriendsArr = json_decode($myFriends);
  65. } else {
  66. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  67. $redisClient->setUserFriends($userId, $myFriends['data']);
  68. $myFriendsArr = $myFriends['data'];
  69. }
  70. //推送给前台
  71. if (isset($result['type']) && in_array($result['type'], ['offer', 'answer', 'ice-candidate'])) {
  72. // 是视频通话信令,走特殊处理逻辑
  73. $this->handleVideoSignaling($server, $frame, $userId, $result);
  74. return;
  75. }
  76. //组装数据+头像
  77. if (isset($result['talk_type']) && $result['talk_type'] == 1) {
  78. //判断$result['receiver_id']是否是好友
  79. $myFriendsID = array_column($myFriendsArr, 'friend_id');
  80. if (!in_array($result['receiver_id'], $myFriendsID)) {
  81. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  82. $redisClient->setUserFriends($userId, $myFriends['data']);
  83. $myFriendsArrdata = $myFriends['data'];
  84. $myFriendsArrID = array_column($myFriendsArrdata, 'friend_id');
  85. if (!in_array($result['receiver_id'], $myFriendsArrID)) {
  86. $result['content'] = '您还不是好友,无法发送消息!';
  87. $server->push((int) $frame->fd, json_encode($result));
  88. return;
  89. }
  90. }
  91. //给自己推一条数据
  92. if ($server->isEstablished($frame->fd)) {
  93. $result['is_read'] = 1;
  94. $server->push((int) $frame->fd, json_encode($result));
  95. // 尝试连接
  96. try {
  97. $saiddata = $result;
  98. $saiddata['action'] = 'said';
  99. $message = new MqProducer($saiddata);
  100. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  101. $re = $producer->produce($message);
  102. var_dump('消息发送成功' . $frame->fd,$result);
  103. } catch (\Exception $e) {
  104. var_dump('消息发送失败: ' . $e->getMessage());
  105. $this->retry($message);
  106. }
  107. } else {
  108. //给自己发一条未读消息
  109. try {
  110. $saiddata = $result;
  111. $saiddata['action'] = 'said';
  112. $saiddata['is_read'] = 0;
  113. $message = new MqProducer($saiddata);
  114. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  115. $re = $producer->produce($message);
  116. var_dump('消息发送成功' . $frame->fd);
  117. } catch (\Exception $e) {
  118. var_dump('消息发送失败: ' . $e->getMessage());
  119. $this->retry($message);
  120. }
  121. }
  122. //给好友推送消息
  123. $fd = $redisClient->findFd((int) $result['receiver_id']);
  124. if ($server->isEstablished((int) $fd)) {
  125. $data = $result;
  126. $data['is_read'] = 0;
  127. $server->push((int) $fd, json_encode(value: $data));
  128. var_dump('消息给好友发送成功' . $fd,$data);
  129. // 尝试连接
  130. try {
  131. $chatdata = $result;
  132. $chatdata['action'] = 'recieved';
  133. $chatdata['receiver_id'] = $result['user_id'];
  134. $chatdata['user_id'] = $result['receiver_id'];
  135. // $chatdata['user_name'] = $result['user_name'];
  136. $chatdata['is_read'] = 0;
  137. $message = new MqProducer($chatdata);
  138. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  139. $re = $producer->produce($message);
  140. var_dump('消息发送成功#');
  141. } catch (\Exception $e) {
  142. var_dump('消息发送失败: ' . $e->getMessage());
  143. $this->retry($message);
  144. }
  145. } else {
  146. try {
  147. $chatdata = $result;
  148. $chatdata['action'] = 'recieved';
  149. $chatdata['receiver_id'] = $result['user_id'];
  150. $chatdata['user_id'] = $result['receiver_id'];
  151. // $chatdata['user_name'] = $result['user_name'];
  152. $chatdata['is_read'] = 0;
  153. $message = new MqProducer($chatdata);
  154. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  155. $re = $producer->produce($message);
  156. var_dump('消息发送成功!');
  157. } catch (\Exception $e) {
  158. var_dump('消息发送失败: ' . $e->getMessage());
  159. $this->retry($message);
  160. }
  161. }
  162. } else if (isset($result['talk_type']) && $result['talk_type'] == 2) {
  163. //根据群找到 群用户,群发一遍消息
  164. $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
  165. var_dump($groupUserList['data'], '-----------------############-----------------');
  166. if ($groupUserList['data']) {
  167. $chatdata = $result;
  168. foreach ($groupUserList['data'] as $val) {
  169. $fd = $redisClient->findFd((int) $val['user_id']);
  170. if ($server->isEstablished((int) $fd)) {
  171. var_dump($fd, $val['user_id']);
  172. $server->push((int) $fd, json_encode($result));
  173. var_dump($val, '-------------44----');
  174. try {
  175. //分发说,只记录自己对自己
  176. if ($result['user_id'] == $val['user_id']) {
  177. $chatdata['receiver_id'] = $result['receiver_id'];
  178. $chatdata['user_id'] = $result['user_id'];
  179. $chatdata['group_receiver_id'] = $val['user_id'];
  180. $chatdata['is_read'] = 1;
  181. $chatdata['action'] = 'said';
  182. $chatdata['group_name'] = $val['group_name']??'';
  183. $message = new MqProducer($chatdata);
  184. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  185. $re = $producer->produce($message);
  186. }
  187. //分发接收,不记录自己
  188. if ($chatdata['user_id'] != $val['user_id']) {
  189. $chatdata['receiver_id'] = $result['receiver_id'];
  190. $chatdata['user_id'] = $val['user_id'];
  191. $chatdata['group_receiver_id'] = $result['user_id'];
  192. $chatdata['is_read'] = 0;
  193. $chatdata['action'] = 'recieved';
  194. $chatdata['group_name'] = $val['group_name']??'';
  195. $message = new MqProducer($chatdata);
  196. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  197. $re = $producer->produce($message);
  198. var_dump('消息发送成功@');
  199. }
  200. } catch (\Exception $e) {
  201. var_dump('消息发送失败: ' . $e->getMessage());
  202. }
  203. } else {
  204. try {
  205. //分发说,只记录自己对自己
  206. if ($result['user_id'] == $val['user_id']) {
  207. $chatdata['receiver_id'] = $result['receiver_id'];
  208. $chatdata['user_id'] = $result['user_id'];
  209. $chatdata['group_receiver_id'] = $val['user_id'];
  210. $chatdata['is_read'] = 0;
  211. $chatdata['action'] = 'said';
  212. $chatdata['group_name'] = $val['group_name']??'';
  213. $message = new MqProducer($chatdata);
  214. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  215. $re = $producer->produce($message);
  216. }
  217. //分发接收,不记录自己
  218. if ($chatdata['user_id'] != $val['user_id']) {
  219. $chatdata['receiver_id'] = $result['receiver_id'];
  220. $chatdata['user_id'] = $val['user_id'];
  221. $chatdata['group_receiver_id'] = $result['user_id'];
  222. $chatdata['is_read'] = 0;
  223. $chatdata['action'] = 'recieved';
  224. $chatdata['group_name'] = $val['group_name']??'';
  225. $message = new MqProducer($chatdata);
  226. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  227. $re = $producer->produce($message);
  228. var_dump('消息发送成功$');
  229. }
  230. } catch (\Exception $e) {
  231. var_dump('消息发送失败: ' . $e->getMessage());
  232. }
  233. }
  234. }
  235. }
  236. }
  237. }
  238. protected function handleVideoSignaling($server, $frame, $userId, $message)
  239. {
  240. $redisClient = new RedisService();
  241. $receiverId = $message['receiver_id'] ?? null;
  242. if (!$receiverId) {
  243. $server->push($frame->fd, json_encode(['error' => 'Missing receiver_id']));
  244. return;
  245. }
  246. // 获取接收方 fd
  247. $fd = $redisClient->findFd((int)$receiverId);
  248. // 判断接收者是否在线
  249. if ($server->isEstablished((int)$fd)) {
  250. // 转发消息给接收方
  251. $server->push((int)$fd, json_encode($message));
  252. } else {
  253. // 可选:缓存或通知对方不在线
  254. $server->push($frame->fd, json_encode([
  255. 'type' => 'offline',
  256. 'receiver_id' => $receiverId,
  257. 'content' => '对方不在线'
  258. ]));
  259. }
  260. }
  261. public function retry($message): void
  262. {
  263. $maxRetries = 10; // 最大重试次数
  264. $retryCount = 0;
  265. while ($retryCount < $maxRetries) {
  266. try {
  267. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  268. $re = $producer->produce($message);
  269. var_dump('重试消息发送成功');
  270. break;
  271. } catch (\Exception $e) {
  272. $retryCount++;
  273. if ($retryCount >= $maxRetries) {
  274. var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage());
  275. } else {
  276. var_dump('第 ' . $retryCount . ' 次重试失败: ' . $e->getMessage());
  277. }
  278. }
  279. }
  280. }
  281. public function onClose($server, int $fd, int $reactorId): void
  282. {
  283. // var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  284. $redisClient = new RedisService();
  285. $userId = $redisClient->findUser((string) $fd);
  286. $redisClient->unbind((string) $fd, (int) $userId);
  287. }
  288. public function onOpen($server, $request): void
  289. {
  290. $token = $request->get['token'];
  291. $userInfo = $this->jwt->getClaimsByToken($token);
  292. $response = (new Response($server))->init($request);
  293. $fd = $response->getFd();
  294. $server->bind($fd, $userInfo['uid']);
  295. $redisClient = new RedisService();
  296. $redisClient->bind((string) $fd, $userInfo['uid']);
  297. $server->push($request->fd, json_encode([
  298. "event" => "connect",
  299. "content" => [
  300. "ping_interval" => 20,
  301. "ping_timeout" => 20 * 3,
  302. "content" => "连接成功",
  303. "fd" => $fd,
  304. "user_id" => $userInfo['uid'],
  305. ],
  306. ]));
  307. }
  308. }