|
@@ -33,21 +33,21 @@ class MqConsumer extends ConsumerMessage
|
|
|
|
|
|
public function consumeMessage($data, AMQPMessage $message): Result
|
|
|
{
|
|
|
- $redis = $this->redisFactory->get('default');
|
|
|
- $lockKey = 'mq_consumer_lock';
|
|
|
- $lockValue = uniqid();
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
-
|
|
|
- $lockAcquired = $redis->setnx($lockKey, $lockValue);
|
|
|
- if (!$lockAcquired) {
|
|
|
- $this->logger->info('Another consumer instance is already running.');
|
|
|
- $this->logger->info($redis->get($lockKey));
|
|
|
-
|
|
|
- return Result::REQUEUE;
|
|
|
- }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
|
|
|
-
|
|
|
- $redis->expire($lockKey, 60);
|
|
|
+
|
|
|
+
|
|
|
try {
|
|
|
|
|
|
$this->logger->info('消费数据', ['data' => $data]);
|