MqConsumer.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. use Hyperf\Redis\RedisFactory;
  12. use App\Controller\MessageController;
  /**
   * AMQP consumer for the `chatprod` exchange / queue.
   *
   * Each consumed payload is routed by its integer `talk_type` field to one of
   * the MessageController senders; payloads with any other (or no) `talk_type`
   * are forwarded to the chat service via addChatRecords().
   *
   * Every path returns Result::ACK, including the failure path: an exception
   * raised while handling a message is logged and the message is acknowledged,
   * not requeued.
   */
  #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
  class MqConsumer extends ConsumerMessage
  {
      /**
       * Chat service client used for payloads that match no known talk type.
       *
       * NOTE(review): the property has no native type, so the injector
       * presumably resolves it from the `@var` tag below - keep the tag.
       *
       * @var ChatServiceInterface
       */
      #[Inject]
      private $chatServiceClient;

      /**
       * Controller exposing the send* methods that messages are dispatched to.
       *
       * @var MessageController
       */
      #[Inject]
      protected MessageController $messageController;

      /**
       * Logger received through the constructor.
       */
      protected $logger;

      /**
       * Redis connection factory.
       *
       * Not used by the active code in this class. A Redis-based lock
       * (SETNX on `mq_consumer_lock` with a 60 s expiry, requeueing when the
       * lock was already held) was previously sketched for consumeMessage()
       * and is currently disabled.
       */
      #[Inject]
      protected RedisFactory $redisFactory;

      /**
       * @param LoggerInterface $logger Logger used for consume / failure records.
       */
      public function __construct(LoggerInterface $logger)
      {
          $this->logger = $logger;
      }

      /**
       * Handle one message taken from the queue.
       *
       * Dispatch table (by `talk_type`):
       *  - 1                      => sendSingleChat()
       *  - 2                      => sendGroupChat()
       *  - 100, 200, ..., 1000    => sendAdminMessage()
       *                              (100: news review, 200: product review -
       *                              notify all administrators)
       *  - 101, 201, ..., 1001,
       *    1002                   => sendUserMessage()
       *                              (201: product review - notify the publisher)
       *  - 802                    => sendOtherUserMessage()
       *  - anything else          => addChatRecords() on the chat service
       *
       * @param mixed       $data    Decoded message payload; read as an array
       *                             with an optional `talk_type` key.
       * @param AMQPMessage $message Raw AMQP message (not used here).
       *
       * @return Result Always Result::ACK.
       */
      public function consumeMessage($data, AMQPMessage $message): Result
      {
          try {
              $this->logger->info('消费数据', ['data' => $data]);

              // A payload without `talk_type` is treated as type 0, which matches
              // no explicit arm and therefore takes the default branch.
              $talkType = (int) ($data['talk_type'] ?? 0);

              match ($talkType) {
                  1 => $this->messageController->sendSingleChat($data),
                  2 => $this->messageController->sendGroupChat($data),
                  100, 200, 300, 400, 500, 600, 700, 800, 900, 1000
                      => $this->messageController->sendAdminMessage($data),
                  101, 201, 301, 401, 501, 601, 701, 801, 901, 1001, 1002
                      => $this->messageController->sendUserMessage($data),
                  802 => $this->messageController->sendOtherUserMessage($data),
                  default => $this->storeChatRecords($data),
              };

              return Result::ACK;
          } catch (\Exception $e) {
              $this->logger->error("消费失败:", ['error' => $e->getMessage()]);

              // Deliberately acknowledged rather than requeued: the failed
              // message is dropped after being logged.
              return Result::ACK;
          }
      }

      /**
       * Always enabled.
       */
      public function isEnable(): bool
      {
          return true;
      }

      /**
       * Forward a payload to the chat service and log what it returned.
       *
       * @param mixed $data Decoded message payload, passed through unchanged.
       */
      private function storeChatRecords(mixed $data): void
      {
          $result = $this->chatServiceClient->addChatRecords($data);
          $this->logger->info("消费成功:", ['result' => $result]);
      }
  }