Browse Source

持久化

rkljw 2 days ago
parent
commit
c96c1978f8
2 changed files with 266 additions and 105 deletions
  1. 179 18
      app/Controller/WebSocketController.php
  2. 87 87
      app/Service/Message/ReceiveHandleService.php

+ 179 - 18
app/Controller/WebSocketController.php

@@ -16,41 +16,69 @@ use Hyperf\Contract\OnMessageInterface;
 use Hyperf\Contract\OnOpenInterface;
 use Hyperf\Di\Annotation\Inject;
 use Hyperf\Engine\WebSocket\Response;
+use Hyperf\Redis\Redis;
 use Phper666\JWTAuth\JWT;
 use swoole\Server;
+use Hyperf\Coroutine\Coroutine;
 
 class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
 {
-
     #[Inject]
     protected JWT $jwt;
-    /**
-     * @var ChatServiceInterface
-     */
+    
     #[Inject]
-    private $chatServiceClient;
-    /**
-     * @var UserServiceInterface
-     */
+    private ChatServiceInterface $chatServiceClient;
+    
     #[Inject]
-    private $userServiceClient;
+    private UserServiceInterface $userServiceClient;
 
-    /**
-     * @Inject
-     * @var ReceiveHandleService
-     */
-    protected $receiveHandle;
+    #[Inject]
+    protected ReceiveHandleService $receiveHandle;
+    
+    #[Inject]
+    protected Redis $redis;
+    
     protected $server;
-
+    
+    // 持久连接配置
+    private const HEARTBEAT_INTERVAL = 30; // 心跳间隔(秒)
+    private const CONNECTION_TIMEOUT = 90;  // 连接超时(秒)
+    private const MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
+    
+    // 存储连接信息
+    private static $connections = [];
+    private static $heartbeatTasks = [];
 
     public function onMessage($server, $frame): void
     {
-        //把数据推给前端
         $redisClient = new RedisService();
         $userId = $redisClient->findUser((string) $frame->fd);
+        
+        // 处理心跳消息
+        if ($frame->data === 'ping') {
+            $server->push($frame->fd, 'pong');
+            $this->updateLastActivity($frame->fd);
+            return;
+        }
+        
+        if ($frame->data === 'pong') {
+            $this->updateLastActivity($frame->fd);
+            $this->redis->hSet('websocket:heartbeat', (string)$frame->fd, time());
+            return;
+        }
+        
+        // 处理重连请求
+        if (strpos($frame->data, 'reconnect') === 0) {
+            $this->handleReconnect($server, $frame);
+            return;
+        }
+        
         //存入队列
         $result = json_decode($frame->data, true);
         $result['user_id'] = $userId;
+        
+        // 更新最后活动时间
+        $this->updateLastActivity($frame->fd);
 
         // $show_id = $result['show_id'];
 
@@ -295,7 +323,20 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
         // var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
         $redisClient = new RedisService();
         $userId = $redisClient->findUser((string) $fd);
+        
+        // 清理连接信息
+        $this->redis->hDel('websocket:connections', (string)$fd);
+        $this->redis->hDel('websocket:activity', (string)$fd);
+        $this->redis->hDel('websocket:heartbeat', (string)$fd);
+        
         $redisClient->unbind((string) $fd, (int) $userId);
+        
+        // 记录断开连接
+        $this->redis->lPush('websocket:disconnections', json_encode([
+            'fd' => $fd,
+            'user_id' => $userId,
+            'time' => time()
+        ]));
     }
     public function onOpen($server, $request): void
     {
@@ -303,18 +344,138 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
         $userInfo = $this->jwt->getClaimsByToken($token);
         $response = (new Response($server))->init($request);
         $fd = $response->getFd();
+        
+        // 绑定连接
         $server->bind($fd, $userInfo['uid']);
         $redisClient = new RedisService();
         $redisClient->bind((string) $fd, $userInfo['uid']);
+        
+        // 更新连接信息
+        $this->updateConnection($fd, $userInfo['uid']);
+        
+        // 启动心跳检测
+        $this->startHeartbeat($server, $fd);
+        
+        // 发送连接成功消息
         $server->push($request->fd, json_encode([
             "event" => "connect",
             "content" => [
-                "ping_interval" => 20,
-                "ping_timeout" => 20 * 3,
+                "ping_interval" => self::HEARTBEAT_INTERVAL,
+                "ping_timeout" => self::CONNECTION_TIMEOUT,
                 "content" => "连接成功",
                 "fd" => $fd,
                 "user_id" => $userInfo['uid'],
+                "reconnect_attempts" => self::MAX_RECONNECT_ATTEMPTS
             ],
         ]));
+        
+        // 记录连接成功
+        $this->redis->lPush('websocket:connections', json_encode([
+            'fd' => $fd,
+            'user_id' => $userInfo['uid'],
+            'time' => time()
+        ]));
+    }
+    
+    /**
+     * 处理重连请求
+     */
+    private function handleReconnect($server, $frame): void
+    {
+        $data = json_decode($frame->data, true);
+        $userId = $data['user_id'] ?? null;
+        $attempt = $data['attempt'] ?? 1;
+        
+        if ($attempt > self::MAX_RECONNECT_ATTEMPTS) {
+            $server->push($frame->fd, json_encode([
+                'type' => 'reconnect_failed',
+                'message' => '重连次数超限'
+            ]));
+            return;
+        }
+        
+        // 更新连接信息
+        $this->updateConnection($frame->fd, $userId);
+        
+        $server->push($frame->fd, json_encode([
+            'type' => 'reconnect_success',
+            'attempt' => $attempt,
+            'message' => '重连成功'
+        ]));
+    }
+    
+    /**
+     * 更新最后活动时间
+     */
+    private function updateLastActivity($fd): void
+    {
+        $this->redis->hSet('websocket:activity', (string)$fd, time());
+    }
+    
+    /**
+     * 更新连接信息
+     */
+    private function updateConnection($fd, $userId): void
+    {
+        $connectionInfo = [
+            'fd' => $fd,
+            'user_id' => $userId,
+            'connected_at' => time(),
+            'last_activity' => time(),
+            'status' => 'connected'
+        ];
+        
+        $this->redis->hSet('websocket:connections', (string)$fd, json_encode($connectionInfo));
+    }
+    
+    /**
+     * 启动心跳检测
+     */
+    private function startHeartbeat($server, $fd): void
+    {
+        Coroutine::create(function() use ($server, $fd) {
+            while (true) {
+                // 检查连接是否还存在
+                if (!$server->isEstablished($fd)) {
+                    break;
+                }
+                
+                // 发送ping
+                $server->push($fd, 'ping');
+                
+                // 等待pong响应
+                $this->waitForPong($server, $fd);
+                
+                // 等待下次心跳
+                Coroutine::sleep(self::HEARTBEAT_INTERVAL);
+            }
+        });
+    }
+    
+    /**
+     * 等待pong响应
+     */
+    private function waitForPong($server, $fd): void
+    {
+        $startTime = time();
+        
+        while (time() - $startTime < self::CONNECTION_TIMEOUT) {
+            if (!$server->isEstablished($fd)) {
+                return;
+            }
+            
+            // 检查是否收到pong
+            $pongTime = $this->redis->hGet('websocket:heartbeat', (string)$fd);
+            if ($pongTime && (time() - $pongTime) < self::HEARTBEAT_INTERVAL) {
+                return; // 收到pong响应
+            }
+            
+            Coroutine::sleep(1);
+        }
+        
+        // 超时,断开连接
+        if ($server->isEstablished($fd)) {
+            $server->close($fd);
+        }
     }
 }

