Util.php 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. <?php
  2. namespace React\Stream;
  3. final class Util
  4. {
  5. /**
  6. * Pipes all the data from the given $source into the $dest
  7. *
  8. * @param ReadableStreamInterface $source
  9. * @param WritableStreamInterface $dest
  10. * @param array $options
  11. * @return WritableStreamInterface $dest stream as-is
  12. * @see ReadableStreamInterface::pipe() for more details
  13. */
  14. public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = array())
  15. {
  16. // source not readable => NO-OP
  17. if (!$source->isReadable()) {
  18. return $dest;
  19. }
  20. // destination not writable => just pause() source
  21. if (!$dest->isWritable()) {
  22. $source->pause();
  23. return $dest;
  24. }
  25. $dest->emit('pipe', array($source));
  26. // forward all source data events as $dest->write()
  27. $source->on('data', $dataer = function ($data) use ($source, $dest) {
  28. $feedMore = $dest->write($data);
  29. if (false === $feedMore) {
  30. $source->pause();
  31. }
  32. });
  33. $dest->on('close', function () use ($source, $dataer) {
  34. $source->removeListener('data', $dataer);
  35. $source->pause();
  36. });
  37. // forward destination drain as $source->resume()
  38. $dest->on('drain', $drainer = function () use ($source) {
  39. $source->resume();
  40. });
  41. $source->on('close', function () use ($dest, $drainer) {
  42. $dest->removeListener('drain', $drainer);
  43. });
  44. // forward end event from source as $dest->end()
  45. $end = isset($options['end']) ? $options['end'] : true;
  46. if ($end) {
  47. $source->on('end', $ender = function () use ($dest) {
  48. $dest->end();
  49. });
  50. $dest->on('close', function () use ($source, $ender) {
  51. $source->removeListener('end', $ender);
  52. });
  53. }
  54. return $dest;
  55. }
  56. public static function forwardEvents($source, $target, array $events)
  57. {
  58. foreach ($events as $event) {
  59. $source->on($event, function () use ($event, $target) {
  60. $target->emit($event, \func_get_args());
  61. });
  62. }
  63. }
  64. }