|
@@ -11,6 +11,7 @@ use Hyperf\Amqp\Result;
|
|
|
use Hyperf\Di\Annotation\Inject;
|
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
|
use Psr\Log\LoggerInterface;
|
|
|
+use Hyperf\Redis\RedisFactory;
|
|
|
|
|
|
#[Consumer(exchange: 'hyperf', routingKey: 'hyperf', queue: 'hyperf', name: "MqConsumer", nums: 1)]
|
|
|
class MqConsumer extends ConsumerMessage
|
|
@@ -22,6 +23,8 @@ class MqConsumer extends ConsumerMessage
|
|
|
private $chatServiceClient;
|
|
|
|
|
|
protected $logger;
|
|
|
+ #[Inject]
|
|
|
+ protected RedisFactory $redisFactory;
|
|
|
|
|
|
public function __construct(LoggerInterface $logger)
|
|
|
{
|
|
@@ -30,6 +33,20 @@ class MqConsumer extends ConsumerMessage
|
|
|
|
|
|
public function consumeMessage($data, AMQPMessage $message): Result
|
|
|
{
|
|
|
+ $redis = $this->redisFactory->get('default');
|
|
|
+ $lockKey = 'mq_consumer_lock';
|
|
|
+ $lockValue = uniqid();
|
|
|
+
|
|
|
+ // 获取锁
|
|
|
+ $lockAcquired = $redis->setnx($lockKey, $lockValue);
|
|
|
+ if (!$lockAcquired) {
|
|
|
+ $this->logger->info('Another consumer instance is already running.');
|
|
|
+ var_dump($redis->get($lockKey));
|
|
|
+ return Result::ACK;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 设置锁过期时间,防止死锁
|
|
|
+ $redis->expire($lockKey, 60);
|
|
|
try {
|
|
|
// 数据存储
|
|
|
$this->logger->info('消费数据', ['data' => $data]);
|