AbstractLoadBalancer.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\LoadBalancer;
  12. use Closure;
  13. use Hyperf\Coordinator\Constants;
  14. use Hyperf\Coordinator\CoordinatorManager;
  15. use Hyperf\Coroutine\Coroutine;
  16. use Psr\Log\LoggerInterface;
  17. use Throwable;
  18. abstract class AbstractLoadBalancer implements LoadBalancerInterface
  19. {
  20. /**
  21. * @var array<string, ?Closure>
  22. */
  23. protected array $afterRefreshCallbacks = [];
  24. protected bool $autoRefresh = false;
  25. /**
  26. * @param Node[] $nodes
  27. */
  28. public function __construct(protected array $nodes = [], protected ?LoggerInterface $logger = null)
  29. {
  30. }
  31. /**
  32. * @param Node[] $nodes
  33. */
  34. public function setNodes(array $nodes): static
  35. {
  36. $this->nodes = $nodes;
  37. return $this;
  38. }
  39. public function getNodes(): array
  40. {
  41. return $this->nodes;
  42. }
  43. /**
  44. * Remove a node from the node list.
  45. */
  46. public function removeNode(Node $node): bool
  47. {
  48. foreach ($this->nodes as $key => $activeNode) {
  49. if ($activeNode === $node) {
  50. unset($this->nodes[$key]);
  51. return true;
  52. }
  53. }
  54. return false;
  55. }
  56. public function refresh(callable $callback, int $tickMs = 5000): void
  57. {
  58. if (! $this->autoRefresh) {
  59. $this->autoRefresh = true;
  60. Coroutine::create(function () use ($callback, $tickMs) {
  61. while (true) {
  62. try {
  63. $exited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($tickMs / 1000);
  64. if ($exited) {
  65. break;
  66. }
  67. $beforeNodes = $this->getNodes();
  68. $nodes = $callback($beforeNodes);
  69. if (is_array($nodes)) {
  70. $this->setNodes($nodes);
  71. foreach ($this->afterRefreshCallbacks as $refreshCallback) {
  72. ! is_null($refreshCallback) && $refreshCallback($beforeNodes, $nodes);
  73. }
  74. }
  75. } catch (Throwable $exception) {
  76. $this->logger?->error((string) $exception);
  77. }
  78. }
  79. $this->autoRefresh = false;
  80. });
  81. }
  82. }
  83. public function isAutoRefresh(): bool
  84. {
  85. return $this->autoRefresh;
  86. }
  87. public function afterRefreshed(string $key, ?Closure $callback): void
  88. {
  89. $this->afterRefreshCallbacks[$key] = $callback;
  90. }
  91. public function clearAfterRefreshedCallbacks(): void
  92. {
  93. $this->afterRefreshCallbacks = [];
  94. }
  95. }