WritableResourceStream.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  4. use React\EventLoop\Loop;
  5. use React\EventLoop\LoopInterface;
  /**
   * Buffered, non-blocking writer for a PHP stream resource.
   *
   * Data handed to write() is appended to an in-memory buffer and flushed to
   * the underlying resource whenever the event loop reports the resource as
   * writable. The following events are emitted by this class:
   *
   * - "drain": the buffer was at or above the soft limit and dropped below it
   * - "error": nothing could be written and PHP raised an error while writing
   * - "close": the stream has been closed
   */
  final class WritableResourceStream extends EventEmitter implements WritableStreamInterface
  {
      /** @var resource underlying stream resource, switched to non-blocking mode */
      private $stream;

      /** @var LoopInterface event loop used to wait for the resource to become writable */
      private $loop;

      /**
       * @var int buffer size in bytes at which write() starts returning false
       */
      private $softLimit;

      /**
       * @var int maximum number of bytes passed to a single fwrite() call, -1 passes the whole buffer
       */
      private $writeChunkSize;

      /** @var bool whether handleWrite() is currently registered with the loop */
      private $listening = false;

      /** @var bool false once end() or close() has been called */
      private $writable = true;

      /** @var bool true once close() has run */
      private $closed = false;

      /** @var string data accepted by write() that has not been flushed yet */
      private $data = '';

      /**
       * @param resource           $stream               stream resource opened in a writable mode
       * @param LoopInterface|null $loop                 loop to attach to, defaults to Loop::get()
       * @param int|null           $writeBufferSoftLimit soft buffer limit in bytes, defaults to 65536
       * @param int|null           $writeChunkSize       max bytes per fwrite() call, defaults to -1 (whole buffer)
       * @throws \InvalidArgumentException if $stream is not a stream resource, is not opened in a
       *                                   write mode, or $loop is neither null nor a LoopInterface
       * @throws \RuntimeException         if the resource cannot be set to non-blocking mode
       */
      public function __construct($stream, $loop = null, $writeBufferSoftLimit = null, $writeChunkSize = null)
      {
          // Manual type check instead of a `LoopInterface $loop = null` declaration:
          // implicitly nullable parameter types are deprecated as of PHP 8.4 and the
          // explicit `?LoopInterface` form is not available on legacy PHP versions.
          // Checked first so an invalid loop is rejected before the stream is touched.
          if ($loop !== null && !$loop instanceof LoopInterface) {
              throw new \InvalidArgumentException('Argument #2 ($loop) expected null|React\EventLoop\LoopInterface');
          }

          if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
              throw new \InvalidArgumentException('First parameter must be a valid stream resource');
          }

          // ensure resource is opened for writing (fopen mode must contain either of "waxc+")
          $meta = \stream_get_meta_data($stream);
          if (isset($meta['mode']) && $meta['mode'] !== '' && \strtr($meta['mode'], 'waxc+', '.....') === $meta['mode']) {
              throw new \InvalidArgumentException('Given stream resource is not opened in write mode');
          }

          // this class relies on non-blocking I/O in order to not interrupt the event loop
          // e.g. pipes on Windows do not support this: https://bugs.php.net/bug.php?id=47918
          if (\stream_set_blocking($stream, false) !== true) {
              throw new \RuntimeException('Unable to set stream resource to non-blocking mode');
          }

          $this->stream = $stream;
          $this->loop = $loop ?: Loop::get();
          $this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
          $this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize;
      }

      /**
       * @return bool true until end() or close() has been called
       */
      public function isWritable()
      {
          return $this->writable;
      }

      /**
       * Appends data to the buffer and schedules it to be flushed.
       *
       * @param string $data
       * @return bool false if the stream is no longer writable or the buffer has
       *              reached the soft limit, true otherwise
       */
      public function write($data)
      {
          if (!$this->writable) {
              return false;
          }

          $this->data .= $data;

          // start waiting for the resource to become writable once there is something to send
          if (!$this->listening && $this->data !== '') {
              $this->listening = true;

              $this->loop->addWriteStream($this->stream, array($this, 'handleWrite'));
          }

          // offset (softLimit - 1) only exists once the buffer holds at least softLimit bytes
          return !isset($this->data[$this->softLimit - 1]);
      }

      /**
       * Optionally writes a final chunk, then closes once the buffer is flushed.
       *
       * @param string|null $data final data to write, if any
       * @return void
       */
      public function end($data = null)
      {
          if (null !== $data) {
              $this->write($data);
          }

          $this->writable = false;

          // close immediately if buffer is already empty
          // otherwise wait for buffer to flush first
          if ($this->data === '') {
              $this->close();
          }
      }

      /**
       * Closes the stream right away and discards any data still buffered.
       *
       * Emits "close", removes all event listeners and closes the underlying
       * resource. Calling this on an already closed stream does nothing.
       *
       * @return void
       */
      public function close()
      {
          if ($this->closed) {
              return;
          }

          if ($this->listening) {
              $this->listening = false;
              $this->loop->removeWriteStream($this->stream);
          }

          $this->closed = true;
          $this->writable = false;
          $this->data = '';

          $this->emit('close');
          $this->removeAllListeners();

          if (\is_resource($this->stream)) {
              \fclose($this->stream);
          }
      }

      /**
       * Loop callback: flushes as much of the buffer as the resource accepts.
       *
       * @internal
       * @return void
       */
      public function handleWrite()
      {
          // capture the message of any warning raised by fwrite() instead of letting it surface
          $error = null;
          \set_error_handler(function ($_, $errstr) use (&$error) {
              $error = $errstr;
          });

          if ($this->writeChunkSize === -1) {
              $sent = \fwrite($this->stream, $this->data);
          } else {
              $sent = \fwrite($this->stream, $this->data, $this->writeChunkSize);
          }

          \restore_error_handler();

          // Only report errors if *nothing* could be sent and an error has been raised.
          // Ignore non-fatal warnings if *some* data could be sent.
          // Any hard (permanent) error will fail to send any data at all.
          // Sending excessive amounts of data will only flush *some* data and then
          // report a temporary error (EAGAIN) which we do not raise here in order
          // to keep the stream open for further tries to write.
          // Should this turn out to be a permanent error later, it will eventually
          // send *nothing* and we can detect this.
          if (($sent === 0 || $sent === false) && $error !== null) {
              $this->emit('error', array(new \RuntimeException('Unable to write to stream: ' . $error)));
              $this->close();

              return;
          }

          // remember whether the buffer was at/above the limit before dropping the sent bytes
          $exceeded = isset($this->data[$this->softLimit - 1]);
          $this->data = (string) \substr($this->data, $sent);

          // buffer has been above limit and is now below limit
          if ($exceeded && !isset($this->data[$this->softLimit - 1])) {
              $this->emit('drain');
          }

          // buffer is now completely empty => stop trying to write
          if ($this->data === '') {
              // stop waiting for resource to be writable
              if ($this->listening) {
                  $this->loop->removeWriteStream($this->stream);
                  $this->listening = false;
              }

              // buffer is end()ing and now completely empty => close buffer
              if (!$this->writable) {
                  $this->close();
              }
          }
      }
  }