logger = $logger;
    }

    /**
     * Handles one queue message by forwarding its payload to the chat service.
     *
     * The payload is logged, handed to chatServiceClient->addChatRecords(), and the
     * value returned by that call is logged before the message is acknowledged.
     *
     * NOTE: a Redis SETNX-based "only one consumer instance" lock was once sketched at
     * the top of this method and then disabled; no locking is performed here.
     *
     * @param mixed       $data    Message payload as delivered by the consumer framework;
     *                             passed through to the chat service unchanged.
     * @param AMQPMessage $message The raw AMQP message; not used by this handler.
     *
     * @return Result Result::ACK when processing succeeds, Result::REQUEUE when the
     *                chat service call throws an \Exception.
     */
    public function consumeMessage($data, AMQPMessage $message): Result
    {
        try {
            // Record the incoming payload before processing it.
            $this->logger->info('消费数据', ['data' => $data]);

            // Delegate persistence of the chat records to the data-processing service.
            $result = $this->chatServiceClient->addChatRecords($data);

            // Record the outcome of the service call.
            $this->logger->info("消费成功:", ['result' => $result]);

            return Result::ACK;
        } catch (\Exception $e) {
            // Record the failure reason.
            $this->logger->error("消费失败:", ['error' => $e->getMessage()]);

            // Every path must return a Result: falling off the end of a method declared
            // `: Result` raises a TypeError. Reject the message and ask for it to be
            // re-queued, as originally intended.
            // NOTE(review): a payload that always fails will be re-queued repeatedly —
            // confirm a retry limit or dead-letter policy exists for this queue.
            return Result::REQUEUE;
        }
    }

    /**
     * Reports whether this consumer is enabled.
     *
     * @return bool Always true.
     */
    public function isEnable(): bool
    {
        return true;
    }
}