findUser((string) $frame->fd); // 处理心跳消息 if ($frame->data === 'ping') { $server->push($frame->fd, 'pong'); $this->updateLastActivity($frame->fd); return; } if ($frame->data === 'pong') { $this->updateLastActivity($frame->fd); $this->redis->hSet('websocket:heartbeat', (string)$frame->fd, time()); return; } // 处理重连请求 if (strpos($frame->data, 'reconnect') === 0) { $this->handleReconnect($server, $frame); return; } //存入队列 $result = json_decode($frame->data, true); $result['user_id'] = $userId; // 更新最后活动时间 $this->updateLastActivity($frame->fd); // $show_id = $result['show_id']; // var_dump($result, '-------------1----'); $userInfo = $redisClient->getUserInfo((string) $userId); // var_dump($userInfo, '-------------22----'); if ($userInfo) { $userInfoArr = json_decode($userInfo); $result['user_avatar'] = $userInfoArr['avatar']; $result['user_name'] = $userInfoArr['user_name']; } else { $userInfos = $this->userServiceClient->getUserInfo((int) $userId); // var_dump($userInfos, '-------------33----'); $redisClient->setUserInfo($userId, $userInfos['data']); $result['user_avatar'] = $userInfos['data']['avatar']; $result['user_name'] = $userInfos['data']['user_name']; } $myFriends = $redisClient->getUserFriends((string) $userId); $myFriendsArr = []; if ($myFriends) { $myFriendsArr = json_decode($myFriends); } else { $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]); $redisClient->setUserFriends($userId, $myFriends['data']); $myFriendsArr = $myFriends['data']; } //推送给前台 if (isset($result['type']) && in_array($result['type'], ['offer', 'answer', 'ice-candidate'])) { // 是视频通话信令,走特殊处理逻辑 $this->handleVideoSignaling($server, $frame, $userId, $result); return; } //组装数据+头像 if (isset($result['talk_type']) && $result['talk_type'] == 1) { //判断$result['receiver_id']是否是好友 $myFriendsID = array_column($myFriendsArr, 'friend_id'); if (!in_array($result['receiver_id'], $myFriendsID)) { $myFriends = $this->chatServiceClient->getFriendsList(['user_id' => $userId, 'status' => 2]); $redisClient->setUserFriends($userId, $myFriends['data']); $myFriendsArrdata = $myFriends['data']; $myFriendsArrID = array_column($myFriendsArrdata, 'friend_id'); if (!in_array($result['receiver_id'], $myFriendsArrID)) { $result['content'] = '您还不是好友,无法发送消息!'; $server->push((int) $frame->fd, json_encode($result)); return; } } //给自己推一条数据 if ($server->isEstablished($frame->fd)) { $result['is_read'] = 1; $server->push((int) $frame->fd, json_encode($result)); // 尝试连接 try { $saiddata = $result; $saiddata['action'] = 'said'; $message = new MqProducer($saiddata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功' . $frame->fd,$result); } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); $this->retry($message); } } else { //给自己发一条未读消息 try { $saiddata = $result; $saiddata['action'] = 'said'; $saiddata['is_read'] = 0; $message = new MqProducer($saiddata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功' . $frame->fd); } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); $this->retry($message); } } //给好友推送消息 $fd = $redisClient->findFd((int) $result['receiver_id']); if ($server->isEstablished((int) $fd)) { $data = $result; $data['is_read'] = 0; $server->push((int) $fd, json_encode(value: $data)); var_dump('消息给好友发送成功' . $fd,$data); // 尝试连接 try { $chatdata = $result; $chatdata['action'] = 'recieved'; $chatdata['receiver_id'] = $result['user_id']; $chatdata['user_id'] = $result['receiver_id']; // $chatdata['user_name'] = $result['user_name']; $chatdata['is_read'] = 0; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功#'); } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); $this->retry($message); } } else { try { $chatdata = $result; $chatdata['action'] = 'recieved'; $chatdata['receiver_id'] = $result['user_id']; $chatdata['user_id'] = $result['receiver_id']; // $chatdata['user_name'] = $result['user_name']; $chatdata['is_read'] = 0; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功!'); } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); $this->retry($message); } } } else if (isset($result['talk_type']) && $result['talk_type'] == 2) { //根据群找到 群用户,群发一遍消息 $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]); // var_dump($groupUserList['data'], '-----------------############-----------------'); if ($groupUserList['data']) { $chatdata = $result; foreach ($groupUserList['data'] as $val) { $fd = $redisClient->findFd((int) $val['user_id']); if ($server->isEstablished((int) $fd)) { var_dump($fd, $val['user_id']); $result['user_name'] = $val['group_name']??''; $server->push((int) $fd, json_encode($result)); var_dump($val, '-------------44----'); try { //分发说,只记录自己对自己 if ($result['user_id'] == $val['user_id']) { $chatdata['receiver_id'] = $result['receiver_id']; $chatdata['user_id'] = $result['user_id']; $chatdata['group_receiver_id'] = $val['user_id']; $chatdata['is_read'] = 1; $chatdata['action'] = 'said'; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); } //分发接收,不记录自己 if ($chatdata['user_id'] != $val['user_id']) { $chatdata['receiver_id'] = $result['receiver_id']; $chatdata['user_id'] = $val['user_id']; $chatdata['group_receiver_id'] = $result['user_id']; $chatdata['is_read'] = 0; $chatdata['action'] = 'recieved'; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功@'); } } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); } } else { try { //分发说,只记录自己对自己 if ($result['user_id'] == $val['user_id']) { $chatdata['receiver_id'] = $result['receiver_id']; $chatdata['user_id'] = $result['user_id']; $chatdata['group_receiver_id'] = $val['user_id']; $chatdata['is_read'] = 0; $chatdata['action'] = 'said'; $chatdata['group_name'] = $val['group_name']??''; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); } //分发接收,不记录自己 if ($chatdata['user_id'] != $val['user_id']) { $chatdata['receiver_id'] = $result['receiver_id']; $chatdata['user_id'] = $val['user_id']; $chatdata['group_receiver_id'] = $result['user_id']; $chatdata['is_read'] = 0; $chatdata['action'] = 'recieved'; $chatdata['group_name'] = $val['group_name']??''; $message = new MqProducer($chatdata); $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('消息发送成功$'); } } catch (\Exception $e) { var_dump('消息发送失败: ' . $e->getMessage()); } } } } } } protected function handleVideoSignaling($server, $frame, $userId, $message) { $redisClient = new RedisService(); $receiverId = $message['receiver_id'] ?? null; if (!$receiverId) { $server->push($frame->fd, json_encode(['error' => 'Missing receiver_id'])); return; } // 获取接收方 fd $fd = $redisClient->findFd((int)$receiverId); // 判断接收者是否在线 if ($server->isEstablished((int)$fd)) { // 转发消息给接收方 $server->push((int)$fd, json_encode($message)); } else { // 可选:缓存或通知对方不在线 $server->push($frame->fd, json_encode([ 'type' => 'offline', 'receiver_id' => $receiverId, 'content' => '对方不在线' ])); } } public function retry($message): void { $maxRetries = 10; // 最大重试次数 $retryCount = 0; while ($retryCount < $maxRetries) { try { $producer = ContextApplicationContext::getContainer()->get(Producer::class); $re = $producer->produce($message); var_dump('重试消息发送成功'); break; } catch (\Exception $e) { $retryCount++; if ($retryCount >= $maxRetries) { var_dump('达到最大重试次数,消息发送失败: ' . $e->getMessage()); } else { var_dump('第 ' . $retryCount . ' 次重试失败: ' . $e->getMessage()); } } } } public function onClose($server, int $fd, int $reactorId): void { // var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++"); $redisClient = new RedisService(); $userId = $redisClient->findUser((string) $fd); // 清理连接信息 $this->redis->hDel('websocket:connections', (string)$fd); $this->redis->hDel('websocket:activity', (string)$fd); $this->redis->hDel('websocket:heartbeat', (string)$fd); $redisClient->unbind((string) $fd, (int) $userId); // 记录断开连接 $this->redis->lPush('websocket:disconnections', json_encode([ 'fd' => $fd, 'user_id' => $userId, 'time' => time() ])); } public function onOpen($server, $request): void { $token = $request->get['token']; $userInfo = $this->jwt->getClaimsByToken($token); $response = (new Response($server))->init($request); $fd = $response->getFd(); // 绑定连接 $server->bind($fd, $userInfo['uid']); $redisClient = new RedisService(); $redisClient->bind((string) $fd, $userInfo['uid']); // 更新连接信息 $this->updateConnection($fd, $userInfo['uid']); // 启动心跳检测 $this->startHeartbeat($server, $fd); // 发送连接成功消息 $server->push($request->fd, json_encode([ "event" => "connect", "content" => [ "ping_interval" => self::HEARTBEAT_INTERVAL, "ping_timeout" => self::CONNECTION_TIMEOUT, "content" => "连接成功", "fd" => $fd, "user_id" => $userInfo['uid'], "reconnect_attempts" => self::MAX_RECONNECT_ATTEMPTS ], ])); // 记录连接成功 $this->redis->lPush('websocket:connections', json_encode([ 'fd' => $fd, 'user_id' => $userInfo['uid'], 'time' => time() ])); } /** * 处理重连请求 */ private function handleReconnect($server, $frame): void { $data = json_decode($frame->data, true); $userId = $data['user_id'] ?? null; $attempt = $data['attempt'] ?? 1; if ($attempt > self::MAX_RECONNECT_ATTEMPTS) { $server->push($frame->fd, json_encode([ 'type' => 'reconnect_failed', 'message' => '重连次数超限' ])); return; } // 更新连接信息 $this->updateConnection($frame->fd, $userId); $server->push($frame->fd, json_encode([ 'type' => 'reconnect_success', 'attempt' => $attempt, 'message' => '重连成功' ])); } /** * 更新最后活动时间 */ private function updateLastActivity($fd): void { $this->redis->hSet('websocket:activity', (string)$fd, time()); } /** * 更新连接信息 */ private function updateConnection($fd, $userId): void { $connectionInfo = [ 'fd' => $fd, 'user_id' => $userId, 'connected_at' => time(), 'last_activity' => time(), 'status' => 'connected' ]; $this->redis->hSet('websocket:connections', (string)$fd, json_encode($connectionInfo)); } /** * 启动心跳检测 */ private function startHeartbeat($server, $fd): void { Coroutine::create(function() use ($server, $fd) { while (true) { // 检查连接是否还存在 if (!$server->isEstablished($fd)) { break; } // 发送ping $server->push($fd, 'ping'); // 等待pong响应 $this->waitForPong($server, $fd); // 等待下次心跳 Coroutine::sleep(self::HEARTBEAT_INTERVAL); } }); } /** * 等待pong响应 */ private function waitForPong($server, $fd): void { $startTime = time(); while (time() - $startTime < self::CONNECTION_TIMEOUT) { if (!$server->isEstablished($fd)) { return; } // 检查是否收到pong $pongTime = $this->redis->hGet('websocket:heartbeat', (string)$fd); if ($pongTime && (time() - $pongTime) < self::HEARTBEAT_INTERVAL) { return; // 收到pong响应 } Coroutine::sleep(1); } // 超时,断开连接 if ($server->isEstablished($fd)) { $server->close($fd); } } }