ReadableResourceStream.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  4. use React\EventLoop\Loop;
  5. use React\EventLoop\LoopInterface;
  6. use InvalidArgumentException;
  /**
   * Readable stream backed by a PHP stream resource and driven by an event loop.
   *
   * The resource is registered with the loop for read readiness. Whenever the
   * loop reports the resource as readable, a chunk is read and forwarded as a
   * `data` event. This class emits the following events:
   *
   * - `data`  with the chunk that was read (never an empty string)
   * - `end`   once a read yields no data and the resource reports EOF
   * - `error` with a `RuntimeException` if reading raised a PHP error
   * - `close` exactly once when the stream is closed
   */
  final class ReadableResourceStream extends EventEmitter implements ReadableStreamInterface
  {
      /**
       * @var resource
       */
      private $stream;

      /** @var LoopInterface */
      private $loop;

      /**
       * Controls the maximum buffer size in bytes to read at once from the stream.
       *
       * This value SHOULD NOT be changed unless you know what you're doing.
       *
       * This can be a positive number which means that up to X bytes will be read
       * at once from the underlying stream resource. Note that the actual number
       * of bytes read may be lower if the stream resource has less than X bytes
       * currently available.
       *
       * This can be `-1` which means read everything available from the
       * underlying stream resource.
       * This should read until the stream resource is not readable anymore
       * (i.e. underlying buffer drained), note that this does not necessarily
       * mean it reached EOF.
       *
       * @var int
       */
      private $bufferSize;

      /** @var bool set once close() has run; the stream is never readable again afterwards */
      private $closed = false;

      /** @var bool whether the resource is currently registered as a read stream with the loop */
      private $listening = false;

      /**
       * @param resource           $stream        stream resource opened in a readable mode
       * @param LoopInterface|null $loop          event loop to attach to, defaults to `Loop::get()`
       * @param int|null           $readChunkSize maximum bytes per read, defaults to 65536 (`-1` = all available)
       * @throws InvalidArgumentException if $loop is neither null nor a LoopInterface, or if $stream
       *                                  is not a stream resource or is not opened for reading
       * @throws \RuntimeException if the stream can not be switched to non-blocking mode
       */
      public function __construct($stream, $loop = null, $readChunkSize = null)
      {
          // Manual type check instead of a `LoopInterface $loop = null` declaration:
          // implicitly nullable parameter types are deprecated as of PHP 8.4 and the
          // explicit `?LoopInterface` syntax is not available on the legacy PHP
          // versions this class still supports. Checked first so that an invalid
          // loop is rejected before the stream resource is touched.
          if ($loop !== null && !$loop instanceof LoopInterface) {
              throw new InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
          }

          if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
              throw new InvalidArgumentException('First parameter must be a valid stream resource');
          }

          // ensure resource is opened for reading (fopen mode must contain "r" or "+")
          // both strpos() calls can only yield the same value if both return false,
          // i.e. if the mode contains neither "r" nor "+"
          $meta = \stream_get_meta_data($stream);
          if (isset($meta['mode']) && $meta['mode'] !== '' && \strpos($meta['mode'], 'r') === \strpos($meta['mode'], '+')) {
              throw new InvalidArgumentException('Given stream resource is not opened in read mode');
          }

          // this class relies on non-blocking I/O in order to not interrupt the event loop
          // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
          if (\stream_set_blocking($stream, false) !== true) {
              throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
          }

          // Use unbuffered read operations on the underlying stream resource.
          // Reading chunks from the stream may otherwise leave unread bytes in
          // PHP's stream buffers which some event loop implementations do not
          // trigger events on (edge triggered).
          // This does not affect the default event loop implementation (level
          // triggered), so we can ignore platforms not supporting this (HHVM).
          // Pipe streams (such as STDIN) do not seem to require this and legacy
          // PHP versions cause SEGFAULTs on unbuffered pipe streams, so skip this.
          if (\function_exists('stream_set_read_buffer') && !$this->isLegacyPipe($stream)) {
              \stream_set_read_buffer($stream, 0);
          }

          $this->stream = $stream;
          $this->loop = $loop ?: Loop::get();
          $this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;

          // start listening for incoming data right away
          $this->resume();
      }

      /**
       * Reports whether this stream has not been closed yet.
       *
       * @return bool
       */
      public function isReadable()
      {
          return !$this->closed;
      }

      /**
       * Stops watching the resource for readability; does nothing if not currently listening.
       *
       * @return void
       */
      public function pause()
      {
          if ($this->listening) {
              $this->loop->removeReadStream($this->stream);
              $this->listening = false;
          }
      }

      /**
       * (Re-)starts watching the resource for readability.
       *
       * Does nothing if the stream is already listening or has been closed.
       *
       * @return void
       */
      public function resume()
      {
          if (!$this->listening && !$this->closed) {
              $this->loop->addReadStream($this->stream, array($this, 'handleData'));
              $this->listening = true;
          }
      }

      /**
       * Pipes this stream into the given destination by delegating to `Util::pipe()`.
       *
       * @param WritableStreamInterface $dest
       * @param array                   $options passed through to `Util::pipe()` unchanged
       * @return mixed whatever `Util::pipe()` returns
       */
      public function pipe(WritableStreamInterface $dest, array $options = array())
      {
          return Util::pipe($this, $dest, $options);
      }

      /**
       * Closes the stream; subsequent calls are no-ops.
       *
       * @return void
       */
      public function close()
      {
          if ($this->closed) {
              return;
          }

          // flag first so that a re-entrant close() from a listener returns early
          $this->closed = true;

          // notify listeners before they are removed below
          $this->emit('close');
          $this->pause();
          $this->removeAllListeners();

          // the resource may already have been closed elsewhere
          if (\is_resource($this->stream)) {
              \fclose($this->stream);
          }
      }

      /**
       * Read callback registered with the event loop in resume().
       *
       * @internal
       * @return void
       */
      public function handleData()
      {
          // Temporarily capture any PHP error raised by the read as an
          // ErrorException instead of letting it reach the global handler.
          $error = null;
          \set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error) {
              $error = new \ErrorException(
                  $errstr,
                  0,
                  $errno,
                  $errfile,
                  $errline
              );
          });

          $data = \stream_get_contents($this->stream, $this->bufferSize);

          \restore_error_handler();

          if ($error !== null) {
              // report the read failure (original error attached as previous) and shut down
              $this->emit('error', array(new \RuntimeException('Unable to read from stream: ' . $error->getMessage(), 0, $error)));
              $this->close();
              return;
          }

          if ($data !== '') {
              $this->emit('data', array($data));
          } elseif (\feof($this->stream)) {
              // no data read => we reached the end and close the stream
              $this->emit('end');
              $this->close();
          }
          // otherwise: nothing was read but EOF is not reached, wait for the next read event
      }

      /**
       * Returns whether this is a pipe resource in a legacy environment
       *
       * This works around a legacy PHP bug (#61019) that was fixed in PHP 5.4.28+
       * and PHP 5.5.12+ and newer.
       *
       * @param resource $resource
       * @return bool
       * @link https://github.com/reactphp/child-process/issues/40
       *
       * @codeCoverageIgnore
       */
      private function isLegacyPipe($resource)
      {
          // only affected versions need to inspect the resource at all
          if (\PHP_VERSION_ID < 50428 || (\PHP_VERSION_ID >= 50500 && \PHP_VERSION_ID < 50512)) {
              $meta = \stream_get_meta_data($resource);

              if (isset($meta['stream_type']) && $meta['stream_type'] === 'STDIO') {
                  return true;
              }
          }

          return false;
      }
  }