MqConsumer.php 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. <?php
  2. declare (strict_types = 1);
  3. namespace App\Amqp\Consumer;
  4. use App\JsonRpc\ChatServiceInterface;
  5. use Hyperf\Amqp\Annotation\Consumer;
  6. use Hyperf\Amqp\Message\ConsumerMessage;
  7. use Hyperf\Amqp\Result;
  8. use Hyperf\Di\Annotation\Inject;
  9. use PhpAmqpLib\Message\AMQPMessage;
  10. use Psr\Log\LoggerInterface;
  11. #[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "MqConsumer", nums: 1)]
  12. class MqConsumer extends ConsumerMessage
  13. {
  14. /**
  15. * @var ChatServiceInterface
  16. */
  17. #[Inject]
  18. private $chatServiceClient;
  19. protected $logger;
  20. public function __construct(LoggerInterface $logger)
  21. {
  22. $this->logger = $logger;
  23. }
  24. public function consumeMessage($data, AMQPMessage $message): Result
  25. {
  26. try {
  27. // 数据存储
  28. $this->logger->info('消费数据', ['data' => $data]);
  29. // 调用数据处理服务
  30. $result = $this->chatServiceClient->addChatRecords($data);
  31. // 记录处理结果
  32. $this->logger->info("消费成功:", ['result' => $result]);
  33. return Result::ACK;
  34. } catch (\Exception $e) {
  35. // 记录错误
  36. $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
  37. // 返回拒绝,重新入队
  38. // return Result::REQUEUE;
  39. }
  40. }
  41. public function isEnable(): bool
  42. {
  43. return true;
  44. }
  45. }