MqConsumer.php 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. use Hyperf\Redis\RedisFactory;
  12. use App\Controller\MessageController;
  13. #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
  14. class MqConsumer extends ConsumerMessage
  15. {
  16. /**
  17. * @var ChatServiceInterface
  18. */
  19. #[Inject]
  20. private $chatServiceClient;
  21. /**
  22. * @var MessageController
  23. */
  24. #[Inject]
  25. protected MessageController $messageController;
  26. protected $logger;
  27. #[Inject]
  28. protected RedisFactory $redisFactory;
  29. public function __construct(LoggerInterface $logger)
  30. {
  31. $this->logger = $logger;
  32. }
  33. public function consumeMessage($data, AMQPMessage $message): Result
  34. {
  35. // $redis = $this->redisFactory->get('default');
  36. // $lockKey = 'mq_consumer_lock';
  37. // $lockValue = uniqid();
  38. // // 获取锁
  39. // $lockAcquired = $redis->setnx($lockKey, $lockValue);
  40. // if (!$lockAcquired) {
  41. // $this->logger->info('Another consumer instance is already running.');
  42. // $this->logger->info($redis->get($lockKey));
  43. // // return Result::ACK;
  44. // return Result::REQUEUE;
  45. // }
  46. // // 设置锁过期时间,防止死锁
  47. // $redis->expire($lockKey, 60);
  48. try {
  49. // 数据存储
  50. $this->logger->info('消费数据', ['data' => $data]);
  51. var_dump($data, '=================消费数据==============');
  52. switch ($data['talk_type']) {
  53. case 100:
  54. //审核资讯-通知所有的管理员
  55. $this->messageController->sendAdminMessage($data);
  56. return Result::ACK;
  57. break;
  58. case 101:
  59. //审核资讯-通知所有的管理员
  60. $this->messageController->sendUserMessage($data);
  61. return Result::ACK;
  62. break;
  63. case 200:
  64. //审核商品-通知所有的管理员
  65. $this->messageController->sendAdminMessage($data);
  66. return Result::ACK;
  67. break;
  68. case 201:
  69. //审核商品-通知发布者
  70. $this->messageController->sendUserMessage($data);
  71. return Result::ACK;
  72. break;
  73. case 300:
  74. $this->messageController->sendAdminMessage($data);
  75. return Result::ACK;
  76. break;
  77. case 301:
  78. $this->messageController->sendUserMessage($data);
  79. return Result::ACK;
  80. break;
  81. case 400:
  82. $this->messageController->sendAdminMessage($data);
  83. return Result::ACK;
  84. break;
  85. case 401:
  86. $this->messageController->sendUserMessage($data);
  87. return Result::ACK;
  88. break;
  89. case 500:
  90. $this->messageController->sendAdminMessage($data);
  91. return Result::ACK;
  92. break;
  93. case 501:
  94. $this->messageController->sendUserMessage($data);
  95. return Result::ACK;
  96. break;
  97. case 600:
  98. $this->messageController->sendAdminMessage($data);
  99. return Result::ACK;
  100. break;
  101. case 601:
  102. $this->messageController->sendUserMessage($data);
  103. return Result::ACK;
  104. break;
  105. case 700:
  106. $this->messageController->sendAdminMessage($data);
  107. return Result::ACK;
  108. break;
  109. case 701:
  110. $this->messageController->sendUserMessage($data);
  111. return Result::ACK;
  112. break;
  113. case 800:
  114. $this->messageController->sendAdminMessage($data);
  115. return Result::ACK;
  116. break;
  117. case 801:
  118. $this->messageController->sendUserMessage($data);
  119. return Result::ACK;
  120. break;
  121. case 802:
  122. $this->messageController->sendOtherUserMessage($data);
  123. return Result::ACK;
  124. break;
  125. case 900:
  126. $this->messageController->sendAdminMessage($data);
  127. return Result::ACK;
  128. break;
  129. case 901:
  130. $this->messageController->sendUserMessage($data);
  131. return Result::ACK;
  132. break;
  133. case 1000:
  134. $this->messageController->sendAdminMessage($data);
  135. return Result::ACK;
  136. break;
  137. case 1001:
  138. $this->messageController->sendUserMessage($data);
  139. return Result::ACK;
  140. break;
  141. default:
  142. // 调用数据处理服务
  143. $result = $this->chatServiceClient->addChatRecords($data);
  144. return Result::ACK;
  145. break;
  146. }
  147. // 记录处理结果
  148. $this->logger->info("消费成功:", ['result' => $result]);
  149. return Result::ACK;
  150. } catch (\Exception $e) {
  151. // 记录错误
  152. $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
  153. // 返回拒绝,重新入队
  154. // return Result::REQUEUE;
  155. }
  156. }
  157. public function isEnable(): bool
  158. {
  159. return true;
  160. }
  161. }