Browse Source

Merge branch 'master' of http://git.bjzxtw.org.cn:3000/zxt/admin_consumer

LiuJ 2 days ago
parent
commit
80564ba2f1

+ 102 - 3
app/Amqp/Consumer/MqConsumer.php

@@ -12,7 +12,7 @@ use Hyperf\Di\Annotation\Inject;
 use PhpAmqpLib\Message\AMQPMessage;
 use Psr\Log\LoggerInterface;
 use Hyperf\Redis\RedisFactory;
-
+use App\Controller\MessageController;
 #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
 class MqConsumer extends ConsumerMessage
 {
@@ -21,6 +21,11 @@ class MqConsumer extends ConsumerMessage
      */
     #[Inject]
     private $chatServiceClient;
+    /**
+     * @var MessageController
+     */
+    #[Inject]
+    protected MessageController $messageController;
 
     protected $logger;
     #[Inject]
@@ -52,8 +57,102 @@ class MqConsumer extends ConsumerMessage
             // 数据存储
             $this->logger->info('消费数据', ['data' => $data]);
             var_dump($data, '=================消费数据==============');
-            // 调用数据处理服务
-            $result = $this->chatServiceClient->addChatRecords($data);
+            switch ($data['talk_type']) {
+                case 100:
+                    //审核资讯-通知所有的管理员
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 101:
+                    //审核资讯-通知所有的管理员
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 200:
+                    //审核商品-通知所有的管理员
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 201:
+                    //审核商品-通知发布者
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 300:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 301:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 400:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 401:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 500:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 501:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 600:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 601:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 700:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 701:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 800:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 801:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 802:
+                        $this->messageController->sendOtherUserMessage($data);
+                        return Result::ACK;
+                        break;
+                case 900:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 901:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                case 1000:
+                    $this->messageController->sendAdminMessage($data);
+                    return Result::ACK;
+                    break;
+                case 1001:
+                    $this->messageController->sendUserMessage($data);
+                    return Result::ACK;
+                    break;
+                default:
+                    // 调用数据处理服务
+                    $result = $this->chatServiceClient->addChatRecords($data);
+                    return Result::ACK;
+                    break;
+            }
+        
             // 记录处理结果
             $this->logger->info("消费成功:", ['result' => $result]);
             return Result::ACK;

+ 2 - 0
app/Controller/ChatController.php

@@ -738,6 +738,7 @@ class ChatController extends AbstractController
             return Result::error($errorMessage);
         }
         $requireData['user_id'] = Context::get("UserId");
+        $requireData['type_id'] = Context::get("TypeId");
         $result = $this->chatServiceClient->addTopic($requireData);
         return $result && $result['code'] != 0 ? Result::success($result['data']) : Result::error($result['message']);
     }
@@ -764,6 +765,7 @@ class ChatController extends AbstractController
             return Result::error($errorMessage);
         }
         $requireData['user_id'] = Context::get("UserId");
+        $requireData['type_id'] = Context::get("TypeId");
         $result = $this->chatServiceClient->updateTopic($requireData);
         return $result && $result['code'] != 0 ? Result::success($result['data']) : Result::error($result['message']);
     }

+ 86 - 0
app/Controller/MessageController.php

