__handlePropertyHandler(__CLASS__); }

    // JWT helper; onOpen() uses it to read the claims of the handshake token.
    #[Inject]
    protected JWT $jwt;

    // Injected chat service client. The calls that used it (addChatChannel /
    // delChatChannel) are currently disabled, so nothing in the visible
    // methods reads this property.
    /**
     * @var ChatServiceInterface
     */
    #[Inject]
    private $chatServiceClient;

    // Injected receive-handling service; not referenced by the visible methods.
    /**
     * @Inject
     * @var ReceiveHandleService
     */
    protected $receiveHandle;

    /**
     * Handles an incoming WebSocket frame.
     *
     * Looks up the user bound to the sending connection, publishes the decoded
     * payload (tagged with that user id) to the message queue, then pushes the
     * raw frame data to every established connection.
     *
     * @param mixed $server Server object; must expose `connections`, `isEstablished()` and `push()`.
     * @param mixed $frame  Frame object; must expose `fd` and `data`.
     */
    public function onMessage($server, $frame) : void
    {
        $redisClient = new RedisService();
        $userId = $redisClient->findUser((string) $frame->fd);

        // Queue the message. json_decode() may yield a scalar (e.g. the frame
        // "5" or "\"ping\""); writing an array key onto a scalar throws, so
        // anything that is not an array is treated as an empty payload. This
        // matches what already happened for invalid JSON (null).
        $decoded = json_decode($frame->data, true);
        $payload = is_array($decoded) ? $decoded : [];
        $payload['user_id'] = $userId;

        $message = new MqProducer($payload);
        $producer = ContextApplicationContext::getContainer()->get(Producer::class);
        $producer->produce($message);

        // Push the original frame data to every client whose handshake completed.
        foreach ($server->connections as $fd) {
            if ($server->isEstablished($fd)) {
                $server->push($fd, $frame->data);
            }
        }
    }

    /**
     * Handles a closed connection by removing the fd <-> user binding kept by RedisService.
     *
     * @param mixed $server    Server object (unused here).
     * @param int   $fd        Descriptor of the connection that closed.
     * @param int   $reactorId Reactor id reported with the close event (unused here).
     */
    public function onClose($server, int $fd, int $reactorId) : void
    {
        // Note: channel removal through chatServiceClient->delChatChannel() is currently disabled.
        $redisClient = new RedisService();
        $userId = $redisClient->findUser((string) $fd);
        $redisClient->unbind((string) $fd, (int) $userId);
    }

    /**
     * Handles a newly opened connection.
     *
     * Reads the `token` query parameter, resolves its claims through the JWT
     * helper, binds the connection to the `uid` claim (on the server and via
     * RedisService) and sends the "connect" event to the client.
     *
     * A handshake without a usable `token` is closed immediately instead of
     * handing null to the JWT helper.
     *
     * @param mixed $server  Server object; must expose `bind()`, `push()` and `close()`.
     * @param mixed $request Handshake request; must expose `get` and `fd`.
     */
    public function onOpen($server, $request) : void
    {
        // `get` may be null when there is no query string; `??` covers both
        // that and a missing key without raising a warning.
        $token = $request->get['token'] ?? null;
        if (!is_string($token) || $token === '') {
            $server->close($request->fd);
            return;
        }

        $userInfo = $this->jwt->getClaimsByToken($token);
        $response = (new Response($server))->init($request);
        $fd = $response->getFd();

        // Note: channel registration through chatServiceClient->addChatChannel() is currently disabled.
        $server->bind($fd, $userInfo['uid']);
        $redisClient = new RedisService();
        $redisClient->bind((string) $fd, $userInfo['uid']);

        $server->push($request->fd, json_encode([
            "event" => "connect",
            "content" => [
                "ping_interval" => 20,
                "ping_timeout" => 20 * 3,
                "content" => "连接成功",
            ],
        ]));
    }
}