rkljw 5 mesi fa
parent
commit
9599797ea7

+ 15 - 3
app/Amqp/Consumer/MqConsumer.php

@@ -57,7 +57,15 @@ class MqConsumer extends ConsumerMessage
             // 数据存储
             $this->logger->info('消费数据', ['data' => $data]);
             var_dump($data, '=================消费数据==============');
-            switch ($data['talk_type']) {
+            switch (intval($data['talk_type'])) {
+                case 1:
+                    $this->messageController->sendSingleChat($data);
+                    return Result::ACK;
+                    break;
+                case 2:
+                    $this->messageController->sendGroupChat($data);
+                    return Result::ACK;
+                    break;
                 case 100:
                     //审核资讯-通知所有的管理员
                     $this->messageController->sendAdminMessage($data);
@@ -152,19 +160,23 @@ class MqConsumer extends ConsumerMessage
                     return Result::ACK;
                     break;
             }
-        
+
             // 记录处理结果
             $this->logger->info("消费成功:", ['result' => $result]);
             return Result::ACK;
         } catch (\Exception $e) {
+            var_dump($e->getMessage(), '=================消费失败==============');
             // 记录错误
             $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
             // 返回拒绝,重新入队
-            // return Result::REQUEUE;
+//             return Result::REQUEUE;
+            return Result::ACK;
         }
 
+
     }
 
+
     public function isEnable(): bool
     {
         return true;

+ 17 - 0
app/Controller/ChatController.php

@@ -739,6 +739,22 @@ class ChatController extends AbstractController
         }
         $requireData['user_id'] = Context::get("UserId");
         $requireData['type_id'] = Context::get("TypeId");
+        if($requireData['type_id']==10000 && $requireData['is_group']==1){
+            $url = env('IM_URL').'/api/v1/group/create';
+            $data = [
+                'avatar' => '',
+                'name' => $requireData['group_name'],
+                'profile'=>'',
+                'ids' => '',
+            ];
+            $options = [
+                'authorization'=>Context::get('Token')
+            ];
+            $res = PublicData::im_post($url,$data,$options);
+            if($res['code']==200){
+                $requireData['group_id'] = $res['data']['group_id'];
+            }
+        }
         $result = $this->chatServiceClient->addTopic($requireData);
         return $result && $result['code'] != 0 ? Result::success($result['data']) : Result::error($result['message']);
     }
@@ -1141,6 +1157,7 @@ class ChatController extends AbstractController
             return Result::error($errorMessage);
         }
         $requireData['user_id'] = Context::get("UserId");
+        $requireData['token'] = Context::get('Token');
         $result = $this->chatServiceClient->applyTopic($requireData);
         return $result && $result['code'] != 0 ? Result::success($result['data']) : Result::error($result['message']);
     }

+ 53 - 4
app/Controller/MessageController.php

@@ -31,12 +31,10 @@ class MessageController extends AbstractController
             // 获取 Swoole WebSocket Server 实例
             $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
             $redisClient = new RedisService();
-            var_dump($adminList, '-------------&&&&&&&&&&&&&&&&&&&&&&----');
             if ($adminList && isset($adminList['data'])) {
                 foreach ($adminList['data'] as $admin) {
                     $fd = $redisClient->findFd((int)$admin['id']);
                     if ($fd && $server->isEstablished((int)$fd)) {
-                        var_dump($fd, '-------------&&&&&&&&&&&&&&&&&&&&&&----');
                         $server->push((int)$fd, json_encode($data));
                     }
                 }
@@ -67,8 +65,6 @@ class MessageController extends AbstractController
             $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
             $redisClient = new RedisService();
             $data['user_id'] = json_decode($data['user_id'],true);
-            // var_dump($data['user_id'],"====================%%%%%%%%%%%====");
-            
             if(is_array($data['user_id'])){
                 foreach($data['user_id'] as $user_id){
                     $fd = $redisClient->findFd((int)$user_id);
@@ -83,5 +79,58 @@ class MessageController extends AbstractController
             return false;
         }
     }
+    /**
+     * 单聊
+     */
+    public function sendSingleChat($data){
+        $userInfo = $this->userServiceClient->getImContact([
+            'user_id' => $data['receiver_id'],
+            'friend_id' => $data['user_id'],
+        ]);
+//        var_dump("用户信息:",$userInfo);
+        $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
+        $redisClient = new RedisService();
+        $message = [
+            'talk_type' => 1,
+            'title' => $userInfo['data']['remark'],
+            'content' => $data['content'],
+            'messageType' => 1,
+            'receiver_id'=> $data['user_id'],
+//                    'user_id' => $user['user_id'] ?? '',
+            'time' => microtime(),
+        ];
+        $fd = $redisClient->findFd((int)$data['receiver_id']);
+        if ($fd && $server->isEstablished((int)$fd)) {
+            $server->push((int)$fd, json_encode($message));
+        }
+        return true;
+    }
+    /**
+     * 群聊
+     */
+    public function sendGroupChat($data){
+        $userList = $this->userServiceClient->getImGroupMember(['user_id' => $data['user_id'],'group_id' => $data['receiver_id']]);
+        // 获取 Swoole WebSocket Server 实例
+        $server = \Hyperf\Context\ApplicationContext::getContainer()->get(\Swoole\Server::class);
+        $redisClient = new RedisService();
+        if($userList && isset($userList['data'])){
+            foreach($userList['data'] as $user){
+                $message = [
+                    'talk_type' => 2,
+                    'title' => $user['group_name'],
+                    'content' => $data['content'],
+                    'messageType' => 1,
+                    'receiver_id'=> $data['receiver_id'],
+//                    'user_id' => $user['user_id'] ?? '',
+                    'time' => microtime(),
+                ];
+                $fd = $redisClient->findFd((int)$user['user_id']);
+                if ($fd && $server->isEstablished((int)$fd)) {
+                    $server->push((int)$fd, json_encode($message));
+                }
+            }
+        }
+    }
+
 
 }

