DuplexResourceStream.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  4. use React\EventLoop\Loop;
  5. use React\EventLoop\LoopInterface;
  6. use InvalidArgumentException;
  /**
   * Duplex (readable and writable) stream on top of a single PHP stream resource.
   *
   * Reading: unless paused, `handleData()` is registered as the read callback for
   * the resource on the event loop; whatever it reads is re-emitted as a `data`
   * event, and end-of-file results in `end` followed by `close`.
   *
   * Writing: all writes are delegated to a separate writable buffer (a
   * `WritableResourceStream` created for the same resource unless a custom buffer
   * is injected). The buffer's `error` and `drain` events are re-emitted on this
   * stream and the buffer's `close` event closes this stream.
   */
  final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
  {
      /** @var resource the wrapped stream resource */
      private $stream;

      /** @var LoopInterface */
      private $loop;

      /**
       * Controls the maximum buffer size in bytes to read at once from the stream.
       *
       * This can be a positive number which means that up to X bytes will be read
       * at once from the underlying stream resource. Note that the actual number
       * of bytes read may be lower if the stream resource has less than X bytes
       * currently available.
       *
       * This can be `-1` which means read everything available from the
       * underlying stream resource.
       * This should read until the stream resource is not readable anymore
       * (i.e. underlying buffer drained), note that this does not necessarily
       * mean it reached EOF.
       *
       * @var int
       */
      private $bufferSize;

      /** @var WritableStreamInterface write side; every write()/end() is delegated to it */
      private $buffer;

      /** @var bool false once end() or close() has been called */
      private $readable = true;

      /** @var bool false once end() or close() has been called */
      private $writable = true;

      /** @var bool true between end() and the subsequent close() */
      private $closing = false;

      /** @var bool whether handleData() is currently registered with the loop */
      private $listening = false;

      /**
       * @param resource $stream stream resource opened for reading and writing
       * @param ?LoopInterface $loop event loop to use, defaults to `Loop::get()`
       * @param ?int $readChunkSize maximum bytes per read, defaults to 65536 (`-1` = everything available)
       * @param ?WritableStreamInterface $buffer custom write buffer, defaults to a new `WritableResourceStream`
       * @throws InvalidArgumentException if `$stream` is not a stream resource, is not
       *     opened in a "+" (read and write) mode, or `$loop` / `$buffer` have the wrong type
       * @throws \RuntimeException if the stream can not be switched to non-blocking mode
       */
      public function __construct($stream, $loop = null, $readChunkSize = null, $buffer = null)
      {
          if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
              throw new InvalidArgumentException('First parameter must be a valid stream resource');
          }

          // ensure resource is opened for reading and writing (fopen mode must contain "+")
          $meta = \stream_get_meta_data($stream);
          if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], '+') === false) {
              throw new InvalidArgumentException('Given stream resource is not opened in read and write mode');
          }

          // Reject invalid arguments before changing any state of the caller's
          // resource, so a failed construction leaves the stream untouched.
          if ($loop !== null && !$loop instanceof LoopInterface) { // manual type check to support legacy PHP < 7.1
              throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
          }
          if ($buffer !== null && !$buffer instanceof WritableStreamInterface) { // manual type check to support legacy PHP < 7.1
              throw new \InvalidArgumentException('Argument #4 ($buffer) expected null|React\Stream\WritableStreamInterface');
          }

          // this class relies on non-blocking I/O in order to not interrupt the event loop
          // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
          // Only done here for custom buffers; presumably WritableResourceStream
          // switches the resource to non-blocking mode itself (TODO confirm).
          if ($buffer !== null && !$buffer instanceof WritableResourceStream && \stream_set_blocking($stream, false) !== true) {
              throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
          }

          // Use unbuffered read operations on the underlying stream resource.
          // Reading chunks from the stream may otherwise leave unread bytes in
          // PHP's stream buffers which some event loop implementations do not
          // trigger events on (edge triggered).
          // This does not affect the default event loop implementation (level
          // triggered), so we can ignore platforms not supporting this (HHVM).
          // Pipe streams (such as STDIN) do not seem to require this and legacy
          // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
          if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
              \stream_set_read_buffer($stream, 0);
          }

          if ($buffer === null) {
              $buffer = new WritableResourceStream($stream, $loop);
          }

          $this->stream = $stream;
          $this->loop = $loop ?: Loop::get();
          $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
          $this->buffer = $buffer;

          // $that instead of $this inside closures keeps this working on legacy PHP
          $that = $this;

          // mirror the write buffer's events on this stream
          $this->buffer->on('error', function ($error) use ($that) {
              $that->emit('error', array($error));
          });

          $this->buffer->on('close', array($this, 'close'));

          $this->buffer->on('drain', function () use ($that) {
              $that->emit('drain');
          });

          // start reading right away
          $this->resume();
      }

      /**
       * @return bool true until end() or close() has been called
       */
      public function isReadable()
      {
          return $this->readable;
      }

      /**
       * @return bool true until end() or close() has been called
       */
      public function isWritable()
      {
          return $this->writable;
      }

      /**
       * Stops reading by removing the read callback from the event loop.
       *
       * Does nothing if the stream is not currently listening.
       *
       * @return void
       */
      public function pause()
      {
          if ($this->listening) {
              $this->loop->removeReadStream($this->stream);
              $this->listening = false;
          }
      }

      /**
       * (Re-)registers `handleData()` as the read callback on the event loop.
       *
       * Does nothing if already listening or if the stream is no longer readable.
       *
       * @return void
       */
      public function resume()
      {
          if (!$this->listening && $this->readable) {
              $this->loop->addReadStream($this->stream, array($this, 'handleData'));
              $this->listening = true;
          }
      }

      /**
       * Writes data through the write buffer.
       *
       * @param mixed $data passed through unchanged to the buffer's write()
       * @return mixed false once this stream is no longer writable, otherwise
       *     whatever the buffer's write() returns
       */
      public function write($data)
      {
          if (!$this->writable) {
              return false;
          }

          return $this->buffer->write($data);
      }

      /**
       * Closes the stream immediately: emits `close`, stops reading, closes the
       * write buffer, removes all listeners and closes the underlying resource.
       *
       * Calling this on an already closed stream is a no-op.
       *
       * @return void
       */
      public function close()
      {
          // Already closed. After end() the stream is not writable either, but
          // $closing is still set, so that case is let through.
          if (!$this->writable && !$this->closing) {
              return;
          }

          $this->closing = false;

          $this->readable = false;
          $this->writable = false;

          // Notify listeners first, then tear down. The buffer's `close` event is
          // wired back to this method; such a re-entrant call returns at the
          // guard above because both flags are already reset.
          $this->emit('close');
          $this->pause();
          $this->buffer->close();
          $this->removeAllListeners();

          if (\is_resource($this->stream)) {
              \fclose($this->stream);
          }
      }

      /**
       * Ends the stream: stops reading right away and hands the optional final
       * chunk to the write buffer's end().
       *
       * The stream is marked as closing so that the close() call triggered by the
       * buffer's `close` event still performs the actual teardown.
       *
       * @param mixed $data optional final data, passed through to the buffer's end()
       * @return void
       */
      public function end($data = null)
      {
          if (!$this->writable) {
              return;
          }

          $this->closing = true;

          $this->readable = false;
          $this->writable = false;
          $this->pause();

          $this->buffer->end($data);
      }

      /**
       * Pipes this stream into the given destination by delegating to `Util::pipe()`.
       *
       * @param WritableStreamInterface $dest
       * @param array $options passed through unchanged to `Util::pipe()`
       * @return mixed the return value of `Util::pipe()` (presumably `$dest` - confirm)
       */
      public function pipe(WritableStreamInterface $dest, array $options = array())
      {
          return Util::pipe($this, $dest, $options);
      }

      /**
       * Read callback registered with the event loop by resume().
       *
       * Reads up to `$bufferSize` bytes and emits them as `data`. A read error is
       * reported as an `error` event followed by close(); an empty read at
       * end-of-file emits `end` followed by close().
       *
       * @internal
       * @param resource $stream the stream resource passed in by the event loop
       * @return void
       */
      public function handleData($stream)
      {
          // Temporarily capture PHP warnings/notices raised by the read so they
          // can be reported through the `error` event instead.
          $error = null;
          \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
              $error = new \ErrorException(
                  $errstr,
                  0,
                  $errno,
                  $errfile,
                  $errline
              );
          });

          $data = \stream_get_contents($stream, $this->bufferSize);

          \restore_error_handler();

          if ($error !== null) {
              $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
              $this->close();
              return;
          }

          // stream_get_contents() is documented to return false on failure; never
          // hand a non-string to `data` listeners, treat it like an empty read.
          if ($data !== '' && $data !== false) {
              $this->emit('data', array($data));
          } elseif (\feof($this->stream)) {
              // no data read => we reached the end and close the stream
              $this->emit('end');
              $this->close();
          }
      }

      /**
       * Returns whether this is a pipe resource in a legacy environment
       *
       * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
       * and PHP 5.5.12+ and newer.
       *
       * Only the affected PHP versions inspect the stream at all; a resource is
       * then treated as a pipe when its meta data reports stream type `STDIO`.
       *
       * @param resource $resource
       * @return bool
       * @link https://github.com/reactphp/child-process/issues/40
       *
       * @codeCoverageIgnore
       */
      private function isLegacyPipe($resource)
      {
          if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
              $meta = \stream_get_meta_data($resource);

              if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                  return true;
              }
          }

          return false;
      }
  }