<?php declare(strict_types=1); namespace App\Controller; use App\Amqp\Producer\MqProducer; use Hyperf\Amqp\Producer; use Hyperf\Context\ApplicationContext as ContextApplicationContext; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; use Hyperf\Engine\WebSocket\Frame; use Hyperf\Engine\WebSocket\Response; use Hyperf\WebSocketServer\Annotation\MessageHandler; use Hyperf\WebSocketServer\Context\WebSocketContext; use Hyperf\WebSocketServer\Message\Text; use Hyperf\Di\Annotation\Inject; use Phper666\JWTAuth\JWT; use App\JsonRpc\ChatServiceInterface; use Hyperf\WebSocketServer\Constant\Opcode; use App\Service\RedisService; use App\Service\Message\ReceiveHandleService; use http\Client\Request; use App\Controller\AbstractController; use App\JsonRpc\UserServiceInterface; class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface { #[Inject] protected JWT $jwt; /** * @var ChatServiceInterface */ #[Inject] private $chatServiceClient; /** * @var UserServiceInterface */ #[Inject] private $userServiceClient; /** * @Inject * @var ReceiveHandleService */ protected $receiveHandle; public function onMessage($server, $frame): void { //把数据推给前端 $redisClient = new RedisService(); $userId = $redisClient->findUser((string)$frame->fd); //存入队列 $result = json_decode($frame->data, true); $result['user_id'] = $userId; $userInfo = $redisClient->getUserInfo($userId); if($userInfo){ $userInfoArr = json_decode($userInfo); $result['user_avatar'] = $userInfoArr['avatar']; }else{ $userInfos = $this->userServiceClient->getUserInfo((int)$userId); $redisClient->setUserInfo($userId,$userInfos['data']); $result['user_avatar'] = $userInfos['data']['avatar']; } var_dump("接收到的数据:",$result); $message = new MqProducer($result); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $producer->produce($message); //推送给前台 //组装数据+头像 if($result['talk_type']==1){ //给自己推一条数据 if ($server->isEstablished($frame->fd)) { $server->push($frame->fd, json_encode($result)); } //给好友推送消息 $fd = $redisClient->findFd((int)$result['receiver_id']); // $result['user_id'] = $result['friend_id']; if ($server->isEstablished((int)$fd)) { $b = $server->push((int)$fd, json_encode($result)); var_dump("推送成功:",$b); } }else if($result['talk_type']==2){ //根据群找到 群用户,群发一遍消息 $groupUserList = $this->chatServiceClient->getAllTalkGroupMember(['group_id'=>$result['receiver_id']]); var_dump($groupUserList['data']); if($groupUserList['data']){ foreach ($groupUserList['data'] as $val){ $fd = $redisClient->findFd((int)$val['user_id']); if ($server->isEstablished((int)$fd)) { $server->push((int)$fd, json_encode($result)); } } } } } public function onClose($server, int $fd, int $reactorId): void { var_dump('closed::::::::::::::::::',$fd,"======",$reactorId,"+++++++++++"); // $data = [ // 'fd'=>$fd // ]; // $this->chatServiceClient->delChatChannel($data); $redisClient = new RedisService(); $userId = $redisClient->findUser((string)$fd); $redisClient->unbind((string)$fd,(int)$userId); } public function onOpen($server, $request): void { $token = $request->get['token']; $userInfo = $this->jwt->getClaimsByToken($token); $response = (new Response($server))->init($request); $fd = $response->getFd(); // var_dump("管道ID:",$fd); // $data = [ // 'user_id'=>$userInfo['uid'], // 'fd'=>$fd // ]; // var_dump(SERVER_RUN_ID,"+++++++++++++"); // $this->chatServiceClient->addChatChannel($data); $server->bind($fd,$userInfo['uid']); $redisClient = new RedisService(); $redisClient->bind((string)$fd,$userInfo['uid']); $server->push($request->fd, json_encode([ "event" => "connect", "content" => [ "ping_interval" => 20, "ping_timeout" => 20 * 3, "content" =>"连接成功" ], ])); } }