123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- <?php
- declare (strict_types = 1);
- namespace App\Amqp\Consumer;
- use App\JsonRpc\ChatServiceInterface;
- use Hyperf\Amqp\Annotation\Consumer;
- use Hyperf\Amqp\Message\ConsumerMessage;
- use Hyperf\Amqp\Result;
- use Hyperf\Di\Annotation\Inject;
- use PhpAmqpLib\Message\AMQPMessage;
- use Psr\Log\LoggerInterface;
- use Hyperf\Redis\RedisFactory;
- use App\Controller\MessageController;
- #[Consumer(exchange: 'chatprod', routingKey: 'chatprod', queue: 'chatprod', name: "chatprod", nums: 1)]
- class MqConsumer extends ConsumerMessage
- {
- /**
- * @var ChatServiceInterface
- */
- #[Inject]
- private $chatServiceClient;
- /**
- * @var MessageController
- */
- #[Inject]
- protected MessageController $messageController;
- protected $logger;
- #[Inject]
- protected RedisFactory $redisFactory;
- public function __construct(LoggerInterface $logger)
- {
- $this->logger = $logger;
- }
- public function consumeMessage($data, AMQPMessage $message): Result
- {
- // $redis = $this->redisFactory->get('default');
- // $lockKey = 'mq_consumer_lock';
- // $lockValue = uniqid();
- // // 获取锁
- // $lockAcquired = $redis->setnx($lockKey, $lockValue);
- // if (!$lockAcquired) {
- // $this->logger->info('Another consumer instance is already running.');
- // $this->logger->info($redis->get($lockKey));
- // // return Result::ACK;
- // return Result::REQUEUE;
- // }
- // // 设置锁过期时间,防止死锁
- // $redis->expire($lockKey, 60);
- try {
- // 数据存储
- $this->logger->info('消费数据', ['data' => $data]);
- var_dump($data, '=================消费数据==============');
- switch ($data['talk_type']) {
- case 100:
- //审核资讯-通知所有的管理员
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 101:
- //审核资讯-通知所有的管理员
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 200:
- //审核商品-通知所有的管理员
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 201:
- //审核商品-通知发布者
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 300:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 301:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 400:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 401:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 500:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 501:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 600:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 601:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 700:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 701:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 800:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 801:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 802:
- $this->messageController->sendOtherUserMessage($data);
- return Result::ACK;
- break;
- case 900:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 901:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- case 1000:
- $this->messageController->sendAdminMessage($data);
- return Result::ACK;
- break;
- case 1001:
- $this->messageController->sendUserMessage($data);
- return Result::ACK;
- break;
- default:
- // 调用数据处理服务
- $result = $this->chatServiceClient->addChatRecords($data);
- return Result::ACK;
- break;
- }
-
- // 记录处理结果
- $this->logger->info("消费成功:", ['result' => $result]);
- return Result::ACK;
- } catch (\Exception $e) {
- // 记录错误
- $this->logger->error("消费失败:", ['error' => $e->getMessage()]);
- // 返回拒绝,重新入队
- // return Result::REQUEUE;
- }
- }
- public function isEnable(): bool
- {
- return true;
- }
- }
|