StreamSelectLoop.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. <?php
  2. namespace React\EventLoop;
  3. use React\EventLoop\Tick\FutureTickQueue;
  4. use React\EventLoop\Timer\Timer;
  5. use React\EventLoop\Timer\Timers;
  6. /**
  7. * A `stream_select()` based event loop.
  8. *
  9. * This uses the [`stream_select()`](https://www.php.net/manual/en/function.stream-select.php)
  10. * function and is the only implementation that works out of the box with PHP.
  11. *
  12. * This event loop works out of the box on PHP 5.4 through PHP 8+ and HHVM.
  13. * This means that no installation is required and this library works on all
  14. * platforms and supported PHP versions.
  15. * Accordingly, the [`Loop` class](#loop) and the deprecated [`Factory`](#factory)
  16. * will use this event loop by default if you do not install any of the event loop
  17. * extensions listed below.
  18. *
  19. * Under the hood, it does a simple `select` system call.
  20. * This system call is limited to the maximum file descriptor number of
  21. * `FD_SETSIZE` (platform dependent, commonly 1024) and scales with `O(m)`
  22. * (`m` being the maximum file descriptor number passed).
  23. * This means that you may run into issues when handling thousands of streams
  24. * concurrently and you may want to look into using one of the alternative
  25. * event loop implementations listed below in this case.
  26. * If your use case is among the many common use cases that involve handling only
  27. * dozens or a few hundred streams at once, then this event loop implementation
  28. * performs really well.
  29. *
  30. * If you want to use signal handling (see also [`addSignal()`](#addsignal) below),
  31. * this event loop implementation requires `ext-pcntl`.
  32. * This extension is only available for Unix-like platforms and does not support
  33. * Windows.
  34. * It is commonly installed as part of many PHP distributions.
  35. * If this extension is missing (or you're running on Windows), signal handling is
  36. * not supported and throws a `BadMethodCallException` instead.
  37. *
  38. * This event loop is known to rely on wall-clock time to schedule future timers
  39. * when using any version before PHP 7.3, because a monotonic time source is
  40. * only available as of PHP 7.3 (`hrtime()`).
  41. * While this does not affect many common use cases, this is an important
  42. * distinction for programs that rely on a high time precision or on systems
  43. * that are subject to discontinuous time adjustments (time jumps).
  44. * This means that if you schedule a timer to trigger in 30s on PHP < 7.3 and
  45. * then adjust your system time forward by 20s, the timer may trigger in 10s.
  46. * See also [`addTimer()`](#addtimer) for more details.
  47. *
  48. * @link https://www.php.net/manual/en/function.stream-select.php
  49. */
  final class StreamSelectLoop implements LoopInterface
  {
      /** @internal Factor used to convert float seconds into integer microseconds. */
      const MICROSECONDS_PER_SECOND = 1000000;

      /** @var FutureTickQueue Callbacks added via futureTick(); ticked at the start of every loop iteration. */
      private $futureTickQueue;

      /** @var Timers Timers added via addTimer() / addPeriodicTimer(). */
      private $timers;

      /** @var array Streams watched for readability, keyed by `(int) $stream`. */
      private $readStreams = array();

      /** @var array Read listeners, keyed by `(int) $stream` (same keys as $readStreams). */
      private $readListeners = array();

      /** @var array Streams watched for writability, keyed by `(int) $stream`. */
      private $writeStreams = array();

      /** @var array Write listeners, keyed by `(int) $stream` (same keys as $writeStreams). */
      private $writeListeners = array();

      /** @var bool True while run() should keep iterating; cleared by stop(). */
      private $running = false;

      /** @var bool Whether `pcntl_signal()` and `pcntl_signal_dispatch()` both exist. */
      private $pcntl = false;

      /** @var bool Whether signals must be dispatched manually because `pcntl_async_signals()` does not exist. */
      private $pcntlPoll = false;

      /** @var SignalsHandler Registry of signal listeners; its `call` method is installed as the pcntl handler. */
      private $signals;

      /**
       * Creates the internal queues and detects which signal handling mode is available.
       *
       * When `pcntl_async_signals()` exists, asynchronous signal delivery is
       * enabled right away; otherwise signals are dispatched manually after
       * each `stream_select()` call (see waitForStreamActivity()).
       */
      public function __construct()
      {
          $this->futureTickQueue = new FutureTickQueue();
          $this->timers = new Timers();
          $this->pcntl = \function_exists('pcntl_signal') && \function_exists('pcntl_signal_dispatch');
          $this->pcntlPoll = $this->pcntl && !\function_exists('pcntl_async_signals');
          $this->signals = new SignalsHandler();

          // prefer async signals if available (PHP 7.1+) or fall back to dispatching on each tick
          if ($this->pcntl && !$this->pcntlPoll) {
              \pcntl_async_signals(true);
          }
      }

      /**
       * Registers a listener to be invoked when the stream is reported as readable.
       *
       * If the stream is already registered for reading, the call is ignored
       * and the existing listener is kept.
       *
       * @param resource $stream   Stream to watch.
       * @param callable $listener Invoked with the stream as its only argument.
       */
      public function addReadStream($stream, $listener)
      {
          $key = (int) $stream;

          if (!isset($this->readStreams[$key])) {
              $this->readStreams[$key] = $stream;
              $this->readListeners[$key] = $listener;
          }
      }

      /**
       * Registers a listener to be invoked when the stream is reported as writable.
       *
       * If the stream is already registered for writing, the call is ignored
       * and the existing listener is kept.
       *
       * @param resource $stream   Stream to watch.
       * @param callable $listener Invoked with the stream as its only argument.
       */
      public function addWriteStream($stream, $listener)
      {
          $key = (int) $stream;

          if (!isset($this->writeStreams[$key])) {
              $this->writeStreams[$key] = $stream;
              $this->writeListeners[$key] = $listener;
          }
      }

      /**
       * Stops watching the stream for readability and forgets its listener.
       *
       * Does nothing if the stream was not registered.
       *
       * @param resource $stream
       */
      public function removeReadStream($stream)
      {
          $key = (int) $stream;

          unset(
              $this->readStreams[$key],
              $this->readListeners[$key]
          );
      }

      /**
       * Stops watching the stream for writability and forgets its listener.
       *
       * Does nothing if the stream was not registered.
       *
       * @param resource $stream
       */
      public function removeWriteStream($stream)
      {
          $key = (int) $stream;

          unset(
              $this->writeStreams[$key],
              $this->writeListeners[$key]
          );
      }

      /**
       * Schedules a one-off (non-periodic) timer.
       *
       * @param int|float $interval Passed to the Timer as its interval (seconds, as used by run()).
       * @param callable  $callback Passed to the Timer as its callback.
       *
       * @return Timer The newly created timer, usable with cancelTimer().
       */
      public function addTimer($interval, $callback)
      {
          $timer = new Timer($interval, $callback, false);

          $this->timers->add($timer);

          return $timer;
      }

      /**
       * Schedules a periodic timer.
       *
       * @param int|float $interval Passed to the Timer as its interval (seconds, as used by run()).
       * @param callable  $callback Passed to the Timer as its callback.
       *
       * @return Timer The newly created timer, usable with cancelTimer().
       */
      public function addPeriodicTimer($interval, $callback)
      {
          $timer = new Timer($interval, $callback, true);

          $this->timers->add($timer);

          return $timer;
      }

      /**
       * Cancels a timer previously returned by addTimer() or addPeriodicTimer().
       *
       * @param TimerInterface $timer
       */
      public function cancelTimer(TimerInterface $timer)
      {
          $this->timers->cancel($timer);
      }

      /**
       * Queues a callback on the future-tick queue, which run() ticks at the
       * start of every loop iteration.
       *
       * @param callable $listener
       */
      public function futureTick($listener)
      {
          $this->futureTickQueue->add($listener);
      }

      /**
       * Registers a listener for the given signal.
       *
       * The process-level handler is only installed via `pcntl_signal()` when
       * the first listener for that signal is added; further listeners are
       * merely added to the internal registry.
       *
       * @param int      $signal   Signal number.
       * @param callable $listener
       *
       * @throws \BadMethodCallException If the required pcntl functions are not available.
       */
      public function addSignal($signal, $listener)
      {
          if ($this->pcntl === false) {
              throw new \BadMethodCallException('Event loop feature "signals" isn\'t supported by the "StreamSelectLoop"');
          }

          // must be evaluated before adding, otherwise the count is never zero
          $first = $this->signals->count($signal) === 0;
          $this->signals->add($signal, $listener);

          if ($first) {
              \pcntl_signal($signal, array($this->signals, 'call'));
          }
      }

      /**
       * Removes a listener for the given signal.
       *
       * Does nothing if no listener is registered for the signal. Once the
       * last listener is gone, the signal is reset to its default disposition
       * (`SIG_DFL`).
       *
       * @param int      $signal   Signal number.
       * @param callable $listener
       */
      public function removeSignal($signal, $listener)
      {
          if (!$this->signals->count($signal)) {
              return;
          }

          $this->signals->remove($signal, $listener);

          if ($this->signals->count($signal) === 0) {
              \pcntl_signal($signal, \SIG_DFL);
          }
      }

      /**
       * Runs the loop until stop() is called or there is nothing left to wait for.
       *
       * Each iteration ticks the future-tick queue and the timers, then picks
       * how long the stream wait may block:
       *  - not at all if the loop was stopped or future ticks are pending,
       *  - until the first timer is due if any timer is scheduled,
       *  - indefinitely if only streams or signals remain.
       * If none of these apply, the loop exits.
       */
      public function run()
      {
          $this->running = true;

          while ($this->running) {
              $this->futureTickQueue->tick();

              $this->timers->tick();

              if (!$this->running || !$this->futureTickQueue->isEmpty()) {
                  // Future-tick queue has pending callbacks (or the loop was stopped): do not block.
                  $timeout = 0;
              } elseif ($scheduledAt = $this->timers->getFirst()) {
                  // There is a pending timer, only block until it is due ...
                  $timeout = $scheduledAt - $this->timers->getTime();
                  if ($timeout < 0) {
                      $timeout = 0;
                  } else {
                      // Convert float seconds to int microseconds.
                      // Ensure we do not exceed maximum integer size, which may
                      // cause the loop to tick once every ~35min on 32bit systems.
                      $timeout *= self::MICROSECONDS_PER_SECOND;
                      $timeout = $timeout > \PHP_INT_MAX ? \PHP_INT_MAX : (int)$timeout;
                  }
              } elseif ($this->readStreams || $this->writeStreams || !$this->signals->isEmpty()) {
                  // The only possible event is stream or signal activity, so wait forever ...
                  $timeout = null;
              } else {
                  // There's nothing left to do ...
                  break;
              }

              $this->waitForStreamActivity($timeout);
          }
      }

      /**
       * Asks run() to stop; the loop exits once the current iteration completes.
       */
      public function stop()
      {
          $this->running = false;
      }

      /**
       * Wait/check for stream activity, or until the next timer is due.
       *
       * Listeners are looked up again right before each call, so a stream that
       * an earlier listener removed during this pass is skipped.
       *
       * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
       */
      private function waitForStreamActivity($timeout)
      {
          // work on copies: streamSelect() reduces both arrays to the ready streams
          $read  = $this->readStreams;
          $write = $this->writeStreams;

          $available = $this->streamSelect($read, $write, $timeout);
          if ($this->pcntlPoll) {
              // no async signals available: dispatch pending signals manually
              \pcntl_signal_dispatch();
          }
          if (false === $available) {
              // if a system call has been interrupted,
              // we cannot rely on its outcome
              return;
          }

          foreach ($read as $stream) {
              $key = (int) $stream;

              if (isset($this->readListeners[$key])) {
                  \call_user_func($this->readListeners[$key], $stream);
              }
          }

          foreach ($write as $stream) {
              $key = (int) $stream;

              if (isset($this->writeListeners[$key])) {
                  \call_user_func($this->writeListeners[$key], $stream);
              }
          }
      }

      /**
       * Emulate a stream_select() implementation that does not break when passed
       * empty stream arrays.
       *
       * With no streams at all, this sleeps for the given timeout instead
       * (or until interrupted when the timeout is null) and returns 0.
       *
       * @param array    $read    An array of read streams to select upon.
       * @param array    $write   An array of write streams to select upon.
       * @param int|null $timeout Activity timeout in microseconds, or null to wait forever.
       *
       * @return int|false The total number of streams that are ready for read/write.
       *     Can return false if stream_select() is interrupted by a signal.
       */
      private function streamSelect(array &$read, array &$write, $timeout)
      {
          if ($read || $write) {
              // We do not usually use or expose the `exceptfds` parameter passed to the underlying `select`.
              // However, Windows does not report failed connection attempts in `writefds` passed to `select` like most other platforms.
              // Instead, it uses `writefds` only for successful connection attempts and `exceptfds` for failed connection attempts.
              // We work around this by adding all sockets that look like a pending connection attempt to `exceptfds` automatically on Windows and merge it back later.
              // This ensures the public API matches other loop implementations across all platforms (see also test suite or rather test matrix).
              // Lacking better APIs, every write-only socket that has not yet read any data is assumed to be in a pending connection attempt state.
              // @link https://docs.microsoft.com/de-de/windows/win32/api/winsock2/nf-winsock2-select
              $except = null;
              if (\DIRECTORY_SEPARATOR === '\\') {
                  $except = array();
                  foreach ($write as $key => $socket) {
                      if (!isset($read[$key]) && @\ftell($socket) === 0) {
                          $except[$key] = $socket;
                      }
                  }
              }

              // $previous is captured by reference because it is only assigned
              // once set_error_handler() has returned.
              /** @var ?callable $previous */
              $previous = \set_error_handler(function ($errno, $errstr) use (&$previous) {
                  // suppress warnings that occur when `stream_select()` is interrupted by a signal
                  // PHP defines `EINTR` through `ext-sockets` or `ext-pcntl`, otherwise use common default (Linux & Mac)
                  $eintr = \defined('SOCKET_EINTR') ? \SOCKET_EINTR : (\defined('PCNTL_EINTR') ? \PCNTL_EINTR : 4);
                  if ($errno === \E_WARNING && \strpos($errstr, '[' . $eintr . ']: ') !== false) {
                      return;
                  }

                  // forward any other error to registered error handler or print warning
                  return ($previous !== null) ? \call_user_func_array($previous, \func_get_args()) : false;
              });

              // Always restore the previous error handler, whether or not
              // stream_select() throws. Both catch clauses are needed because
              // \Throwable does not exist on PHP 5.
              try {
                  $ret = \stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
                  \restore_error_handler();
              } catch (\Throwable $e) { // @codeCoverageIgnoreStart
                  \restore_error_handler();
                  throw $e;
              } catch (\Exception $e) {
                  \restore_error_handler();
                  throw $e;
              } // @codeCoverageIgnoreEnd

              if ($except) {
                  // report failed connection attempts (Windows only) as writable
                  $write = \array_merge($write, $except);
              }
              return $ret;
          }

          if ($timeout > 0) {
              \usleep($timeout);
          } elseif ($timeout === null) {
              // wait forever (we only reach this if we're only awaiting signals)
              // this may be interrupted and return earlier when a signal is received
              \sleep(\PHP_INT_MAX);
          }

          return 0;
      }
  }