WritableResourceStream.php 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  4. use React\EventLoop\Loop;
  5. use React\EventLoop\LoopInterface;
  /**
   * Writable stream that buffers outgoing data in memory and flushes it to a
   * PHP stream resource whenever the event loop reports the resource writable.
   *
   * Events emitted by this class:
   * - `drain`: the buffer had reached the soft limit and a flush brought it
   *   back below the limit.
   * - `error`: a flush attempt sent nothing and PHP reported an error; the
   *   listener receives a `\RuntimeException` and the stream is closed next.
   * - `close`: the stream has been closed (emitted once).
   */
  final class WritableResourceStream extends EventEmitter implements WritableStreamInterface
  {
      /** @var resource underlying stream resource, switched to non-blocking mode by the constructor */
      private $stream;

      /** @var LoopInterface */
      private $loop;

      /**
       * Buffer size (in bytes) at which write() starts returning false.
       *
       * @var int
       */
      private $softLimit;

      /**
       * Maximum number of bytes handed to a single fwrite() call, or -1 for no explicit limit.
       *
       * @var int
       */
      private $writeChunkSize;

      /** @var bool whether handleWrite() is currently registered with the loop */
      private $listening = false;

      /** @var bool whether write() still accepts data */
      private $writable = true;

      /** @var bool whether close() has already run */
      private $closed = false;

      /** @var string data accepted by write() but not yet flushed to the resource */
      private $data = '';

      /**
       * All arguments are validated before the resource is modified, so a
       * rejected call never leaves the caller's stream switched to
       * non-blocking mode because of an unrelated bad argument.
       *
       * @param resource $stream stream resource opened in a writable mode
       * @param ?LoopInterface $loop loop to use, defaults to Loop::get()
       * @param ?int $writeBufferSoftLimit defaults to 65536 bytes
       * @param ?int $writeChunkSize defaults to -1 (no explicit limit per fwrite() call)
       * @throws \InvalidArgumentException if $stream is not a writable stream resource or $loop has the wrong type
       * @throws \RuntimeException if the resource cannot be set to non-blocking mode
       */
      public function __construct($stream, $loop = null, $writeBufferSoftLimit = null, $writeChunkSize = null)
      {
          if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
              throw new \InvalidArgumentException('First parameter must be a valid stream resource');
          }

          // ensure resource is opened for writing (fopen mode must contain either of "waxc+"):
          // if masking those characters leaves the mode unchanged, none of them is present
          $meta = \stream_get_meta_data($stream);
          if (isset($meta['mode']) && $meta['mode'] !== '' && \strtr($meta['mode'], 'waxc+', '.....') === $meta['mode']) {
              throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
          }

          // check $loop before touching the resource so an invalid argument has no side effects
          if ($loop !== null && !$loop instanceof LoopInterface) { // manual type check to support legacy PHP < 7.1
              throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
          }

          // this class relies on non-blocking I/O in order to not interrupt the event loop
          // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
          if (\stream_set_blocking($stream, false) !== true) {
              throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
          }

          $this->stream = $stream;
          $this->loop = $loop ?: Loop::get();
          $this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
          $this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize;
      }

      /**
       * @return bool false once end() or close() has been called
       */
      public function isWritable()
      {
          return $this->writable;
      }

      /**
       * Appends data to the buffer and schedules a flush on the event loop.
       *
       * @param mixed $data appended to the buffer via string concatenation
       * @return bool false if the stream is no longer writable or the buffer
       *     has reached the soft limit, true otherwise
       */
      public function write($data)
      {
          if (!$this->writable) {
              return false;
          }

          $this->data .= $data;

          // only register with the loop once, and only if there is something to send
          if (!$this->listening && $this->data !== '') {
              $this->listening = true;

              $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
          }

          // the byte at offset (softLimit - 1) exists once the buffer holds softLimit bytes
          return !isset($this->data[$this->softLimit - 1]);
      }

      /**
       * Optionally writes a final chunk, then stops accepting further data.
       *
       * @param mixed $data optional final data, passed to write() unless null
       * @return void
       */
      public function end($data = null)
      {
          if (null !== $data) {
              $this->write($data);
          }

          $this->writable = false;

          // close immediately if buffer is already empty
          // otherwise wait for buffer to flush first (handleWrite() closes once it is empty)
          if ($this->data === '') {
              $this->close();
          }
      }

      /**
       * Closes the stream, discarding any data still buffered.
       *
       * Calling this more than once has no further effect.
       *
       * @return void
       */
      public function close()
      {
          if ($this->closed) {
              return;
          }

          if ($this->listening) {
              $this->listening = false;
              $this->loop->removeWriteStream($this->stream);
          }

          $this->closed = true;
          $this->writable = false;
          $this->data = '';

          // notify listeners first, then drop them so nothing fires after close
          $this->emit('close');
          $this->removeAllListeners();

          if (\is_resource($this->stream)) {
              \fclose($this->stream);
          }
      }

      /**
       * Loop callback: flushes as much of the buffer as the resource accepts.
       *
       * @internal
       * @return void
       */
      public function handleWrite()
      {
          // capture the message of any warning raised by fwrite() instead of letting it surface
          $error = null;
          \set_error_handler(function ($_, $errstr) use (&$error) {
              $error = $errstr;
          });

          if ($this->writeChunkSize === -1) {
              $sent = \fwrite($this->stream, $this->data);
          } else {
              $sent = \fwrite($this->stream, $this->data, $this->writeChunkSize);
          }

          \restore_error_handler();

          // Only report errors if *nothing* could be sent and an error has been raised.
          // Ignore non-fatal warnings if *some* data could be sent.
          // Any hard (permanent) error will fail to send any data at all.
          // Sending excessive amounts of data will only flush *some* data and then
          // report a temporary error (EAGAIN) which we do not raise here in order
          // to keep the stream open for further tries to write.
          // Should this turn out to be a permanent error later, it will eventually
          // send *nothing* and we can detect this.
          if (($sent === 0 || $sent === false) && $error !== null) {
              $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error)));
              $this->close();

              return;
          }

          $exceeded = isset($this->data[$this->softLimit - 1]);

          // a false return without an error counts as zero bytes sent, leaving the buffer as is
          $this->data = (string) \substr($this->data, (int) $sent);

          // buffer has been above limit and is now below limit
          if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
              $this->emit('drain');
          }

          // buffer is now completely empty => stop trying to write
          if ($this->data === '') {
              // stop waiting for resource to be writable
              if ($this->listening) {
                  $this->loop->removeWriteStream($this->stream);
                  $this->listening = false;
              }

              // buffer is end()ing and now completely empty => close buffer
              if (!$this->writable) {
                  $this->close();
              }
          }
      }
  }