123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- <?php
- declare(strict_types=1);
- namespace App\Controller;
- use App\Amqp\Producer\MqProducer;
- use Hyperf\Amqp\Producer;
- use Hyperf\Context\ApplicationContext as ContextApplicationContext;
- use Hyperf\Contract\OnCloseInterface;
- use Hyperf\Contract\OnMessageInterface;
- use Hyperf\Contract\OnOpenInterface;
- use Hyperf\Engine\WebSocket\Frame;
- use Hyperf\Engine\WebSocket\Response;
- use Hyperf\WebSocketServer\Annotation\MessageHandler;
- use Hyperf\WebSocketServer\Context\WebSocketContext;
- use Hyperf\WebSocketServer\Message\Text;
- use Hyperf\Di\Annotation\Inject;
- use Phper666\JWTAuth\JWT;
- use App\JsonRpc\ChatServiceInterface;
- use Hyperf\WebSocketServer\Constant\Opcode;
- use App\Service\RedisService;
- use App\Service\Message\ReceiveHandleService;
- use http\Client\Request;
- use App\Controller\AbstractController;
- use App\JsonRpc\UserServiceInterface;
- class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
- {
- #[Inject]
- protected JWT $jwt;
- /**
- * @var ChatServiceInterface
- */
- #[Inject]
- private $chatServiceClient;
- /**
- * @var UserServiceInterface
- */
- #[Inject]
- private $userServiceClient;
- /**
- * @Inject
- * @var ReceiveHandleService
- */
- protected $receiveHandle;
- public function onMessage($server, $frame): void
- {
- //把数据推给前端
- $redisClient = new RedisService();
- $userId = $redisClient->findUser((string)$frame->fd);
- //存入队列
- $result = json_decode($frame->data, true);
- $result['user_id'] = $userId;
- $userInfo = $redisClient->getUserInfo($userId);
- if($userInfo){
- $userInfoArr = json_decode($userInfo);
- $result['user_avatar'] = $userInfoArr['avatar'];
- }else{
- $userInfos = $this->userServiceClient->getUserInfo((int)$userId);
- $redisClient->setUserInfo($userId,$userInfos['data']);
- $result['user_avatar'] = $userInfos['data']['avatar'];
- }
- var_dump("接收到的数据:",$result);
- $message = new MqProducer($result);
- $producer = ContextApplicationContext::getContainer()->get(Producer::class);
- $producer->produce($message);
- //推送给前台
- //组装数据+头像
- if($result['talk_type']==1){
- //给自己推一条数据
- if ($server->isEstablished($frame->fd)) {
- $server->push($frame->fd, json_encode($result));
- }
- //给好友推送消息
- $fd = $redisClient->findFd((int)$result['receiver_id']);
- // $result['user_id'] = $result['friend_id'];
- if ($server->isEstablished((int)$fd)) {
- $b = $server->push((int)$fd, json_encode($result));
- var_dump("推送成功:",$b);
- }
- }else if($result['talk_type']==2){
- //根据群找到 群用户,群发一遍消息
- $groupUserList = $this->chatServiceClient->getAllTalkGroupMember(['group_id'=>$result['receiver_id']]);
- var_dump($groupUserList['data']);
- if($groupUserList['data']){
- foreach ($groupUserList['data'] as $val){
- $fd = $redisClient->findFd((int)$val['user_id']);
- if ($server->isEstablished((int)$fd)) {
- $server->push((int)$fd, json_encode($result));
- }
- }
- }
- }
- }
- public function onClose($server, int $fd, int $reactorId): void
- {
- var_dump('closed::::::::::::::::::',$fd,"======",$reactorId,"+++++++++++");
- // $data = [
- // 'fd'=>$fd
- // ];
- // $this->chatServiceClient->delChatChannel($data);
- $redisClient = new RedisService();
- $userId = $redisClient->findUser((string)$fd);
- $redisClient->unbind((string)$fd,(int)$userId);
- }
- public function onOpen($server, $request): void
- {
- $token = $request->get['token'];
- $userInfo = $this->jwt->getClaimsByToken($token);
- $response = (new Response($server))->init($request);
- $fd = $response->getFd();
- // var_dump("管道ID:",$fd);
- // $data = [
- // 'user_id'=>$userInfo['uid'],
- // 'fd'=>$fd
- // ];
- // var_dump(SERVER_RUN_ID,"+++++++++++++");
- // $this->chatServiceClient->addChatChannel($data);
- $server->bind($fd,$userInfo['uid']);
- $redisClient = new RedisService();
- $redisClient->bind((string)$fd,$userInfo['uid']);
- $server->push($request->fd, json_encode([
- "event" => "connect",
- "content" => [
- "ping_interval" => 20,
- "ping_timeout" => 20 * 3,
- "content" =>"连接成功"
- ],
- ]));
- }
- }
|