MqConsumer.php 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. use Hyperf\Redis\RedisFactory;
  12. use App\Controller\MessageController;
  13. #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
  14. class MqConsumer extends ConsumerMessage
  15. {
  16. /**
  17. * @var ChatServiceInterface
  18. */
  19. #[Inject]
  20. private $chatServiceClient;
  21. /**
  22. * @var MessageController
  23. */
  24. #[Inject]
  25. protected MessageController $messageController;
  26. protected $logger;
  27. #[Inject]
  28. protected RedisFactory $redisFactory;
  29. public function __construct(LoggerInterface $logger)
  30. {
  31. $this->logger = $logger;
  32. }
  33. public function consumeMessage($data, AMQPMessage $message): Result
  34. {
  35. // $redis = $this->redisFactory->get('default');
  36. // $lockKey = 'mq_consumer_lock';
  37. // $lockValue = uniqid();
  38. // // 获取锁
  39. // $lockAcquired = $redis->setnx($lockKey, $lockValue);
  40. // if (!$lockAcquired) {
  41. // $this->logger->info('Another consumer instance is already running.');
  42. // $this->logger->info($redis->get($lockKey));
  43. // // return Result::ACK;
  44. // return Result::REQUEUE;
  45. // }
  46. // // 设置锁过期时间,防止死锁
  47. // $redis->expire($lockKey, 60);
  48. try {
  49. // 数据存储
  50. $this->logger->info('消费数据', ['data' => $data]);
  51. var_dump($data, '=================消费数据==============');
  52. switch ($data['talk_type']) {
  53. case 100:
  54. //审核资讯-通知所有的管理员
  55. $this->messageController->sendAdminMessage($data);
  56. return Result::ACK;
  57. break;
  58. default:
  59. // 调用数据处理服务
  60. $result = $this->chatServiceClient->addChatRecords($data);
  61. return Result::ACK;
  62. break;
  63. }
  64. // 记录处理结果
  65. $this->logger->info("消费成功:", ['result' => $result]);
  66. return Result::ACK;
  67. } catch (\Exception $e) {
  68. // 记录错误
  69. $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
  70. // 返回拒绝,重新入队
  71. // return Result::REQUEUE;
  72. }
  73. }
  74. public function isEnable(): bool
  75. {
  76. return true;
  77. }
  78. }