CompositeStream.php 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. <?php
  2. namespace React\Stream;
  3. use Evenement\EventEmitter;
  /**
   * Duplex stream assembled from one readable stream and one writable stream.
   *
   * Read-side calls (isReadable(), pause(), resume()) are delegated to the
   * wrapped readable stream, write-side calls (isWritable(), write(), end())
   * to the wrapped writable stream. close() closes both wrapped streams.
   */
  final class CompositeStream extends EventEmitter implements DuplexStreamInterface
  {
      /** Wrapped stream backing the read side of this composite. */
      private $readable;

      /** Wrapped stream backing the write side of this composite. */
      private $writable;

      /** Set to true by the first close() call; makes later calls no-ops. */
      private $closed = false;

      /**
       * Wraps the given pair of streams.
       *
       * If the readable side is not readable or the writable side is not
       * writable at construction time, the composite closes itself right away
       * and no event forwarding or close listeners are set up.
       *
       * @param ReadableStreamInterface $readable stream used for the read side
       * @param WritableStreamInterface $writable stream used for the write side
       */
      public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
      {
          $this->readable = $readable;
          $this->writable = $writable;

          // Short-circuits exactly like the classic "!a || !b" guard: the
          // writable side is only queried when the readable side is readable.
          $usable = $readable->isReadable() && $writable->isWritable();
          if (!$usable) {
              $this->close();

              return;
          }

          // Hand the listed event names to Util::forwardEvents() with this
          // composite as the target (helper is defined outside this class).
          Util::forwardEvents($readable, $this, array('data', 'end', 'error'));
          Util::forwardEvents($writable, $this, array('drain', 'error', 'pipe'));

          // A 'close' event from either wrapped stream triggers close() here.
          $closeHandler = array($this, 'close');
          $readable->on('close', $closeHandler);
          $writable->on('close', $closeHandler);
      }

      /**
       * Reports whether the wrapped readable stream is readable.
       *
       * @return mixed whatever the wrapped stream's isReadable() returns
       */
      public function isReadable()
      {
          return $this->readable->isReadable();
      }

      /**
       * Pauses the wrapped readable stream.
       */
      public function pause()
      {
          $this->readable->pause();
      }

      /**
       * Resumes the wrapped readable stream, but only while the wrapped
       * writable stream still reports itself as writable; otherwise does nothing.
       */
      public function resume()
      {
          if ($this->writable->isWritable()) {
              $this->readable->resume();
          }
      }

      /**
       * Pipes this composite into the given destination via Util::pipe().
       *
       * @param WritableStreamInterface $dest    destination stream
       * @param array                   $options passed through to Util::pipe() unchanged
       *
       * @return mixed whatever Util::pipe() returns
       */
      public function pipe(WritableStreamInterface $dest, array $options = array())
      {
          return Util::pipe($this, $dest, $options);
      }

      /**
       * Reports whether the wrapped writable stream is writable.
       *
       * @return mixed whatever the wrapped stream's isWritable() returns
       */
      public function isWritable()
      {
          return $this->writable->isWritable();
      }

      /**
       * Writes data to the wrapped writable stream.
       *
       * @param mixed $data payload handed to the wrapped stream's write()
       *
       * @return mixed whatever the wrapped stream's write() returns
       */
      public function write($data)
      {
          return $this->writable->write($data);
      }

      /**
       * Pauses the read side first, then ends the wrapped writable stream.
       *
       * @param mixed $data optional final payload, forwarded to the wrapped
       *                    stream's end() as-is (null by default)
       */
      public function end($data = null)
      {
          $this->readable->pause();
          $this->writable->end($data);
      }

      /**
       * Closes both wrapped streams, emits 'close' on this composite and then
       * removes all listeners. Only the first call has any effect.
       */
      public function close()
      {
          if (!$this->closed) {
              // Flip the flag before touching the wrapped streams: their 'close'
              // events are wired back to this method, and the flag stops that
              // re-entrant call from running the shutdown a second time.
              $this->closed = true;

              $this->readable->close();
              $this->writable->close();

              $this->emit('close');
              $this->removeAllListeners();
          }
      }
  }