AbstractDriver.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\ConfigCenter;
  12. use Hyperf\ConfigCenter\Contract\ClientInterface;
  13. use Hyperf\ConfigCenter\Contract\DriverInterface;
  14. use Hyperf\ConfigCenter\Contract\PipeMessageInterface;
  15. use Hyperf\ConfigCenter\Event\ConfigChanged;
  16. use Hyperf\Contract\ConfigInterface;
  17. use Hyperf\Contract\StdoutLoggerInterface;
  18. use Hyperf\Coordinator\Constants;
  19. use Hyperf\Coordinator\CoordinatorManager;
  20. use Hyperf\Coroutine\Coroutine;
  21. use Hyperf\Process\ProcessCollector;
  22. use InvalidArgumentException;
  23. use Psr\Container\ContainerInterface;
  24. use Psr\EventDispatcher\EventDispatcherInterface;
  25. use Psr\Log\LoggerInterface;
  26. use Swoole\Process;
  27. use Swoole\Server;
  28. use Throwable;
  29. use function Hyperf\Support\retry;
  /**
   * Base class for configuration-center drivers.
   *
   * Pulls configuration through the driver's client, writes it into the
   * application config and, when user processes are registered, forwards it to
   * the server workers and to those processes wrapped in a pipe message.
   */
  abstract class AbstractDriver implements DriverInterface
  {
      /**
       * Server used to reach the workers; stays null until setServer() is called.
       *
       * @var null|Server
       */
      protected $server;

      protected ConfigInterface $config;

      protected LoggerInterface $logger;

      /**
       * Client that pull() delegates to. It is not assigned in this class, so it
       * has to be assigned elsewhere before pull() or fetchConfig() is used.
       */
      protected ClientInterface $client;

      /**
       * Class instantiated by shareConfigToProcesses(); it must implement PipeMessageInterface.
       */
      protected string $pipeMessage = PipeMessage::class;

      /**
       * Name used to build the `config_center.drivers.<name>.interval` config key.
       */
      protected string $driverName = '';

      /**
       * Resolves the config object and the stdout logger from the container.
       */
      public function __construct(protected ContainerInterface $container)
      {
          $this->config = $container->get(ConfigInterface::class);
          $this->logger = $container->get(StdoutLoggerInterface::class);
      }

      /**
       * Starts a coroutine that repeatedly pulls the configuration and syncs it
       * whenever it differs from the previously pulled one.
       *
       * The loop waits up to getInterval() on the WORKER_EXIT coordinator before
       * each pull and stops once that coordinator reports the worker has exited.
       * Any failure is logged and rethrown so that retry() runs the loop again
       * after a delay of interval * 1000.
       */
      public function createMessageFetcherLoop(): void
      {
          Coroutine::create(function () {
              $interval = $this->getInterval();

              retry(INF, function () use ($interval) {
                  // NOTE(review): this snapshot is reset on every retry attempt, so the
                  // first successful pull after a failure is synced again and reported
                  // with an empty previous config. Confirm that this is intended.
                  $prevConfig = [];

                  while (true) {
                      try {
                          $workerExited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($interval);
                          if ($workerExited) {
                              break;
                          }

                          $config = $this->pull();
                          if ($config !== $prevConfig) {
                              $this->syncConfig($config, $prevConfig);
                          }
                          $prevConfig = $config;
                      } catch (Throwable $exception) {
                          $this->logger->error((string) $exception);
                          // Rethrow so that retry() schedules the next attempt.
                          throw $exception;
                      }
                  }
              }, $interval * 1000);
          });
      }

      /**
       * Pulls the configuration once and applies it to the local config object.
       *
       * Nothing happens when the client has no `pull` method or when the pulled
       * configuration is empty.
       */
      public function fetchConfig(): void
      {
          if (! method_exists($this->client, 'pull')) {
              return;
          }

          // pull() is declared to return an array, so only emptiness needs checking.
          $config = $this->pull();
          if ($config !== []) {
              $this->updateConfig($config);
          }
      }

      /**
       * Applies the configuration carried by a received pipe message.
       */
      public function onPipeMessage(PipeMessageInterface $pipeMessage): void
      {
          $this->updateConfig($pipeMessage->getData());
      }

      /**
       * Returns the server assigned through setServer(), or null when none was set.
       */
      public function getServer(): ?Server
      {
          return $this->server;
      }

      /**
       * Stores the server used to message the workers.
       *
       * @param mixed $server value stored as-is; only a Server instance is used by shareMessageToWorkers()
       * @return AbstractDriver this instance, for chaining
       */
      public function setServer($server): AbstractDriver
      {
          $this->server = $server;

          return $this;
      }

      /**
       * Dispatches an event through the container's event dispatcher, if one is available.
       *
       * PSR-11 containers throw from get() for unknown entries instead of
       * returning null, so availability is checked with has() first. The nullsafe
       * call is kept for containers that hand back null.
       */
      protected function event(object $event)
      {
          if (! $this->container->has(EventDispatcherInterface::class)) {
              return;
          }

          $this->container->get(EventDispatcherInterface::class)?->dispatch($event);
      }

      /**
       * Propagates a freshly pulled configuration.
       *
       * When the ProcessCollector class exists and holds processes, the
       * configuration is shared with workers and user processes; otherwise it is
       * applied to the local config object. A ConfigChanged event is emitted only
       * when a previous configuration is supplied.
       *
       * @param array $config the newly pulled configuration
       * @param null|array $prevConfig the configuration seen before, or null to skip the event
       */
      protected function syncConfig(array $config, ?array $prevConfig = null)
      {
          if (class_exists(ProcessCollector::class) && ! ProcessCollector::isEmpty()) {
              $this->shareConfigToProcesses($config);
          } else {
              $this->updateConfig($config);
          }

          if ($prevConfig !== null) {
              $this->event(new ConfigChanged($config, $prevConfig));
          }
      }

      /**
       * Fetches the current configuration from the client.
       */
      protected function pull(): array
      {
          return $this->client->pull();
      }

      /**
       * Writes every string-keyed entry into the config object and logs each update.
       *
       * Entries with non-string keys are skipped.
       */
      protected function updateConfig(array $config): void
      {
          foreach ($config as $key => $value) {
              if (! is_string($key)) {
                  continue;
              }

              $this->config->set($key, $value);
              $this->logger->debug(sprintf('Config [%s] is updated', $key));
          }
      }

      /**
       * Reads the polling interval for this driver from the config, defaulting to 5.
       */
      protected function getInterval(): int
      {
          return (int) $this->config->get('config_center.drivers.' . $this->driverName . '.interval', 5);
      }

      /**
       * Wraps the configuration in the configured pipe message class and sends it
       * to the server workers and to the user processes.
       *
       * @throws InvalidArgumentException when the configured class does not implement PipeMessageInterface
       */
      protected function shareConfigToProcesses(array $config): void
      {
          $messageClass = $this->pipeMessage;
          $message = new $messageClass($config);

          if (! $message instanceof PipeMessageInterface) {
              throw new InvalidArgumentException('Invalid pipe message object.');
          }

          $this->shareMessageToWorkers($message);
          $this->shareMessageToUserProcesses($message);
      }

      /**
       * Sends the message to every worker id from 0 up to worker_num + task_worker_num - 1.
       *
       * Does nothing when no Server instance has been assigned.
       */
      protected function shareMessageToWorkers(PipeMessageInterface $message): void
      {
          if (! $this->server instanceof Server) {
              return;
          }

          $settings = $this->server->setting;
          $totalWorkers = $settings['worker_num'] + ($settings['task_worker_num'] ?? 0);

          for ($workerId = 0; $workerId < $totalWorkers; ++$workerId) {
              $this->server->sendMessage($message, $workerId);
          }
      }

      /**
       * Sends the serialized message to every process returned by ProcessCollector::all().
       *
       * A failed send is logged; the remaining processes are still attempted.
       */
      protected function shareMessageToUserProcesses(PipeMessageInterface $message): void
      {
          $processes = ProcessCollector::all();
          if (! $processes) {
              return;
          }

          // Serialize once and reuse the payload for every process.
          $payload = serialize($message);

          /** @var Process $process */
          foreach ($processes as $process) {
              $sent = $process->exportSocket()->send($payload, 10);
              if ($sent === false) {
                  $this->logger->error('Configuration synchronization failed. Please restart the server.');
              }
          }
      }
  }