MqConsumer.php 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. use Hyperf\Redis\RedisFactory;
  /**
   * AMQP consumer bound to the `hyperf` exchange/queue that forwards every
   * payload to the chat service, guarded by a Redis mutual-exclusion lock.
   *
   * Only one message is processed at a time across all instances sharing the
   * `default` Redis connection: processing is wrapped in a lock stored under
   * {@see self::LOCK_KEY}. The lock is released as soon as processing ends and
   * additionally carries a TTL so that a crashed worker cannot hold it forever.
   */
  #[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "MqConsumer", nums: 1)]
  class MqConsumer extends ConsumerMessage
  {
      /** Redis key used as the cross-instance mutex. */
      private const LOCK_KEY = 'mq_consumer_lock';

      /** Safety TTL (seconds) after which an abandoned lock expires by itself. */
      private const LOCK_TTL_SECONDS = 60;

      /**
       * Compare-and-delete: removes the lock only while it still holds the token
       * of the caller, so a worker whose lock already expired cannot delete a
       * lock that has since been taken by somebody else.
       */
      private const RELEASE_LOCK_SCRIPT = <<<'LUA'
  if redis.call('get', KEYS[1]) == ARGV[1] then
      return redis.call('del', KEYS[1])
  end
  return 0
  LUA;

      /** Client for the chat service; receives every consumed payload. */
      #[Inject]
      private ChatServiceInterface $chatServiceClient;

      protected LoggerInterface $logger;

      /** Factory used to obtain the `default` Redis connection for locking. */
      #[Inject]
      protected RedisFactory $redisFactory;

      /**
       * @param LoggerInterface $logger Logger for consumption progress and failures.
       */
      public function __construct(LoggerInterface $logger)
      {
          $this->logger = $logger;
      }

      /**
       * Processes one message while holding the Redis lock.
       *
       * @param mixed       $data    Decoded message payload, passed unchanged to the chat service.
       * @param AMQPMessage $message Raw AMQP message (not inspected here).
       *
       * @return Result ACK when the chat service call succeeded; REQUEUE when the
       *                lock is held by another worker or when processing threw an
       *                exception (whether the broker really requeues presumably
       *                also depends on the consumer's requeue setting — confirm
       *                against the framework configuration).
       */
      public function consumeMessage($data, AMQPMessage $message): Result
      {
          $redis = $this->redisFactory->get('default');

          // Random token identifying this attempt as the owner of the lock.
          $lockValue = bin2hex(random_bytes(16));

          // Acquire the lock and set its TTL in a single atomic command; doing it
          // in two steps could leave a lock without expiry if the worker died in
          // between.
          $lockAcquired = $redis->set(
              self::LOCK_KEY,
              $lockValue,
              ['NX', 'EX' => self::LOCK_TTL_SECONDS],
          );

          if (! $lockAcquired) {
              $this->logger->info('Another consumer instance is already running.');

              // The message has not been processed, so it must not be acknowledged:
              // hand it back to the broker instead of silently discarding it.
              return Result::REQUEUE;
          }

          try {
              $this->logger->info('消费数据', ['data' => $data]);

              // Delegate the actual handling to the chat service.
              $result = $this->chatServiceClient->addChatRecords($data);

              $this->logger->info("消费成功:", ['result' => $result]);

              return Result::ACK;
          } catch (\Exception $e) {
              $this->logger->error("消费失败:", ['error' => $e->getMessage()]);

              // Every path must return a Result; reject the message for redelivery
              // rather than falling off the end of the method.
              return Result::REQUEUE;
          } finally {
              // Always free the lock so the next message is not blocked until the
              // TTL runs out.
              $this->releaseLock($redis, $lockValue);
          }
      }

      /**
       * Indicates whether this consumer should be started.
       *
       * @return bool Always true.
       */
      public function isEnable(): bool
      {
          return true;
      }

      /**
       * Releases the lock if, and only if, it is still owned by the given token.
       *
       * A failure to release is logged and otherwise ignored: the lock's TTL
       * guarantees it disappears eventually, and the outcome of the message
       * handling must not be overridden by a clean-up error.
       *
       * @param mixed  $redis     Redis connection returned by the factory.
       * @param string $lockValue Token that was stored when the lock was acquired.
       */
      private function releaseLock($redis, string $lockValue): void
      {
          try {
              $redis->eval(self::RELEASE_LOCK_SCRIPT, [self::LOCK_KEY, $lockValue], 1);
          } catch (\Throwable $e) {
              $this->logger->warning('Failed to release consumer lock.', ['error' => $e->getMessage()]);
          }
      }
  }