+ 16 - 0
app/JsonRpc/UserService.php

@@ -188,4 +188,20 @@ class UserService extends AbstractServiceClient implements UserServiceInterface
     public function getTypeUserList(array $data){
         return $this->__request(__FUNCTION__, $data);
     }
+
+    /**
+     * @param array $data
+     * @return mixed
+     */
+    public function getImGroupMember(array $data){
+        return $this->__request(__FUNCTION__, $data);
+    }
+
+    /**
+     * @param array $data
+     * @return mixed
+     */
+    public function getImContact(array $data){
+        return $this->__request(__FUNCTION__, $data);
+    }
 }

+ 12 - 0
app/JsonRpc/UserServiceInterface.php

@@ -124,4 +124,16 @@ interface UserServiceInterface
      */
     public function getTypeUserList(array $data);
 
+    /**
+     * @param array $data
+     * @return mixed
+     */
+    public function getImGroupMember(array $data);
+    /**
+     * @param array $data
+     * @return mixed
+     */
+    public function getImContact(array $data);
+
+
 }

+ 1 - 0
app/Middleware/Auth/FooMiddleware.php

@@ -130,6 +130,7 @@ class FooMiddleware implements MiddlewareInterface
                 // var_dump("中间件:", $ver);
                 Context::set("UserId", $ver['uid']);
                 Context::set("TypeId", $ver['type_id']);
+                Context::set("Token", $header['token'][0]);
                 if ($ver) {
                     return $handler->handle($request);
                 }

+ 50 - 1
app/Tools/PublicData.php

@@ -157,7 +157,6 @@ class PublicData
      */
     public static function http_post_zp($url, $data, $options = [])
     {
-
         // 初始化CURL会话
         $ch = curl_init($url);
         var_dump("参数:",$data);
@@ -269,5 +268,55 @@ class PublicData
         return $data;
     }
 
+    public static function im_post($url, $data, $options = [])
+    {
+        // 初始化CURL会话
+        $ch = curl_init($url);
+        // JSON 编码(保持中文不转义)
+        $jsonBody = is_string($data) ? $data : json_encode($data, JSON_UNESCAPED_UNICODE);
+        // 基础选项
+        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
+        curl_setopt($ch, CURLOPT_HEADER, false);
+        curl_setopt($ch, CURLOPT_POST, true);
+        curl_setopt($ch, CURLOPT_POSTFIELDS, $jsonBody);
+        // HTTPS 兼容
+        if (stripos($url, 'https://') === 0) {
+            curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
+            curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
+        }
+        // Header:Content-Type JSON,Authorization(优先使用传入的)
+        $authorization = $options['authorization'] ?? 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI';
+        $headers = [
+            'Content-Type: application/json',
+            'Accept: application/json',
+            'Authorization: ' . $authorization,
+        ];
+        // 允许外部追加自定义 header(数组,形如 ['X-xxx: yyy'])
+        if (!empty($options['headers']) && is_array($options['headers'])) {
+            $headers = array_merge($headers, $options['headers']);
+        }
+        curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
+        // 超时设置(可通过 options 覆盖)
+        curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, (int)($options['connect_timeout'] ?? 5));
+        curl_setopt($ch, CURLOPT_TIMEOUT, (int)($options['timeout'] ?? 15));
+        // 追加额外的 CURL 选项(如需)
+        if (!empty($options['curl']) && is_array($options['curl'])) {
+            curl_setopt_array($ch, $options['curl']);
+        }
+        // 执行请求
+        $response = curl_exec($ch);
+        if ($response === false) {
+            $errorMsg = curl_error($ch);
+            curl_close($ch);
+            throw new \Exception('CURL Error: ' . $errorMsg);
+        }
+        // 获取HTTP状态码
+        $httpCode = (int) curl_getinfo($ch, CURLINFO_HTTP_CODE);
+        // 关闭会话
+        curl_close($ch);
+        // 返回结果
+        return json_decode($response,true);
+    }
+
 
 }