MqConsumer.php 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. use Hyperf\Redis\RedisFactory;
  12. #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
  13. class MqConsumer extends ConsumerMessage
  14. {
  15. /**
  16. * @var ChatServiceInterface
  17. */
  18. #[Inject]
  19. private $chatServiceClient;
  20. protected $logger;
  21. #[Inject]
  22. protected RedisFactory $redisFactory;
  23. public function __construct(LoggerInterface $logger)
  24. {
  25. $this->logger = $logger;
  26. }
  27. public function consumeMessage($data, AMQPMessage $message): Result
  28. {
  29. // $redis = $this->redisFactory->get('default');
  30. // $lockKey = 'mq_consumer_lock';
  31. // $lockValue = uniqid();
  32. // // 获取锁
  33. // $lockAcquired = $redis->setnx($lockKey, $lockValue);
  34. // if (!$lockAcquired) {
  35. // $this->logger->info('Another consumer instance is already running.');
  36. // $this->logger->info($redis->get($lockKey));
  37. // // return Result::ACK;
  38. // return Result::REQUEUE;
  39. // }
  40. // // 设置锁过期时间,防止死锁
  41. // $redis->expire($lockKey, 60);
  42. try {
  43. // 数据存储
  44. $this->logger->info('消费数据', ['data' => $data]);
  45. var_dump($data, '=================消费数据==============');
  46. // 调用数据处理服务
  47. $result = $this->chatServiceClient->addChatRecords($data);
  48. // 记录处理结果
  49. $this->logger->info("消费成功:", ['result' => $result]);
  50. return Result::ACK;
  51. } catch (\Exception $e) {
  52. // 记录错误
  53. $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
  54. // 返回拒绝,重新入队
  55. // return Result::REQUEUE;
  56. }
  57. }
  58. public function isEnable(): bool
  59. {
  60. return true;
  61. }
  62. }