+ 87 - 87
app/Service/Message/ReceiveHandleService.php

@@ -1,87 +1,87 @@
-<?php
-declare(strict_types=1);
-
-namespace App\Service\Message;
-
-
-use App\Service\SocketClientService;
-
-use Swoole\Http\Response;
-use Swoole\WebSocket\Frame;
-use Swoole\WebSocket\Server;
-
-class ReceiveHandleService
-{
-    /**
-     * @var SocketClientService
-     */
-    private $client;
-
-    // 消息事件绑定
-    const EVENTS = [
-        EVENT_TALK          => 'onTalk',
-        EVENT_TALK_KEYBOARD => 'onKeyboard',
-    ];
-
-    /**
-     * ReceiveHandleService constructor.
-     *
-     * @param SocketClientService $client
-     */
-//    public function __construct(SocketClientService $client)
-//    {
-//        $this->client = $client;
-//    }
-
-    /**
-     * 对话文本消息
-     *
-     * @param Response|Server $server
-     * @param Frame           $frame
-     * @param array|string    $data 解析后数据
-     * @return void
-     */
-    public function onTalk($server, Frame $frame, $data)
-    {
-
-        var_dump("测试数据发送==========");
-        return true;
-//        $user_id = $this->client->findFdUserId($frame->fd);
-//        if ($user_id != $data['sender_id']) return;
-//
-//        // 验证消息类型
-//        if (!in_array($data['talk_type'], TalkModeConstant::getTypes())) return;
-//
-//        // 验证发送消息用户与接受消息用户之间是否存在好友或群聊关系
-//        $isTrue = UserRelation::isFriendOrGroupMember($user_id, (int)$data['receiver_id'], (int)$data['talk_type']);
-//        if (!$isTrue) {
-//            $server->push($frame->fd, json_encode(['event_error', [
-//                'message' => '暂不属于好友关系或群聊成员,无法发送聊天消息!'
-//            ]]));
-//            return;
-//        }
-
-//        di()->get(TalkMessageService::class)->insertText([
-//            'talk_type'   => $data['talk_type'],
-//            'user_id'     => $data['sender_id'],
-//            'receiver_id' => $data['receiver_id'],
-//            'content'     => $data['text_message'],
-//        ]);
-    }
-
-    /**
-     * 键盘输入消息
-     *
-     * @param Response|Server $server
-     * @param Frame           $frame
-     * @param array|string    $data 解析后数据
-     * @return void
-     */
-//    public function onKeyboard($server, Frame $frame, $data)
-//    {
-//        event()->dispatch(new TalkEvent(TalkEventConstant::EVENT_TALK_KEYBOARD, [
-//            'sender_id'   => (int)$data['sender_id'],
-//            'receiver_id' => (int)$data['receiver_id'],
-//        ]));
-//    }
-}
+<?php
+declare(strict_types=1);
+
+namespace App\Service\Message;
+
+
+use App\Service\SocketClientService;
+
+use Swoole\Http\Response;
+use Swoole\WebSocket\Frame;
+use Swoole\WebSocket\Server;
+
+class ReceiveHandleService
+{
+    /**
+     * @var SocketClientService
+     */
+    private $client;
+
+    // 消息事件绑定
+    const EVENTS = [
+        'onTalk'          => 'onTalk',
+        'onKeyboard' => 'onKeyboard',
+    ];
+
+    /**
+     * ReceiveHandleService constructor.
+     *
+     * @param SocketClientService $client
+     */
+//    public function __construct(SocketClientService $client)
+//    {
+//        $this->client = $client;
+//    }
+
+    /**
+     * 对话文本消息
+     *
+     * @param Response|Server $server
+     * @param Frame           $frame
+     * @param array|string    $data 解析后数据
+     * @return void
+     */
+    public function onTalk($server, Frame $frame, $data)
+    {
+
+        var_dump("测试数据发送==========");
+        return true;
+//        $user_id = $this->client->findFdUserId($frame->fd);
+//        if ($user_id != $data['sender_id']) return;
+//
+//        // 验证消息类型
+//        if (!in_array($data['talk_type'], TalkModeConstant::getTypes())) return;
+//
+//        // 验证发送消息用户与接受消息用户之间是否存在好友或群聊关系
+//        $isTrue = UserRelation::isFriendOrGroupMember($user_id, (int)$data['receiver_id'], (int)$data['talk_type']);
+//        if (!$isTrue) {
+//            $server->push($frame->fd, json_encode(['event_error', [
+//                'message' => '暂不属于好友关系或群聊成员,无法发送聊天消息!'
+//            ]]));
+//            return;
+//        }
+
+//        di()->get(TalkMessageService::class)->insertText([
+//            'talk_type'   => $data['talk_type'],
+//            'user_id'     => $data['sender_id'],
+//            'receiver_id' => $data['receiver_id'],
+//            'content'     => $data['text_message'],
+//        ]);
+    }
+
+    /**
+     * 键盘输入消息
+     *
+     * @param Response|Server $server
+     * @param Frame           $frame
+     * @param array|string    $data 解析后数据
+     * @return void
+     */
+//    public function onKeyboard($server, Frame $frame, $data)
+//    {
+//        event()->dispatch(new TalkEvent(TalkEventConstant::EVENT_TALK_KEYBOARD, [
+//            'sender_id'   => (int)$data['sender_id'],
+//            'receiver_id' => (int)$data['receiver_id'],
+//        ]));
+//    }
+}