|
@@ -11,6 +11,7 @@ use Hyperf\Amqp\Result;
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
use Psr\Log\LoggerInterface;
|
|
|
+use Hyperf\Redis\RedisFactory;
|
|
|
|
|
|
|
|
|
class MqConsumer extends ConsumerMessage
|
|
@@ -22,6 +23,8 @@ class MqConsumer extends ConsumerMessage
|
|
|
private $chatServiceClient;
|
|
|
|
|
|
protected $logger;
|
|
|
+
|
|
|
+ protected RedisFactory $redisFactory;
|
|
|
|
|
|
public function __construct(LoggerInterface $logger)
|
|
|
{
|
|
@@ -30,6 +33,20 @@ class MqConsumer extends ConsumerMessage
|
|
|
|
|
|
public function consumeMessage($data, AMQPMessage $message): Result
|
|
|
{
|
|
|
+ $redis = $this->redisFactory->get('default');
|
|
|
+ $lockKey = 'mq_consumer_lock';
|
|
|
+ $lockValue = uniqid();
|
|
|
+
|
|
|
+
|
|
|
+ $lockAcquired = $redis->setnx($lockKey, $lockValue);
|
|
|
+ if (!$lockAcquired) {
|
|
|
+ $this->logger->info('Another consumer instance is already running.');
|
|
|
+ var_dump($redis->get($lockKey));
|
|
|
+ return Result::ACK;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ $redis->expire($lockKey, 60);
|
|
|
try {
|
|
|
|
|
|
$this->logger->info('消费数据', ['data' => $data]);
|