ExtEventLoop.php 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. <?php
  2. namespace React\EventLoop;
  3. use BadMethodCallException;
  4. use Event;
  5. use EventBase;
  6. use React\EventLoop\Tick\FutureTickQueue;
  7. use React\EventLoop\Timer\Timer;
  8. use SplObjectStorage;
  9. /**
  10. * An `ext-event` based event loop.
  11. *
  12. * This uses the [`event` PECL extension](https://pecl.php.net/package/event),
  13. * that provides an interface to `libevent` library.
  14. * `libevent` itself supports a number of system-specific backends (epoll, kqueue).
  15. *
  16. * This loop is known to work with PHP 5.4 through PHP 8+.
  17. *
  18. * @link https://pecl.php.net/package/event
  19. */
  20. final class ExtEventLoop implements LoopInterface
  21. {
  22. private $eventBase;
  23. private $futureTickQueue;
  24. private $timerCallback;
  25. private $timerEvents;
  26. private $streamCallback;
  27. private $readEvents = array();
  28. private $writeEvents = array();
  29. private $readListeners = array();
  30. private $writeListeners = array();
  31. private $readRefs = array();
  32. private $writeRefs = array();
  33. private $running;
  34. private $signals;
  35. private $signalEvents = array();
  36. public function __construct()
  37. {
  38. if (!\class_exists('EventBase', false)) {
  39. throw new BadMethodCallException('Cannot create ExtEventLoop, ext-event extension missing');
  40. }
  41. // support arbitrary file descriptors and not just sockets
  42. // Windows only has limited file descriptor support, so do not require this (will fail otherwise)
  43. // @link http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html#_setting_up_a_complicated_event_base
  44. $config = new \EventConfig();
  45. if (\DIRECTORY_SEPARATOR !== '\\') {
  46. $config->requireFeatures(\EventConfig::FEATURE_FDS);
  47. }
  48. $this->eventBase = new EventBase($config);
  49. $this->futureTickQueue = new FutureTickQueue();
  50. $this->timerEvents = new SplObjectStorage();
  51. $this->signals = new SignalsHandler();
  52. $this->createTimerCallback();
  53. $this->createStreamCallback();
  54. }
  55. public function __destruct()
  56. {
  57. // explicitly clear all references to Event objects to prevent SEGFAULTs on Windows
  58. foreach ($this->timerEvents as $timer) {
  59. $this->timerEvents->detach($timer);
  60. }
  61. $this->readEvents = array();
  62. $this->writeEvents = array();
  63. }
  64. public function addReadStream($stream, $listener)
  65. {
  66. $key = (int) $stream;
  67. if (isset($this->readListeners[$key])) {
  68. return;
  69. }
  70. $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
  71. $event->add();
  72. $this->readEvents[$key] = $event;
  73. $this->readListeners[$key] = $listener;
  74. // ext-event does not increase refcount on stream resources for PHP 7+
  75. // manually keep track of stream resource to prevent premature garbage collection
  76. if (\PHP_VERSION_ID >= 70000) {
  77. $this->readRefs[$key] = $stream;
  78. }
  79. }
  80. public function addWriteStream($stream, $listener)
  81. {
  82. $key = (int) $stream;
  83. if (isset($this->writeListeners[$key])) {
  84. return;
  85. }
  86. $event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
  87. $event->add();
  88. $this->writeEvents[$key] = $event;
  89. $this->writeListeners[$key] = $listener;
  90. // ext-event does not increase refcount on stream resources for PHP 7+
  91. // manually keep track of stream resource to prevent premature garbage collection
  92. if (\PHP_VERSION_ID >= 70000) {
  93. $this->writeRefs[$key] = $stream;
  94. }
  95. }
  96. public function removeReadStream($stream)
  97. {
  98. $key = (int) $stream;
  99. if (isset($this->readEvents[$key])) {
  100. $this->readEvents[$key]->free();
  101. unset(
  102. $this->readEvents[$key],
  103. $this->readListeners[$key],
  104. $this->readRefs[$key]
  105. );
  106. }
  107. }
  108. public function removeWriteStream($stream)
  109. {
  110. $key = (int) $stream;
  111. if (isset($this->writeEvents[$key])) {
  112. $this->writeEvents[$key]->free();
  113. unset(
  114. $this->writeEvents[$key],
  115. $this->writeListeners[$key],
  116. $this->writeRefs[$key]
  117. );
  118. }
  119. }
  120. public function addTimer($interval, $callback)
  121. {
  122. $timer = new Timer($interval, $callback, false);
  123. $this->scheduleTimer($timer);
  124. return $timer;
  125. }
  126. public function addPeriodicTimer($interval, $callback)
  127. {
  128. $timer = new Timer($interval, $callback, true);
  129. $this->scheduleTimer($timer);
  130. return $timer;
  131. }
  132. public function cancelTimer(TimerInterface $timer)
  133. {
  134. if ($this->timerEvents->contains($timer)) {
  135. $this->timerEvents[$timer]->free();
  136. $this->timerEvents->detach($timer);
  137. }
  138. }
  139. public function futureTick($listener)
  140. {
  141. $this->futureTickQueue->add($listener);
  142. }
  143. public function addSignal($signal, $listener)
  144. {
  145. $this->signals->add($signal, $listener);
  146. if (!isset($this->signalEvents[$signal])) {
  147. $this->signalEvents[$signal] = Event::signal($this->eventBase, $signal, array($this->signals, 'call'));
  148. $this->signalEvents[$signal]->add();
  149. }
  150. }
  151. public function removeSignal($signal, $listener)
  152. {
  153. $this->signals->remove($signal, $listener);
  154. if (isset($this->signalEvents[$signal]) && $this->signals->count($signal) === 0) {
  155. $this->signalEvents[$signal]->free();
  156. unset($this->signalEvents[$signal]);
  157. }
  158. }
  159. public function run()
  160. {
  161. $this->running = true;
  162. while ($this->running) {
  163. $this->futureTickQueue->tick();
  164. $flags = EventBase::LOOP_ONCE;
  165. if (!$this->running || !$this->futureTickQueue->isEmpty()) {
  166. $flags |= EventBase::LOOP_NONBLOCK;
  167. } elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count() && $this->signals->isEmpty()) {
  168. break;
  169. }
  170. $this->eventBase->loop($flags);
  171. }
  172. }
  173. public function stop()
  174. {
  175. $this->running = false;
  176. }
  177. /**
  178. * Schedule a timer for execution.
  179. *
  180. * @param TimerInterface $timer
  181. */
  182. private function scheduleTimer(TimerInterface $timer)
  183. {
  184. $flags = Event::TIMEOUT;
  185. if ($timer->isPeriodic()) {
  186. $flags |= Event::PERSIST;
  187. }
  188. $event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
  189. $this->timerEvents[$timer] = $event;
  190. $event->add($timer->getInterval());
  191. }
  192. /**
  193. * Create a callback used as the target of timer events.
  194. *
  195. * A reference is kept to the callback for the lifetime of the loop
  196. * to prevent "Cannot destroy active lambda function" fatal error from
  197. * the event extension.
  198. */
  199. private function createTimerCallback()
  200. {
  201. $timers = $this->timerEvents;
  202. $this->timerCallback = function ($_, $__, $timer) use ($timers) {
  203. \call_user_func($timer->getCallback(), $timer);
  204. if (!$timer->isPeriodic() && $timers->contains($timer)) {
  205. $this->cancelTimer($timer);
  206. }
  207. };
  208. }
  209. /**
  210. * Create a callback used as the target of stream events.
  211. *
  212. * A reference is kept to the callback for the lifetime of the loop
  213. * to prevent "Cannot destroy active lambda function" fatal error from
  214. * the event extension.
  215. */
  216. private function createStreamCallback()
  217. {
  218. $read =& $this->readListeners;
  219. $write =& $this->writeListeners;
  220. $this->streamCallback = function ($stream, $flags) use (&$read, &$write) {
  221. $key = (int) $stream;
  222. if (Event::READ === (Event::READ & $flags) && isset($read[$key])) {
  223. \call_user_func($read[$key], $stream);
  224. }
  225. if (Event::WRITE === (Event::WRITE & $flags) && isset($write[$key])) {
  226. \call_user_func($write[$key], $stream);
  227. }
  228. };
  229. }
  230. }