WebSocketController.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use App\JsonRpc\ChatServiceInterface;
  6. use App\JsonRpc\UserServiceInterface;
  7. use App\Service\Message\ReceiveHandleService;
  8. use App\Service\RedisService;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\Contract\OnCloseInterface;
  12. use Hyperf\Contract\OnMessageInterface;
  13. use Hyperf\Contract\OnOpenInterface;
  14. use Hyperf\Di\Annotation\Inject;
  15. use Hyperf\Engine\WebSocket\Response;
  16. use Hyperf\Redis\Redis;
  17. use Phper666\JWTAuth\JWT;
  18. use swoole\Server;
  19. use Hyperf\Coroutine\Coroutine;
  /**
   * WebSocket endpoint for chat traffic.
   *
   * Handles the connection lifecycle (open / message / close), plain-text
   * heartbeat frames ("ping" / "pong"), "reconnect" frames, WebRTC signaling
   * relay, and private (talk_type 1) / group (talk_type 2) chat messages.
   * Chat messages are pushed to online recipients and published to AMQP via
   * MqProducer.
   */
  class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  {
      #[Inject]
      protected JWT $jwt;

      #[Inject]
      private ChatServiceInterface $chatServiceClient;

      #[Inject]
      private UserServiceInterface $userServiceClient;

      #[Inject]
      protected ReceiveHandleService $receiveHandle;

      #[Inject]
      protected Redis $redis;

      protected $server;

      // Persistent-connection settings.
      private const HEARTBEAT_INTERVAL = 30; // heartbeat interval (seconds)
      private const CONNECTION_TIMEOUT = 90; // connection timeout (seconds)
      private const MAX_RECONNECT_ATTEMPTS = 5; // maximum reconnect attempts

      // Redis list that receives one JSON entry per successful connect.
      // It must differ from 'websocket:connections', which is a hash keyed by fd.
      private const CONNECTION_LOG_KEY = 'websocket:connection_log';

      // Connection bookkeeping (not referenced by the methods of this class).
      private static $connections = [];
      private static $heartbeatTasks = [];

      /**
       * Dispatches one inbound frame.
       *
       * Order of handling: heartbeat frames, reconnect frames, then JSON chat
       * payloads (signaling, private chat, group chat). JSON payloads with an
       * unknown talk_type are ignored.
       *
       * @param mixed $server WebSocket server; must offer push() and isEstablished()
       * @param mixed $frame  Inbound frame; its fd and data properties are read
       */
      public function onMessage($server, $frame): void
      {
          $fd = (int) $frame->fd;
          $raw = (string) $frame->data;

          // Heartbeat frames need neither the user binding nor JSON decoding.
          if ($raw === 'ping') {
              $server->push($fd, 'pong');
              $this->updateLastActivity($fd);
              return;
          }
          if ($raw === 'pong') {
              $this->updateLastActivity($fd);
              $this->redis->hSet('websocket:heartbeat', (string) $fd, time());
              return;
          }

          if (str_starts_with($raw, 'reconnect')) {
              $this->handleReconnect($server, $frame);
              return;
          }

          $this->updateLastActivity($fd);

          $redisClient = new RedisService();
          $userId = $redisClient->findUser((string) $fd);
          // Refuse to act for a connection that has no bound user.
          if (in_array($userId, [null, false, '', 0, '0'], true)) {
              $server->push($fd, json_encode(['error' => 'Unauthenticated connection']));
              return;
          }

          // Only a JSON object/array can carry the fields used below; a scalar
          // payload would make the array writes throw.
          $result = json_decode($raw, true);
          if (!is_array($result)) {
              $server->push($fd, json_encode(['error' => 'Invalid message payload']));
              return;
          }

          // Sender identity always comes from the server-side binding.
          $result['user_id'] = $userId;
          $profile = $this->resolveUserProfile($redisClient, $userId);
          $result['user_avatar'] = $profile['avatar'] ?? null;
          $result['user_name'] = $profile['user_name'] ?? null;

          // Video-call signaling is relayed as-is and never queued.
          if (in_array($result['type'] ?? null, ['offer', 'answer', 'ice-candidate'], true)) {
              $this->handleVideoSignaling($server, $frame, $userId, $result);
              return;
          }

          $talkType = $result['talk_type'] ?? null;
          $talkType = is_scalar($talkType) ? (string) $talkType : '';
          if ($talkType !== '1' && $talkType !== '2') {
              return;
          }

          $receiverId = $result['receiver_id'] ?? null;
          if (!is_scalar($receiverId) || !$receiverId) {
              $server->push($fd, json_encode(['error' => 'Missing receiver_id']));
              return;
          }

          if ($talkType === '1') {
              $this->handlePrivateMessage($server, $fd, $redisClient, $result);
          } else {
              $this->handleGroupMessage($server, $redisClient, $result);
          }
      }

      /**
       * Loads the sender's profile from the Redis cache, falling back to the
       * user service (and caching a non-empty answer).
       *
       * @param RedisService $redisClient Cache accessor
       * @param mixed        $userId      Id of the user bound to the connection
       *
       * @return array Profile fields; 'avatar' and 'user_name' are read by the caller
       */
      private function resolveUserProfile(RedisService $redisClient, $userId): array
      {
          $cached = $redisClient->getUserInfo((string) $userId);
          if ($cached) {
              // Decode associatively: the caller indexes the result as an array.
              $info = is_string($cached) ? json_decode($cached, true) : $cached;
              if (is_array($info)) {
                  return $info;
              }
          }

          $response = $this->userServiceClient->getUserInfo((int) $userId);
          $info = is_array($response) ? ($response['data'] ?? null) : null;
          if (!is_array($info) || $info === []) {
              // Do not cache an empty answer, so the next message retries the lookup.
              return [];
          }

          $redisClient->setUserInfo($userId, $info);

          return $info;
      }

      /**
       * Tells whether $receiverId is in the friend list of $userId.
       *
       * The cached list is checked first; on a miss the list is fetched once
       * from the chat service, re-cached, and checked again.
       *
       * @param RedisService $redisClient Cache accessor
       * @param mixed        $userId      Sender id
       * @param mixed        $receiverId  Scalar id of the intended recipient
       */
      private function isFriend(RedisService $redisClient, $userId, $receiverId): bool
      {
          $cached = $redisClient->getUserFriends((string) $userId);
          if ($cached) {
              $friends = is_string($cached) ? json_decode($cached, true) : $cached;
              if (is_array($friends) && $this->friendListContains($friends, $receiverId)) {
                  return true;
              }
          }

          // Cache empty or possibly stale: refresh from the source of truth.
          $response = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
          $friends = is_array($response) ? ($response['data'] ?? null) : null;
          if (!is_array($friends)) {
              return false;
          }

          $redisClient->setUserFriends($userId, $friends);

          return $this->friendListContains($friends, $receiverId);
      }

      /**
       * Checks a friend list (rows exposing 'friend_id') for $receiverId,
       * comparing ids as strings so "5" and 5 match.
       */
      private function friendListContains(array $friends, $receiverId): bool
      {
          $ids = array_map(
              static fn ($id): string => (string) $id,
              array_column($friends, 'friend_id'),
          );

          return in_array((string) $receiverId, $ids, true);
      }

      /**
       * Publishes one chat record to AMQP.
       *
       * @param array       $payload        Record handed to MqProducer
       * @param string|null $successNote    Text dumped on success, or null for none
       * @param bool        $retryOnFailure Whether a failed publish goes through retry()
       */
      private function publish(array $payload, ?string $successNote, bool $retryOnFailure): void
      {
          $message = null;
          try {
              $message = new MqProducer($payload);
              ContextApplicationContext::getContainer()->get(Producer::class)->produce($message);
              if ($successNote !== null) {
                  var_dump($successNote);
              }
          } catch (\Exception $e) {
              var_dump('消息发送失败: ' . $e->getMessage());
              // A retry is only possible when the message object itself was built.
              if ($retryOnFailure && $message !== null) {
                  $this->retry($message);
              }
          }
      }

      /**
       * Handles a private chat message (talk_type 1).
       *
       * Rejects the message when the receiver is not a friend; otherwise echoes
       * it to the sender, pushes it to the receiver when online, and publishes
       * a 'said' record for the sender plus a 'recieved' record for the receiver.
       *
       * @param mixed        $server      WebSocket server
       * @param int          $fd          Sender connection id
       * @param RedisService $redisClient Cache accessor
       * @param array        $result      Decoded message, already carrying
       *                                  user_id and a scalar receiver_id
       */
      private function handlePrivateMessage($server, int $fd, RedisService $redisClient, array $result): void
      {
          $userId = $result['user_id'];
          $receiverId = $result['receiver_id'];

          if (!$this->isFriend($redisClient, $userId, $receiverId)) {
              $result['content'] = '您还不是好友,无法发送消息!';
              $server->push($fd, json_encode($result));
              return;
          }

          // Echo to the sender; the stored copy is "read" only if the echo was possible.
          $senderOnline = (bool) $server->isEstablished($fd);
          if ($senderOnline) {
              $result['is_read'] = 1;
              $server->push($fd, json_encode($result));
          }
          $said = $result;
          $said['action'] = 'said';
          $said['is_read'] = $senderOnline ? 1 : 0;
          $this->publish($said, '消息发送成功' . $fd, true);

          // Deliver to the friend when online.
          $receiverFd = (int) $redisClient->findFd((int) $receiverId);
          $receiverOnline = (bool) $server->isEstablished($receiverFd);
          if ($receiverOnline) {
              $data = $result;
              $data['is_read'] = 0;
              $server->push($receiverFd, json_encode($data));
              var_dump('消息给好友发送成功' . $receiverFd);
          }

          // The receiver-side record swaps the two ids. The action value keeps
          // its existing spelling because it is sent over the queue.
          $received = $result;
          $received['action'] = 'recieved';
          $received['receiver_id'] = $userId;
          $received['user_id'] = $receiverId;
          $received['is_read'] = 0;
          $this->publish($received, $receiverOnline ? '消息发送成功#' : '消息发送成功!', true);
      }

      /**
       * Handles a group chat message (talk_type 2).
       *
       * Pushes the message to every online member and publishes one record per
       * member: 'said' for the sender, 'recieved' for everyone else. Each record
       * is built from the original message so no state carries over between
       * members.
       *
       * NOTE(review): the sender's membership of the group is not verified here.
       *
       * @param mixed        $server      WebSocket server
       * @param RedisService $redisClient Cache accessor
       * @param array        $result      Decoded message; receiver_id is the group id
       */
      private function handleGroupMessage($server, RedisService $redisClient, array $result): void
      {
          $senderId = $result['user_id'];
          $groupId = $result['receiver_id'];

          $response = $this->chatServiceClient->getGroupMembers(['group_id' => $groupId]);
          $members = is_array($response) ? ($response['data'] ?? null) : null;
          if (!is_array($members) || $members === []) {
              return;
          }

          foreach ($members as $member) {
              if (!is_array($member) || !isset($member['user_id'])) {
                  continue;
              }

              $memberId = $member['user_id'];
              $groupName = $member['group_name'] ?? '';
              $memberFd = (int) $redisClient->findFd((int) $memberId);
              $online = (bool) $server->isEstablished($memberFd);

              if ($online) {
                  // The pushed copy carries the member row's group_name as user_name.
                  $outgoing = $result;
                  $outgoing['user_name'] = $groupName;
                  $server->push($memberFd, json_encode($outgoing));
              }

              $record = $result;
              $record['receiver_id'] = $groupId;
              $record['group_name'] = $groupName;

              if ((string) $memberId === (string) $senderId) {
                  // The sender's own copy of the message.
                  $record['user_id'] = $senderId;
                  $record['group_receiver_id'] = $memberId;
                  $record['is_read'] = $online ? 1 : 0;
                  $record['action'] = 'said';
                  $this->publish($record, null, false);
              } else {
                  // Every other member receives an unread copy.
                  $record['user_id'] = $memberId;
                  $record['group_receiver_id'] = $senderId;
                  $record['is_read'] = 0;
                  $record['action'] = 'recieved';
                  $this->publish($record, $online ? '消息发送成功@' : '消息发送成功$', false);
              }
          }
      }

      /**
       * Relays a WebRTC signaling message (offer / answer / ice-candidate) to
       * the receiver, or tells the sender that the receiver is offline.
       *
       * @param mixed $server  WebSocket server
       * @param mixed $frame   Inbound frame; only fd is read
       * @param mixed $userId  Sender id (not used by the relay itself)
       * @param array $message Decoded signaling payload
       */
      protected function handleVideoSignaling($server, $frame, $userId, $message)
      {
          $receiverId = $message['receiver_id'] ?? null;
          if (!is_scalar($receiverId) || !$receiverId) {
              $server->push($frame->fd, json_encode(['error' => 'Missing receiver_id']));
              return;
          }

          $redisClient = new RedisService();
          $receiverFd = (int) $redisClient->findFd((int) $receiverId);

          if ($server->isEstablished($receiverFd)) {
              $server->push($receiverFd, json_encode($message));
              return;
          }

          // Receiver is not connected: report back to the caller.
          $server->push($frame->fd, json_encode([
              'type' => 'offline',
              'receiver_id' => $receiverId,
              'content' => '对方不在线'
          ]));
      }

      /**
       * Re-publishes a message, giving up after 10 failed attempts.
       *
       * @param mixed $message Producer message accepted by Producer::produce()
       */
      public function retry($message): void
      {
          $maxRetries = 10;

          for ($attempt = 1; $attempt <= $maxRetries; ++$attempt) {
              try {
                  ContextApplicationContext::getContainer()->get(Producer::class)->produce($message);
                  var_dump('重试消息发送成功');
                  return;
              } catch (\Exception $e) {
                  if ($attempt >= $maxRetries) {
                      var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage());
                  } else {
                      var_dump('第 ' . $attempt . ' 次重试失败: ' . $e->getMessage());
                  }
              }
          }
      }

      /**
       * Cleans up after a closed connection: removes the per-fd Redis entries,
       * releases the fd/user binding and appends a disconnect log entry.
       *
       * NOTE(review): 'websocket:disconnections' is only ever appended to here;
       * confirm something trims or consumes it.
       */
      public function onClose($server, int $fd, int $reactorId): void
      {
          $redisClient = new RedisService();
          $userId = $redisClient->findUser((string) $fd);
          $field = (string) $fd;

          $this->redis->hDel('websocket:connections', $field);
          $this->redis->hDel('websocket:activity', $field);
          $this->redis->hDel('websocket:heartbeat', $field);
          $redisClient->unbind($field, (int) $userId);

          $this->redis->lPush('websocket:disconnections', json_encode([
              'fd' => $fd,
              'user_id' => $userId,
              'time' => time()
          ]));
      }

      /**
       * Authenticates a new connection via the "token" query parameter and
       * registers it.
       *
       * A missing, invalid or uid-less token gets an error frame and the
       * connection is closed. On success the fd is bound to the user, the
       * heartbeat coroutine is started and a "connect" event is sent.
       *
       * @param mixed $server  WebSocket server
       * @param mixed $request Handshake request; its get['token'] is read
       */
      public function onOpen($server, $request): void
      {
          $response = (new Response($server))->init($request);
          $fd = $response->getFd();

          $uid = null;
          try {
              $token = $request->get['token'] ?? null;
              if (is_string($token) && $token !== '') {
                  $claims = $this->jwt->getClaimsByToken($token);
                  $uid = $claims['uid'] ?? null;
              }
          } catch (\Throwable $e) {
              // Any token failure is handled as a rejected login below.
              var_dump('WebSocket auth failed: ' . $e->getMessage());
              $uid = null;
          }

          if ($uid === null) {
              $server->push($fd, json_encode(['event' => 'error', 'content' => 'Unauthorized']));
              $server->close($fd);
              return;
          }

          // Bind the connection to the user.
          $server->bind($fd, $uid);
          $redisClient = new RedisService();
          $redisClient->bind((string) $fd, $uid);

          $this->updateConnection($fd, $uid);
          $this->startHeartbeat($server, $fd);

          $server->push($fd, json_encode([
              "event" => "connect",
              "content" => [
                  "ping_interval" => self::HEARTBEAT_INTERVAL,
                  "ping_timeout" => self::CONNECTION_TIMEOUT,
                  "content" => "连接成功",
                  "fd" => $fd,
                  "user_id" => $uid,
                  "reconnect_attempts" => self::MAX_RECONNECT_ATTEMPTS
              ],
          ]));

          // Connect log. It uses its own list key because
          // 'websocket:connections' already holds a hash.
          $this->redis->lPush(self::CONNECTION_LOG_KEY, json_encode([
              'fd' => $fd,
              'user_id' => $uid,
              'time' => time()
          ]));
      }

      /**
       * Handles a "reconnect" frame.
       *
       * The frame starts with the literal "reconnect"; an optional JSON object
       * after it may carry "attempt". The user id stored in the connection
       * record is the one bound to the fd, not one supplied by the client.
       *
       * NOTE(review): the exact frame layout after the prefix is assumed;
       * confirm against the client.
       */
      private function handleReconnect($server, $frame): void
      {
          $fd = (int) $frame->fd;
          $raw = (string) $frame->data;

          // Decode only the JSON part; the full frame is never valid JSON
          // because of the "reconnect" prefix.
          $data = [];
          $jsonStart = strpos($raw, '{');
          if ($jsonStart !== false) {
              $decoded = json_decode(substr($raw, $jsonStart), true);
              if (is_array($decoded)) {
                  $data = $decoded;
              }
          }

          $attempt = is_numeric($data['attempt'] ?? null) ? (int) $data['attempt'] : 1;
          if ($attempt > self::MAX_RECONNECT_ATTEMPTS) {
              $server->push($fd, json_encode([
                  'type' => 'reconnect_failed',
                  'message' => '重连次数超限'
              ]));
              return;
          }

          $boundUser = (new RedisService())->findUser((string) $fd);
          $userId = in_array($boundUser, [null, false, ''], true) ? null : $boundUser;

          $this->updateConnection($fd, $userId);
          $server->push($fd, json_encode([
              'type' => 'reconnect_success',
              'attempt' => $attempt,
              'message' => '重连成功'
          ]));
      }

      /**
       * Records "now" as the last activity time of a connection.
       */
      private function updateLastActivity($fd): void
      {
          $this->redis->hSet('websocket:activity', (string) $fd, time());
      }

      /**
       * Writes (or overwrites) the connection record of an fd in the
       * 'websocket:connections' hash.
       */
      private function updateConnection($fd, $userId): void
      {
          $now = time();
          $connectionInfo = [
              'fd' => $fd,
              'user_id' => $userId,
              'connected_at' => $now,
              'last_activity' => $now,
              'status' => 'connected'
          ];

          $this->redis->hSet('websocket:connections', (string) $fd, json_encode($connectionInfo));
      }

      /**
       * Starts a coroutine that pings the connection, waits for the pong, then
       * sleeps HEARTBEAT_INTERVAL seconds, until the connection is gone.
       */
      private function startHeartbeat($server, $fd): void
      {
          Coroutine::create(function () use ($server, $fd) {
              while ($server->isEstablished($fd)) {
                  $server->push($fd, 'ping');
                  $this->waitForPong($server, $fd);

                  // waitForPong() may have closed the connection; stop right away.
                  if (!$server->isEstablished($fd)) {
                      break;
                  }

                  Coroutine::sleep(self::HEARTBEAT_INTERVAL);
              }
          });
      }

      /**
       * Polls once per second, for at most CONNECTION_TIMEOUT seconds, until a
       * pong newer than HEARTBEAT_INTERVAL seconds is recorded for the fd.
       * Closes the connection when the wait times out.
       */
      private function waitForPong($server, $fd): void
      {
          $deadline = time() + self::CONNECTION_TIMEOUT;

          while (time() < $deadline) {
              if (!$server->isEstablished($fd)) {
                  return;
              }

              $pongTime = $this->redis->hGet('websocket:heartbeat', (string) $fd);
              if ($pongTime && (time() - (int) $pongTime) < self::HEARTBEAT_INTERVAL) {
                  return; // pong received
              }

              Coroutine::sleep(1);
          }

          // Timed out: drop the connection.
          if ($server->isEstablished($fd)) {
              $server->close($fd);
          }
      }
  }