123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- <?php
- declare (strict_types = 1);
- namespace App\Controller;
- use App\Amqp\Producer\MqProducer;
- use App\JsonRpc\ChatServiceInterface;
- use App\JsonRpc\UserServiceInterface;
- use App\Service\Message\ReceiveHandleService;
- use App\Service\RedisService;
- use Hyperf\Amqp\Producer;
- use Hyperf\Context\ApplicationContext as ContextApplicationContext;
- use Hyperf\Contract\OnCloseInterface;
- use Hyperf\Contract\OnMessageInterface;
- use Hyperf\Contract\OnOpenInterface;
- use Hyperf\Di\Annotation\Inject;
- use Hyperf\Engine\WebSocket\Response;
- use Phper666\JWTAuth\JWT;
- use swoole\Server;
/**
 * WebSocket entry point for chat traffic.
 *
 * - onOpen   binds the connection's fd to the user id carried in the JWT.
 * - onMessage decodes a chat frame, enriches it with the sender's id and
 *   avatar, pushes it to the connected recipients and publishes one AMQP
 *   record per participant through MqProducer.
 * - onClose  removes the fd <-> user binding from Redis.
 *
 * Debug output goes through var_dump(), which is the only diagnostics
 * mechanism this file uses.
 */
class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /** `talk_type` value (compared as a string) that selects one-to-one delivery. */
    private const TALK_TYPE_DIRECT = '1';

    /** `talk_type` value (compared as a string) that selects group fan-out. */
    private const TALK_TYPE_GROUP = '2';

    /**
     * `status` filter sent to ChatServiceInterface::getFriendsList().
     * NOTE(review): presumably means "accepted friendship" - confirm against the chat service.
     */
    private const FRIEND_STATUS = 2;

    /** Upper bound on re-publish attempts made by retry(). */
    private const MAX_RETRIES = 10;

    #[Inject]
    protected JWT $jwt;

    #[Inject]
    private ChatServiceInterface $chatServiceClient;

    #[Inject]
    private UserServiceInterface $userServiceClient;

    // Uses the attribute form like the other injected properties; the docblock
    // "@Inject" spelling it had before was the odd one out in this class.
    // This property is not referenced by any method of this class.
    #[Inject]
    protected ReceiveHandleService $receiveHandle;

    // Neither assigned nor read anywhere in this class; kept for subclasses.
    protected $server;

    /**
     * Handles one incoming chat frame.
     *
     * The frame body must be a JSON object with scalar `talk_type` and
     * `receiver_id` entries; anything else is ignored. Frames arriving on an
     * fd that has no user bound in Redis are ignored as well.
     *
     * @param mixed $server object exposing push() and isEstablished()
     * @param mixed $frame  object exposing `fd` and `data`
     */
    public function onMessage($server, $frame): void
    {
        $result = json_decode($frame->data, true);
        if (
            !is_array($result)
            || !isset($result['talk_type'], $result['receiver_id'])
            || !is_scalar($result['talk_type'])
            || !is_scalar($result['receiver_id'])
        ) {
            // Not a chat message: nothing to route.
            return;
        }

        $redisClient = new RedisService();
        $userId = $redisClient->findUser((string) $frame->fd);
        if ($userId === null || $userId === false || $userId === '') {
            // No user is bound to this connection, so there is no sender to attribute the message to.
            return;
        }

        $result['user_id'] = $userId;
        var_dump($result, '-------------1----');
        $result['user_avatar'] = $this->resolveAvatar($redisClient, $userId);

        $talkType = (string) $result['talk_type'];
        if ($talkType === self::TALK_TYPE_DIRECT) {
            $this->deliverDirect($server, (int) $frame->fd, $redisClient, $result);
        } elseif ($talkType === self::TALK_TYPE_GROUP) {
            $this->deliverGroup($server, $redisClient, $result);
        }
    }

    /**
     * Re-publishes a message up to MAX_RETRIES times, stopping at the first success.
     *
     * Attempts are made back to back, without any delay between them.
     *
     * @param mixed $message the producer message that failed to publish
     */
    public function retry($message): void
    {
        for ($attempt = 1; $attempt <= self::MAX_RETRIES; ++$attempt) {
            try {
                ContextApplicationContext::getContainer()->get(Producer::class)->produce($message);
                var_dump('重试消息发送成功');
                return;
            } catch (\Exception $e) {
                if ($attempt >= self::MAX_RETRIES) {
                    var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage());
                } else {
                    var_dump('第 ' . $attempt . ' 次重试失败: ' . $e->getMessage());
                }
            }
        }
    }

    /**
     * Removes the Redis binding between the closed fd and its user.
     *
     * @param mixed $server unused
     */
    public function onClose($server, int $fd, int $reactorId): void
    {
        var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");

        $redisClient = new RedisService();
        $userId = $redisClient->findUser((string) $fd);
        $redisClient->unbind((string) $fd, (int) $userId);
    }

    /**
     * Authenticates a new connection and binds its fd to the token's `uid`.
     *
     * The token is read from the `token` query parameter. When it is missing,
     * cannot be parsed, or carries no numeric `uid` claim, the connection is
     * closed instead of being left open without a user binding.
     *
     * NOTE(review): only JWT::getClaimsByToken() is called here; whether that
     * method also verifies signature and expiry is not visible from this file -
     * confirm, and add an explicit verification step if it does not.
     *
     * @param mixed $server object exposing bind(), push() and close()
     * @param mixed $request handshake request exposing `get` and `fd`
     */
    public function onOpen($server, $request): void
    {
        $fd = (new Response($server))->init($request)->getFd();

        $uid = null;
        $token = $request->get['token'] ?? null;
        if (is_string($token) && $token !== '') {
            try {
                $claims = $this->jwt->getClaimsByToken($token);
                $uid = $claims['uid'] ?? null;
            } catch (\Exception $e) {
                var_dump('websocket token rejected: ' . $e->getMessage());
            }
        }

        if (!is_numeric($uid)) {
            $server->close($fd);
            return;
        }
        $uid = (int) $uid;

        $server->bind($fd, $uid);
        (new RedisService())->bind((string) $fd, $uid);

        $this->pushJson($server, $fd, [
            "event" => "connect",
            "content" => [
                "ping_interval" => 20,
                "ping_timeout" => 20 * 3,
                "content" => "连接成功",
                "fd" => $fd,
                "user_id" => $uid,
            ],
        ]);
    }

    /**
     * Returns the sender's avatar, from the Redis cache when possible and from
     * the user service otherwise (refreshing the cache on the way).
     *
     * @param mixed $userId user id exactly as returned by RedisService::findUser()
     * @return mixed the `avatar` entry of the profile, or null when unavailable
     */
    private function resolveAvatar(RedisService $redisClient, $userId)
    {
        $cached = $redisClient->getUserInfo((string) $userId);
        if (is_string($cached) && $cached !== '') {
            // Decode to an array: the profile is indexed with ['avatar'] below.
            $profile = json_decode($cached, true);
            if (is_array($profile)) {
                return $profile['avatar'] ?? null;
            }
        }

        $response = $this->userServiceClient->getUserInfo((int) $userId);
        $profile = $response['data'] ?? null;
        if (!is_array($profile)) {
            // Do not cache a failed lookup.
            return null;
        }

        $redisClient->setUserInfo($userId, $profile);

        return $profile['avatar'] ?? null;
    }

    /**
     * Loads the sender's friend rows, preferring the Redis cache.
     *
     * @param mixed $userId       user id exactly as returned by RedisService::findUser()
     * @param bool  $forceRefresh skip the cache and ask the chat service
     * @return array rows that are expected to carry a `friend_id` entry
     */
    private function loadFriends(RedisService $redisClient, $userId, bool $forceRefresh = false): array
    {
        if (!$forceRefresh) {
            $cached = $redisClient->getUserFriends((string) $userId);
            if (is_string($cached) && $cached !== '') {
                $friends = json_decode($cached, true);
                if (is_array($friends)) {
                    return $friends;
                }
            }
        }

        $response = $this->chatServiceClient->getFriendsList([
            'user_id' => $userId,
            'status' => self::FRIEND_STATUS,
        ]);
        $friends = $response['data'] ?? null;
        if (!is_array($friends)) {
            // Do not cache a failed lookup.
            return [];
        }

        $redisClient->setUserFriends($userId, $friends);

        return $friends;
    }

    /**
     * Tells whether $receiverId appears as a `friend_id` in $friends.
     *
     * Ids are compared as strings so that 5 and "5" match without relying on
     * loose comparison.
     *
     * @param array $friends    friend rows
     * @param mixed $receiverId scalar id taken from the incoming frame
     */
    private function isFriend(array $friends, $receiverId): bool
    {
        $friendIds = array_map(
            static fn ($id): string => (string) $id,
            array_column($friends, 'friend_id'),
        );

        return in_array((string) $receiverId, $friendIds, true);
    }

    /**
     * One-to-one delivery: friendship check, echo to the sender, push to the
     * recipient when connected, and one AMQP record for each side.
     *
     * @param mixed $server   object exposing push() and isEstablished()
     * @param int   $senderFd fd the frame arrived on
     * @param array $result   decoded frame plus `user_id` and `user_avatar`
     */
    private function deliverDirect($server, int $senderFd, RedisService $redisClient, array $result): void
    {
        $userId = $result['user_id'];
        $receiverId = $result['receiver_id'];

        // The cached list may be stale, so a miss is re-checked against the chat service before refusing.
        $allowed = $this->isFriend($this->loadFriends($redisClient, $userId), $receiverId)
            || $this->isFriend($this->loadFriends($redisClient, $userId, true), $receiverId);
        if (!$allowed) {
            $result['content'] = '您还不是好友,无法发送消息!';
            $this->pushJson($server, $senderFd, $result);
            return;
        }

        // Sender side: echo the message back when the connection is up, then record it.
        $senderOnline = $server->isEstablished($senderFd);
        if ($senderOnline) {
            // From here on `is_read` = 1 is part of $result and is copied into the payloads built below.
            $result['is_read'] = 1;
            $this->pushJson($server, $senderFd, $result);
        }
        $said = $result;
        $said['action'] = 'said';
        if (!$senderOnline) {
            $said['is_read'] = 0;
        }
        $this->publish($said, '消息发送成功' . $senderFd);

        // Recipient side: live push when connected.
        $receiverFd = (int) $redisClient->findFd((int) $receiverId);
        if ($server->isEstablished($receiverFd)) {
            $outgoing = $result;
            $outgoing['is_read'] = 0;
            $this->pushJson($server, $receiverFd, $outgoing);
            var_dump('消息给好友发送成功' . $receiverFd);
        }

        // The recipient's record is published whether or not the push happened.
        // It swaps user_id and receiver_id, and the action value keeps its
        // existing spelling because it is a runtime string.
        $received = $result;
        $received['action'] = 'recieved';
        $received['receiver_id'] = $result['user_id'];
        $received['user_id'] = $result['receiver_id'];
        $received['is_read'] = 0;
        $this->publish($received);
    }

    /**
     * Group delivery: pushes the message to every connected member and
     * publishes one AMQP record per member.
     *
     * NOTE(review): nothing here checks that the sender belongs to the group
     * before the message is pushed to its members - confirm whether that is
     * enforced elsewhere.
     *
     * @param mixed $server object exposing push() and isEstablished()
     * @param array $result decoded frame plus `user_id` and `user_avatar`;
     *                      `receiver_id` is sent to the chat service as `group_id`
     */
    private function deliverGroup($server, RedisService $redisClient, array $result): void
    {
        $response = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
        $members = $response['data'] ?? null;
        var_dump($members, '----------------------------------');
        if (!is_array($members)) {
            return;
        }

        foreach ($members as $member) {
            if (!is_array($member) || !isset($member['user_id'])) {
                continue;
            }
            $memberId = $member['user_id'];

            $memberFd = (int) $redisClient->findFd((int) $memberId);
            $online = $server->isEstablished($memberFd);
            if ($online) {
                var_dump($memberFd, $memberId);
                $this->pushJson($server, $memberFd, $result);
            }

            // Build each record from the pristine message so nothing leaks
            // from one member's record into the next.
            $record = $result;
            if ((string) $result['user_id'] === (string) $memberId) {
                // The sender's own copy.
                $record['group_receiver_id'] = $memberId;
                $record['is_read'] = $online ? 1 : 0;
                $record['action'] = 'said';
            } else {
                // Another member's copy, filed under that member's id.
                $record['user_id'] = $memberId;
                $record['group_receiver_id'] = $result['user_id'];
                $record['is_read'] = 0;
                $record['action'] = 'recieved';
            }
            $this->publish($record);
        }
    }

    /**
     * Publishes one record through the AMQP producer, handing the message to
     * retry() when the first attempt throws.
     *
     * The message object is built before the publish attempt so that the
     * retry path always receives the message that actually failed.
     *
     * @param array  $payload     data wrapped in MqProducer
     * @param string $successNote text dumped after a successful publish
     */
    private function publish(array $payload, string $successNote = '消息发送成功'): void
    {
        try {
            $message = new MqProducer($payload);
        } catch (\Exception $e) {
            // Without a message object there is nothing to retry.
            var_dump('消息发送失败: ' . $e->getMessage());
            return;
        }

        try {
            ContextApplicationContext::getContainer()->get(Producer::class)->produce($message);
            var_dump($successNote);
        } catch (\Exception $e) {
            var_dump('消息发送失败: ' . $e->getMessage());
            $this->retry($message);
        }
    }

    /**
     * JSON-encodes $payload and pushes it to $fd; a payload that cannot be
     * encoded is reported and skipped instead of being pushed as `false`.
     *
     * @param mixed $server object exposing push()
     */
    private function pushJson($server, int $fd, array $payload): void
    {
        $encoded = json_encode(value: $payload);
        if ($encoded === false) {
            var_dump('json_encode failed: ' . json_last_error_msg());
            return;
        }

        $server->push($fd, $encoded);
    }
}
|