WebSocketController.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Controller;
  4. use App\Amqp\Producer\MqProducer;
  5. use App\JsonRpc\ChatServiceInterface;
  6. use App\JsonRpc\UserServiceInterface;
  7. use App\Service\Message\ReceiveHandleService;
  8. use App\Service\RedisService;
  9. use Hyperf\Amqp\Producer;
  10. use Hyperf\Context\ApplicationContext as ContextApplicationContext;
  11. use Hyperf\Contract\OnCloseInterface;
  12. use Hyperf\Contract\OnMessageInterface;
  13. use Hyperf\Contract\OnOpenInterface;
  14. use Hyperf\Di\Annotation\Inject;
  15. use Hyperf\Engine\WebSocket\Response;
  16. use Phper666\JWTAuth\JWT;
  17. use swoole\Server;
  18. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  19. {
  20. #[Inject]
  21. protected JWT $jwt;
  22. /**
  23. * @var ChatServiceInterface
  24. */
  25. #[Inject]
  26. private $chatServiceClient;
  27. /**
  28. * @var UserServiceInterface
  29. */
  30. #[Inject]
  31. private $userServiceClient;
  32. /**
  33. * @Inject
  34. * @var ReceiveHandleService
  35. */
  36. protected $receiveHandle;
  37. protected $server;
  38. public function onMessage($server, $frame): void
  39. {
  40. //把数据推给前端
  41. $redisClient = new RedisService();
  42. $userId = $redisClient->findUser((string) $frame->fd);
  43. //存入队列
  44. $result = json_decode($frame->data, true);
  45. $result['user_id'] = $userId;
  46. var_dump($result, '-------------1----');
  47. $userInfo = $redisClient->getUserInfo((string) $userId);
  48. if ($userInfo) {
  49. $userInfoArr = json_decode($userInfo);
  50. $result['user_avatar'] = $userInfoArr['avatar'];
  51. } else {
  52. $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
  53. $redisClient->setUserInfo($userId, $userInfos['data']);
  54. $result['user_avatar'] = $userInfos['data']['avatar'];
  55. }
  56. $myFriends = $redisClient->getUserFriends((string) $userId);
  57. $myFriendsArr = [];
  58. if ($myFriends) {
  59. $myFriendsArr = json_decode($myFriends);
  60. } else {
  61. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  62. $redisClient->setUserFriends($userId, $myFriends['data']);
  63. $myFriendsArr = $myFriends['data'];
  64. }
  65. //推送给前台
  66. //组装数据+头像
  67. if ($result['talk_type'] == 1) {
  68. //判断$result['receiver_id']是否是好友
  69. $myFriendsID = array_column($myFriendsArr, 'friend_id');
  70. if (!in_array($result['receiver_id'], $myFriendsID)) {
  71. $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]);
  72. $redisClient->setUserFriends($userId, $myFriends['data']);
  73. $myFriendsArr = $myFriends['data'];
  74. if (!in_array($result['receiver_id'], $myFriendsID)) {
  75. $result['content'] = '您还不是好友,无法发送消息!';
  76. $server->push((int) $frame->fd, json_encode($result));
  77. }
  78. return;
  79. }
  80. //给自己推一条数据
  81. if ($server->isEstablished($frame->fd)) {
  82. $result['is_read'] = 1;
  83. $server->push((int) $frame->fd, json_encode($result));
  84. // 尝试连接
  85. try {
  86. $saiddata = $result;
  87. $saiddata['action'] = 'said';
  88. $message = new MqProducer($saiddata);
  89. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  90. $re = $producer->produce($message);
  91. var_dump('消息发送成功' . $frame->fd);
  92. } catch (\Exception $e) {
  93. var_dump('消息发送失败: ' . $e->getMessage());
  94. $this->retry($message);
  95. }
  96. } else {
  97. //给自己发一条未读消息
  98. try {
  99. $saiddata = $result;
  100. $saiddata['action'] = 'said';
  101. $saiddata['is_read'] = 0;
  102. $message = new MqProducer($saiddata);
  103. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  104. $re = $producer->produce($message);
  105. var_dump('消息发送成功' . $frame->fd);
  106. } catch (\Exception $e) {
  107. var_dump('消息发送失败: ' . $e->getMessage());
  108. $this->retry($message);
  109. }
  110. }
  111. //给好友推送消息
  112. $fd = $redisClient->findFd((int) $result['receiver_id']);
  113. if ($server->isEstablished((int) $fd)) {
  114. $data = $result;
  115. $data['is_read'] = 1;
  116. $server->push((int) $fd, json_encode(value: $data));
  117. var_dump('消息给好友发送成功' . $fd);
  118. // 尝试连接
  119. try {
  120. $chatdata = $result;
  121. $chatdata['action'] = 'recieved';
  122. $chatdata['receiver_id'] = $result['user_id'];
  123. $chatdata['user_id'] = $result['receiver_id'];
  124. $message = new MqProducer($chatdata);
  125. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  126. $re = $producer->produce($message);
  127. var_dump('消息发送成功');
  128. } catch (\Exception $e) {
  129. var_dump('消息发送失败: ' . $e->getMessage());
  130. $this->retry($message);
  131. }
  132. } else {
  133. try {
  134. $chatdata = $result;
  135. $chatdata['action'] = 'recieved';
  136. $chatdata['receiver_id'] = $result['user_id'];
  137. $chatdata['user_id'] = $result['receiver_id'];
  138. $chatdata['is_read'] = 0;
  139. $message = new MqProducer($chatdata);
  140. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  141. $re = $producer->produce($message);
  142. var_dump('消息发送成功');
  143. } catch (\Exception $e) {
  144. var_dump('消息发送失败: ' . $e->getMessage());
  145. $this->retry($message);
  146. }
  147. }
  148. } else if ($result['talk_type'] == 2) {
  149. //根据群找到 群用户,群发一遍消息
  150. $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
  151. var_dump($groupUserList['data'],'----------------------------------');
  152. if ($groupUserList['data']) {
  153. $chatdata = $result;
  154. foreach ($groupUserList['data'] as $val) {
  155. $fd = $redisClient->findFd((int) $val['user_id']);
  156. if ($server->isEstablished((int) $fd)) {
  157. var_dump($fd, $val['user_id']);
  158. $server->push((int) $fd, json_encode($result));
  159. try {
  160. //分发说,只记录自己对自己
  161. if ($result['user_id'] == $val['user_id']) {
  162. $chatdata['receiver_id'] = $result['receiver_id'];
  163. $chatdata['user_id'] = $result['user_id'];
  164. $chatdata['group_receiver_id'] = $val['user_id'];
  165. $chatdata['is_read'] = 1;
  166. $chatdata['action'] = 'said';
  167. $message = new MqProducer($chatdata);
  168. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  169. $re = $producer->produce($message);
  170. }
  171. //分发接收,不记录自己
  172. if ($chatdata['user_id'] != $val['user_id']) {
  173. $chatdata['receiver_id'] = $result['receiver_id'];
  174. $chatdata['user_id'] = $val['user_id'];
  175. $chatdata['group_receiver_id'] = $result['user_id'];
  176. $chatdata['is_read'] = 1;
  177. $chatdata['action'] = 'recieved';
  178. $message = new MqProducer($chatdata);
  179. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  180. $re = $producer->produce($message);
  181. var_dump('消息发送成功');
  182. }
  183. } catch (\Exception $e) {
  184. var_dump('消息发送失败: ' . $e->getMessage());
  185. }
  186. } else {
  187. try {
  188. //分发说,只记录自己对自己
  189. if ($result['user_id'] == $val['user_id']) {
  190. $chatdata['receiver_id'] = $result['receiver_id'];
  191. $chatdata['user_id'] = $result['user_id'];
  192. $chatdata['group_receiver_id'] = $val['user_id'];
  193. $chatdata['is_read'] = 0;
  194. $chatdata['action'] = 'said';
  195. $message = new MqProducer($chatdata);
  196. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  197. $re = $producer->produce($message);
  198. }
  199. //分发接收,不记录自己
  200. if ($chatdata['user_id'] != $val['user_id']) {
  201. $chatdata['receiver_id'] = $result['receiver_id'];
  202. $chatdata['user_id'] = $val['user_id'];
  203. $chatdata['group_receiver_id'] = $result['user_id'];
  204. $chatdata['is_read'] = 0;
  205. $chatdata['action'] = 'recieved';
  206. $message = new MqProducer($chatdata);
  207. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  208. $re = $producer->produce($message);
  209. var_dump('消息发送成功');
  210. }
  211. } catch (\Exception $e) {
  212. var_dump('消息发送失败: ' . $e->getMessage());
  213. }
  214. }
  215. }
  216. }
  217. }
  218. }
  219. public function retry($message): void
  220. {
  221. $maxRetries = 10; // 最大重试次数
  222. $retryCount = 0;
  223. while ($retryCount < $maxRetries) {
  224. try {
  225. $producer = ContextApplicationContext::getContainer()->get(Producer::class);
  226. $re = $producer->produce($message);
  227. var_dump('重试消息发送成功');
  228. break;
  229. } catch (\Exception $e) {
  230. $retryCount++;
  231. if ($retryCount >= $maxRetries) {
  232. var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage());
  233. } else {
  234. var_dump('第 ' . $retryCount . ' 次重试失败: ' . $e->getMessage());
  235. }
  236. }
  237. }
  238. }
  239. public function onClose($server, int $fd, int $reactorId): void
  240. {
  241. var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
  242. $redisClient = new RedisService();
  243. $userId = $redisClient->findUser((string) $fd);
  244. $redisClient->unbind((string) $fd, (int) $userId);
  245. }
  246. public function onOpen($server, $request): void
  247. {
  248. $token = $request->get['token'];
  249. $userInfo = $this->jwt->getClaimsByToken($token);
  250. $response = (new Response($server))->init($request);
  251. $fd = $response->getFd();
  252. // var_dump("1管道ID:",$fd);
  253. // $data = [
  254. // 'user_id'=>$userInfo['uid'],
  255. // 'fd'=>$fd
  256. // ];
  257. // var_dump(SERVER_RUN_ID,"+++++++++++++");
  258. // $this->chatServiceClient->addChatChannel($data);
  259. $server->bind($fd, $userInfo['uid']);
  260. $redisClient = new RedisService();
  261. $redisClient->bind((string) $fd, $userInfo['uid']);
  262. $server->push($request->fd, json_encode([
  263. "event" => "connect",
  264. "content" => [
  265. "ping_interval" => 20,
  266. "ping_timeout" => 20 * 3,
  267. "content" => "连接成功",
  268. "fd" => $fd,
  269. "user_id" => $userInfo['uid'],
  270. ],
  271. ]));
  272. }
  273. }