@@ -0,0 +1,86 @@
+<?php
+
+declare(strict_types=1);
+namespace App\Controller;
+
+/**
+ * MessageController
+ * @package App\Controller
+ */
+use App\JsonRpc\UserServiceInterface;
+use Hyperf\Di\Annotation\Inject;
+use Hyperf\Context\ApplicationContext;
+use Swoole\WebSocket\Server as WebSocketServer;
+use App\Service\RedisService;
+use Hyperf\Server\ServerManager;
+use function Hyperf\Support\call;
+use swoole\Server;
+class MessageController extends AbstractController
+{
+
+    /**
+     * @var UserServiceInterface
+     */
+    #[Inject]
+    private $userServiceClient;
+    public function sendAdminMessage($data)
+    {
+        try {
+            // 获取管理员列表
+            $adminList = $this->userServiceClient->getTypeUserList(['type_id' => 10000]);
+            // 获取 Swoole WebSocket Server 实例
+            $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
+            $redisClient = new RedisService();
+    
+            if ($adminList && isset($adminList['data'])) {
+                foreach ($adminList['data'] as $admin) {
+                    $fd = $redisClient->findFd((int)$admin['id']);
+                    if ($fd && $server->isEstablished((int)$fd)) {
+                        $server->push((int)$fd, json_encode($data));
+                    }
+                }
+            }
+            return true;
+        } catch (\Throwable $e) {
+            var_dump('发送消息错误: ' . $e->getMessage());
+            return false;
+        }
+    }
+    public function sendUserMessage($data){
+        try { 
+            // 获取 Swoole WebSocket Server 实例
+            $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
+            $redisClient = new RedisService();
+            $fd = $redisClient->findFd((int)$data['user_id']);
+            if ($fd && $server->isEstablished((int)$fd)) {
+                $server->push((int)$fd, json_encode($data));
+            }
+        } catch (\Throwable $e) {
+            var_dump('发送消息错误: ' . $e->getMessage());
+            return false;
+        }
+    }
+    public function sendOtherUserMessage($data){
+        try {
+            // 获取 Swoole WebSocket Server 实例
+            $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
+            $redisClient = new RedisService();
+            $data['user_id'] = json_decode($data['user_id'],true);
+            // var_dump($data['user_id'],"====================%%%%%%%%%%%====");
+            
+            if(is_array($data['user_id'])){
+                foreach($data['user_id'] as $user_id){
+                    $fd = $redisClient->findFd((int)$user_id);
+                    if ($fd && $server->isEstablished((int)$fd)) {
+                        $server->push((int)$fd, json_encode($data));
+                    }
+                }
+            }
+           
+        } catch (\Throwable $e) {
+            var_dump('发送消息错误: ' . $e->getMessage());
+            return false;
+        }
+    }
+
+}

+ 3 - 3
app/Controller/NewsController.php

@@ -104,7 +104,7 @@ class NewsController extends AbstractController
         //获取用户信息
         $user_id = Context::get("UserId");
         $userInfo = $this->userServiceClient->getUserInfo((int)$user_id);
-        var_dump("用户信息:", $userInfo);
+        // var_dump("用户信息:", $userInfo);
         $sszq = $userInfo['data']['sszq'];
         $type_id = $userInfo['data']['type_id']; //'1:个人会员 2:政务会员 3:企业会员 4:调研员 10000:管理员 20000:游客(小程序)'
         //管理员看所有
@@ -253,7 +253,7 @@ class NewsController extends AbstractController
         //获取用户信息
         $user_id = Context::get("UserId");
         $userInfo = $this->userServiceClient->getUserInfo((int)$user_id);
-        var_dump("用户信息:", $userInfo);
+        // var_dump("用户信息:", $userInfo);
         $sszq = $userInfo['data']['sszq'];
         //获取websiteid
         // $groupInfo = $this->WebsiteServiceClient->getWebsiteGroupInfo(['id' => $sszq]);
@@ -569,7 +569,7 @@ class NewsController extends AbstractController
         );
         $user_id = Context::get("UserId");
         $userInfo = $this->userServiceClient->getUserInfo((int)$user_id);
-        var_dump("用户信息:", $userInfo);
+        // var_dump("用户信息:", $userInfo);
         $type_id = $userInfo['data']['type_id']; //'1:个人会员 2:政务会员 3:企业会员 4:调研员 10000:管理员 20000:游客(小程序)'
         $requireData['user_type_id'] = $type_id; //重名了
         $requireData['user_id'] = $user_id;

+ 0 - 3
app/Controller/PublicController.php

@@ -905,7 +905,6 @@ class PublicController extends AbstractController
                 'sector_name'=>'required|string',
                 'page_type'=>'required',
                 'sector_img'=>'required|string',
