Redis.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Redis;
  12. use Hyperf\Context\Context;
  13. use Hyperf\Redis\Exception\InvalidRedisConnectionException;
  14. use Hyperf\Redis\Pool\PoolFactory;
  15. use Throwable;
  16. use function Hyperf\Coroutine\defer;
  /**
   * Coroutine-aware Redis proxy.
   *
   * Calls that are not declared on this class or its traits go through
   * __call(), which obtains a RedisConnection (from the coroutine context if
   * one is stored there, otherwise from the pool named by $poolName), forwards
   * the command to it, dispatches a CommandExecuted event and finally releases
   * or pins the connection.
   *
   * @mixin \Redis
   */
  class Redis
  {
      use Traits\ScanCaller;
      use Traits\MultiExec;

      /**
       * Name of the pool requested from the PoolFactory; also part of the
       * coroutine context key (see getContextKey()).
       */
      protected string $poolName = 'default';

      /**
       * @param PoolFactory $factory factory used to look up the connection pool by name
       */
      public function __construct(protected PoolFactory $factory)
      {
      }

      /**
       * Forward a Redis command to a pooled (or context-bound) connection.
       *
       * @param string $name command name exactly as written by the caller
       * @param array $arguments arguments forwarded to the command
       * @return mixed whatever the connection returns for the command
       * @throws InvalidRedisConnectionException when no valid RedisConnection can be obtained
       * @throws Throwable any error raised by the command is rethrown unchanged
       */
      public function __call($name, $arguments)
      {
          // Get a connection from coroutine context or connection pool.
          $hasContextConnection = Context::has($this->getContextKey());
          $connection = $this->getConnection($hasContextConnection);

          // Record the start time of the command.
          $start = (float) microtime(true);

          try {
              /** @var RedisConnection $connection */
              $connection = $connection->getConnection();
              // Execute the command with the arguments.
              $result = $connection->{$name}(...$arguments);
          } catch (Throwable $exception) {
              // Caught only so that $exception is visible to the finally block
              // below (it is reported in the event); the error itself is not handled here.
              throw $exception;
          } finally {
              // Elapsed time in milliseconds, rounded to two decimals.
              $time = round((microtime(true) - $start) * 1000, 2);

              // Dispatch the command executed event (skipped when there is no dispatcher).
              $connection->getEventDispatcher()?->dispatch(
                  new Event\CommandExecuted(
                      $name,
                      $arguments,
                      $time,
                      $connection,
                      $this->poolName,
                      $result ?? null,
                      $exception ?? null,
                  )
              );

              // A connection that came from the context is released by the
              // defer() callback registered when it was first stored there.
              if (! $hasContextConnection) {
                  if ($this->shouldUseSameConnection($name)) {
                      // PHP method names are case-insensitive, so compare the
                      // lower-cased name. isset() keeps select() without a
                      // positional argument from raising an "undefined array key"
                      // warning inside finally, and lets database 0 be recorded too.
                      if (strtolower($name) === 'select' && isset($arguments[0])) {
                          $connection->setDatabase((int) $arguments[0]);
                      }

                      // Store the connection in the coroutine context, then use
                      // defer() to release it.
                      Context::set($this->getContextKey(), $connection);
                      defer(function () use ($connection) {
                          Context::set($this->getContextKey(), null);
                          $connection->release();
                      });
                  } else {
                      // Release the connection after the command has been executed.
                      $connection->release();
                  }
              }
          }

          return $result;
      }

      /**
       * Tell whether a command needs the same connection for the commands that follow it.
       * When such a command is executed, the connection is stored in the coroutine context.
       *
       * The comparison is case-insensitive because PHP resolves method names
       * case-insensitively: `MULTI`, `Multi` and `multi` all run the same command.
       *
       * @param string $methodName command name as passed to __call()
       */
      private function shouldUseSameConnection(string $methodName): bool
      {
          return in_array(strtolower($methodName), [
              'multi',
              'pipeline',
              'select',
          ], true);
      }

      /**
       * Get a connection from the coroutine context, or from the Redis connection pool.
       *
       * @param mixed $hasContextConnection truthy when the context already holds an entry for this pool
       * @throws InvalidRedisConnectionException when neither source yields a RedisConnection
       */
      private function getConnection($hasContextConnection): RedisConnection
      {
          $connection = null;
          if ($hasContextConnection) {
              $connection = Context::get($this->getContextKey());
          }

          // The context entry may be missing or null (the defer() callback in
          // __call() resets it to null), so fall back to the pool.
          if (! $connection instanceof RedisConnection) {
              $pool = $this->factory->getPool($this->poolName);
              $connection = $pool->get();
          }

          if (! $connection instanceof RedisConnection) {
              throw new InvalidRedisConnectionException('The connection is not a valid RedisConnection.');
          }

          return $connection;
      }

      /**
       * The key that identifies the connection object in the coroutine context.
       */
      private function getContextKey(): string
      {
          return sprintf('redis.connection.%s', $this->poolName);
      }
  }