-                'size_id'=>'required|integer',
             ],
             [
                 // 'id.required' => 'id 不能为空',
@@ -917,8 +916,6 @@ class PublicController extends AbstractController
                 'sector_img.required' => '通栏图片不能为空',
                 'sector_img.string' => '通栏代码必须是字符串',
                 'page_type.required' => '页面类型不能为空',
-                'size_id.required' => '尺寸id不能为空',
-                'size_id.integer' => '尺寸id必须是整数',
             ]
         );
          if ($validator->fails()) {

+ 183 - 22
app/Controller/WebSocketController.php

@@ -17,54 +17,82 @@ use Hyperf\Contract\OnOpenInterface;
 use Hyperf\Di\Annotation\Inject;
 
 use Hyperf\Engine\WebSocket\Response;
+use Hyperf\Redis\Redis;
 use Phper666\JWTAuth\JWT;
 use swoole\Server;
+use Hyperf\Coroutine\Coroutine;
 
 class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
 {
-
     #[Inject]
     protected JWT $jwt;
-    /**
-     * @var ChatServiceInterface
-     */
+    
     #[Inject]
-    private $chatServiceClient;
-    /**
-     * @var UserServiceInterface
-     */
+    private ChatServiceInterface $chatServiceClient;
+    
     #[Inject]
-    private $userServiceClient;
+    private UserServiceInterface $userServiceClient;
 
-    /**
-     * @Inject
-     * @var ReceiveHandleService
-     */
-    protected $receiveHandle;
+    #[Inject]
+    protected ReceiveHandleService $receiveHandle;
+    
+    #[Inject]
+    protected Redis $redis;
+    
     protected $server;
-
+    
+    // 持久连接配置
+    private const HEARTBEAT_INTERVAL = 30; // 心跳间隔(秒)
+    private const CONNECTION_TIMEOUT = 90;  // 连接超时(秒)
+    private const MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
+    
+    // 存储连接信息
+    private static $connections = [];
+    private static $heartbeatTasks = [];
 
     public function onMessage($server, $frame): void
     {
-        //把数据推给前端
         $redisClient = new RedisService();
         $userId = $redisClient->findUser((string) $frame->fd);
+        
+        // 处理心跳消息
+        if ($frame->data === 'ping') {
+            $server->push($frame->fd, 'pong');
+            $this->updateLastActivity($frame->fd);
+            return;
+        }
+        
+        if ($frame->data === 'pong') {
+            $this->updateLastActivity($frame->fd);
+            $this->redis->hSet('websocket:heartbeat', (string)$frame->fd, time());
+            return;
+        }
+        
+        // 处理重连请求
+        if (strpos($frame->data, 'reconnect') === 0) {
+            $this->handleReconnect($server, $frame);
+            return;
+        }
+        
         //存入队列
         $result = json_decode($frame->data, true);
         $result['user_id'] = $userId;
+        
+        // 更新最后活动时间
+        $this->updateLastActivity($frame->fd);
 
         // $show_id = $result['show_id'];
 
-        var_dump($result, '-------------1----');
+        // var_dump($result, '-------------1----');
         $userInfo = $redisClient->getUserInfo((string) $userId);
-        var_dump($userInfo, '-------------22----');
+        // var_dump($userInfo, '-------------22----');
         if ($userInfo) {
             $userInfoArr = json_decode($userInfo);
             $result['user_avatar'] = $userInfoArr['avatar'];
             $result['user_name'] = $userInfoArr['user_name'];
         } else {
             $userInfos = $this->userServiceClient->getUserInfo((int) $userId);
-            var_dump($userInfos, '-------------33----');
+            // var_dump($userInfos, '-------------33----');
             $redisClient->setUserInfo($userId, $userInfos['data']);
             $result['user_avatar'] = $userInfos['data']['avatar'];
             $result['user_name'] = $userInfos['data']['user_name'];
@@ -175,7 +203,7 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
         } else if (isset($result['talk_type']) && $result['talk_type'] == 2) {
             //根据群找到 群用户,群发一遍消息
             $groupUserList = $this->chatServiceClient->getGroupMembers(['group_id' => $result['receiver_id']]);
-            var_dump($groupUserList['data'], '-----------------############-----------------');
+            // var_dump($groupUserList['data'], '-----------------############-----------------');
             if ($groupUserList['data']) {
                 $chatdata = $result;
                 foreach ($groupUserList['data'] as $val) {
@@ -296,7 +324,20 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
         // var_dump('closed::::::::::::::::::', $fd, "======", $reactorId, "+++++++++++");
         $redisClient = new RedisService();
         $userId = $redisClient->findUser((string) $fd);
+        
+        // 清理连接信息
+        $this->redis->hDel('websocket:connections', (string)$fd);
+        $this->redis->hDel('websocket:activity', (string)$fd);
+        $this->redis->hDel('websocket:heartbeat', (string)$fd);
+        
         $redisClient->unbind((string) $fd, (int) $userId);
+        
+        // 记录断开连接
+        $this->redis->lPush('websocket:disconnections', json_encode([
+            'fd' => $fd,
+            'user_id' => $userId,
+            'time' => time()
+        ]));
     }
     public function onOpen($server, $request): void
     {
@@ -304,18 +345,138 @@ class WebSocketController implements OnMessageInterface, OnOpenInterface, OnClos
         $userInfo = $this->jwt->getClaimsByToken($token);
         $response = (new Response($server))->init($request);
         $fd = $response->getFd();
+        
+        // 绑定连接
         $server->bind($fd, $userInfo['uid']);
         $redisClient = new RedisService();
         $redisClient->bind((string) $fd, $userInfo['uid']);
+        
+        // 更新连接信息
+        $this->updateConnection($fd, $userInfo['uid']);
+        
+        // 启动心跳检测
+        $this->startHeartbeat($server, $fd);
+        
+        // 发送连接成功消息
         $server->push($request->fd, json_encode([
             "event" => "connect",
             "content" => [
-                "ping_interval" => 20,
-                "ping_timeout" => 20 * 3,
+                "ping_interval" => self::HEARTBEAT_INTERVAL,
+                "ping_timeout" => self::CONNECTION_TIMEOUT,
                 "content" => "连接成功",
                 "fd" => $fd,
                 "user_id" => $userInfo['uid'],
+                "reconnect_attempts" => self::MAX_RECONNECT_ATTEMPTS
             ],
         ]));
+        
+        // 记录连接成功
+        $this->redis->lPush('websocket:connections', json_encode([
+            'fd' => $fd,
+            'user_id' => $userInfo['uid'],
+            'time' => time()
+        ]));
+    }
+    
+    /**
+     * 处理重连请求
+     */
+    private function handleReconnect($server, $frame): void
+    {
+        $data = json_decode($frame->data, true);
+        $userId = $data['user_id'] ?? null;
+        $attempt = $data['attempt'] ?? 1;
+        
+        if ($attempt > self::MAX_RECONNECT_ATTEMPTS) {
+            $server->push($frame->fd, json_encode([
+                'type' => 'reconnect_failed',
+                'message' => '重连次数超限'
+            ]));
+            return;
+        }
+        
+        // 更新连接信息
+        $this->updateConnection($frame->fd, $userId);
+        
+        $server->push($frame->fd, json_encode([
+            'type' => 'reconnect_success',
+            'attempt' => $attempt,
+            'message' => '重连成功'
+        ]));
+    }
+    
+    /**
+     * 更新最后活动时间
+     */
+    private function updateLastActivity($fd): void
+    {
+        $this->redis->hSet('websocket:activity', (string)$fd, time());
+    }
+    
+    /**
+     * 更新连接信息
+     */
+    private function updateConnection($fd, $userId): void
+    {
+        $connectionInfo = [
+            'fd' => $fd,
+            'user_id' => $userId,
+            'connected_at' => time(),
+            'last_activity' => time(),
+            'status' => 'connected'
+        ];
+        
+        $this->redis->hSet('websocket:connections', (string)$fd, json_encode($connectionInfo));
+    }
+    
+    /**
+     * 启动心跳检测
+     */
+    private function startHeartbeat($server, $fd): void
+    {
+        Coroutine::create(function() use ($server, $fd) {
+            while (true) {
+                // 检查连接是否还存在
+                if (!$server->isEstablished($fd)) {
+                    break;
+                }
+                
+                // 发送ping
+                $server->push($fd, 'ping');
+                
+                // 等待pong响应
+                $this->waitForPong($server, $fd);
+                
+                // 等待下次心跳
+                Coroutine::sleep(self::HEARTBEAT_INTERVAL);
+            }
+        });
+    }
+    
+    /**
+     * 等待pong响应
+     */
+    private function waitForPong($server, $fd): void
+    {
+        $startTime = time();
+        
+        while (time() - $startTime < self::CONNECTION_TIMEOUT) {
+            if (!$server->isEstablished($fd)) {
+                return;
+            }
+            
+            // 检查是否收到pong
+            $pongTime = $this->redis->hGet('websocket:heartbeat', (string)$fd);
+            if ($pongTime && (time() - $pongTime) < self::HEARTBEAT_INTERVAL) {
+                return; // 收到pong响应
+            }
+            
+            Coroutine::sleep(1);
+        }
+        
+        // 超时,断开连接
+        if ($server->isEstablished($fd)) {
+            $server->close($fd);
+        }
     }
 }

+ 2 - 2
app/Controller/WebsiteController.php

@@ -1686,7 +1686,7 @@ class WebsiteController extends AbstractController
         $user_id = Context::get("UserId");
         var_dump("用户id:", $user_id);
         $userInfo = $this->userServiceClient->getUserInfo((int)$user_id);
-        var_dump("用户信息:", $userInfo);
+        // var_dump("用户信息:", $userInfo);
         $sszq = $userInfo['data']['sszq'];
         $type_id = $userInfo['data']['type_id'];
         //  if($type_id != 10000){
@@ -1733,7 +1733,7 @@ class WebsiteController extends AbstractController
         $user_id = Context::get("UserId");
         var_dump("用户id:", $user_id);
         $userInfo = $this->userServiceClient->getUserInfo((int)$user_id);
-        var_dump("用户信息:", $userInfo);
+        // var_dump("用户信息:", $userInfo);
         $sszq = $userInfo['data']['sszq'];
         $type_id = $userInfo['data']['type_id'];
         $sszqarr = '';

+ 7 - 0
app/JsonRpc/UserService.php

@@ -181,4 +181,11 @@ class UserService extends AbstractServiceClient implements UserServiceInterface
     public function getWebsiteGroupInfo(array $data){
         return $this->__request(__FUNCTION__, $data);
     }
+    /**
+     * @param array $data
+     * @return array|mixed
+     */
+    public function getTypeUserList(array $data){
+        return $this->__request(__FUNCTION__, $data);
+    }
 }

+ 4 - 0
app/JsonRpc/UserServiceInterface.php

@@ -119,5 +119,9 @@ interface UserServiceInterface
      * @return mixed
      */
     public function getWebsiteGroupInfo(array $data);
+    /**
+     * @param array $data
+     */
+    public function getTypeUserList(array $data);
 
 }

+ 87 - 87
app/Service/Message/ReceiveHandleService.php

@@ -1,87 +1,87 @@
-<?php
-declare(strict_types=1);
-
-namespace App\Service\Message;
-
-
-use App\Service\SocketClientService;
-
-use Swoole\Http\Response;
-use Swoole\WebSocket\Frame;
-use Swoole\WebSocket\Server;
-
-class ReceiveHandleService
-{
-    /**
-     * @var SocketClientService
-     */
-    private $client;
-
-    // 消息事件绑定
-    const EVENTS = [
-        EVENT_TALK          => 'onTalk',
-        EVENT_TALK_KEYBOARD => 'onKeyboard',
-    ];
-
-    /**
-     * ReceiveHandleService constructor.
-     *
-     * @param SocketClientService $client
-     */
-//    public function __construct(SocketClientService $client)
-//    {
-//        $this->client = $client;
-//    }
-
-    /**
-     * 对话文本消息
-     *
-     * @param Response|Server $server
-     * @param Frame           $frame
-     * @param array|string    $data 解析后数据
-     * @return void
-     */
-    public function onTalk($server, Frame $frame, $data)
-    {
-
-        var_dump("测试数据发送==========");
-        return true;
-//        $user_id = $this->client->findFdUserId($frame->fd);
-//        if ($user_id != $data['sender_id']) return;
-//
-//        // 验证消息类型
-//        if (!in_array($data['talk_type'], TalkModeConstant::getTypes())) return;
-//
-//        // 验证发送消息用户与接受消息用户之间是否存在好友或群聊关系
-//        $isTrue = UserRelation::isFriendOrGroupMember($user_id, (int)$data['receiver_id'], (int)$data['talk_type']);
-//        if (!$isTrue) {
-//            $server->push($frame->fd, json_encode(['event_error', [
-//                'message' => '暂不属于好友关系或群聊成员,无法发送聊天消息!'
-//            ]]));
-//            return;
-//        }
-
-//        di()->get(TalkMessageService::class)->insertText([
-//            'talk_type'   => $data['talk_type'],
-//            'user_id'     => $data['sender_id'],
-//            'receiver_id' => $data['receiver_id'],
-//            'content'     => $data['text_message'],
-//        ]);
-    }
-
-    /**
-     * 键盘输入消息
-     *
-     * @param Response|Server $server
-     * @param Frame           $frame
-     * @param array|string    $data 解析后数据
-     * @return void
-     */
-//    public function onKeyboard($server, Frame $frame, $data)
-//    {
-//        event()->dispatch(new TalkEvent(TalkEventConstant::EVENT_TALK_KEYBOARD, [
-//            'sender_id'   => (int)$data['sender_id'],
-//            'receiver_id' => (int)$data['receiver_id'],
-//        ]));
-//    }
-}
+<?php
+declare(strict_types=1);
+
+namespace App\Service\Message;
+
+
+use App\Service\SocketClientService;
+
+use Swoole\Http\Response;
+use Swoole\WebSocket\Frame;
+use Swoole\WebSocket\Server;
+
+class ReceiveHandleService
+{
+    /**
+     * @var SocketClientService
+     */
+    private $client;
+
+    // 消息事件绑定
+    const EVENTS = [
+        'onTalk'          => 'onTalk',
+        'onKeyboard' => 'onKeyboard',
+    ];
+
+    /**
+     * ReceiveHandleService constructor.
+     *
+     * @param SocketClientService $client
+     */
+//    public function __construct(SocketClientService $client)
+//    {
+//        $this->client = $client;
+//    }
+
+    /**
+     * 对话文本消息
+     *
+     * @param Response|Server $server
+     * @param Frame           $frame
+     * @param array|string    $data 解析后数据
+     * @return void
+     */
+    public function onTalk($server, Frame $frame, $data)
+    {
+
+        var_dump("测试数据发送==========");
+        return true;
+//        $user_id = $this->client->findFdUserId($frame->fd);
+//        if ($user_id != $data['sender_id']) return;
+//
+//        // 验证消息类型
+//        if (!in_array($data['talk_type'], TalkModeConstant::getTypes())) return;
+//
+//        // 验证发送消息用户与接受消息用户之间是否存在好友或群聊关系
+//        $isTrue = UserRelation::isFriendOrGroupMember($user_id, (int)$data['receiver_id'], (int)$data['talk_type']);
+//        if (!$isTrue) {
+//            $server->push($frame->fd, json_encode(['event_error', [
+//                'message' => '暂不属于好友关系或群聊成员,无法发送聊天消息!'
+//            ]]));
+//            return;
+//        }
+
+//        di()->get(TalkMessageService::class)->insertText([
+//            'talk_type'   => $data['talk_type'],
+//            'user_id'     => $data['sender_id'],
+//            'receiver_id' => $data['receiver_id'],
+//            'content'     => $data['text_message'],
+//        ]);
+    }
+
+    /**
+     * 键盘输入消息
+     *
+     * @param Response|Server $server
+     * @param Frame           $frame
+     * @param array|string    $data 解析后数据
+     * @return void
+     */
+//    public function onKeyboard($server, Frame $frame, $data)
+//    {
+//        event()->dispatch(new TalkEvent(TalkEventConstant::EVENT_TALK_KEYBOARD, [
+//            'sender_id'   => (int)$data['sender_id'],
+//            'receiver_id' => (int)$data['receiver_id'],
+//        ]));
+//    }
+}