Decoder.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. <?php
  2. namespace Clue\React\NDJson;
  3. use Evenement\EventEmitter;
  4. use React\Stream\ReadableStreamInterface;
  5. use React\Stream\Util;
  6. use React\Stream\WritableStreamInterface;
  7. /**
  8. * The Decoder / Parser reads from a plain stream and emits data objects for each JSON element
  9. */
  10. class Decoder extends EventEmitter implements ReadableStreamInterface
  11. {
  12. private $input;
  13. private $assoc;
  14. private $depth;
  15. private $options;
  16. /** @var int */
  17. private $maxlength;
  18. private $buffer = '';
  19. private $closed = false;
  20. /**
  21. * @param ReadableStreamInterface $input
  22. * @param bool $assoc
  23. * @param int $depth
  24. * @param int $options (requires PHP 5.4+)
  25. * @param int $maxlength
  26. * @throws \BadMethodCallException
  27. */
  28. public function __construct(ReadableStreamInterface $input, $assoc = false, $depth = 512, $options = 0, $maxlength = 65536)
  29. {
  30. // @codeCoverageIgnoreStart
  31. if ($options !== 0 && \PHP_VERSION < 5.4) {
  32. throw new \BadMethodCallException('Options parameter is only supported on PHP 5.4+');
  33. }
  34. if (\defined('JSON_THROW_ON_ERROR')) {
  35. $options = $options & ~\JSON_THROW_ON_ERROR;
  36. }
  37. // @codeCoverageIgnoreEnd
  38. $this->input = $input;
  39. if (!$input->isReadable()) {
  40. $this->close();
  41. return;
  42. }
  43. $this->assoc = $assoc;
  44. $this->depth = $depth;
  45. $this->options = $options;
  46. $this->maxlength = $maxlength;
  47. $this->input->on('data', array($this, 'handleData'));
  48. $this->input->on('end', array($this, 'handleEnd'));
  49. $this->input->on('error', array($this, 'handleError'));
  50. $this->input->on('close', array($this, 'close'));
  51. }
  52. public function isReadable()
  53. {
  54. return !$this->closed;
  55. }
  56. public function close()
  57. {
  58. if ($this->closed) {
  59. return;
  60. }
  61. $this->closed = true;
  62. $this->buffer = '';
  63. $this->input->close();
  64. $this->emit('close');
  65. $this->removeAllListeners();
  66. }
  67. public function pause()
  68. {
  69. $this->input->pause();
  70. }
  71. public function resume()
  72. {
  73. $this->input->resume();
  74. }
  75. public function pipe(WritableStreamInterface $dest, array $options = array())
  76. {
  77. Util::pipe($this, $dest, $options);
  78. return $dest;
  79. }
  80. /** @internal */
  81. public function handleData($data)
  82. {
  83. if (!\is_string($data)) {
  84. $this->handleError(new \UnexpectedValueException('Expected stream to emit string, but got ' . \gettype($data)));
  85. return;
  86. }
  87. $this->buffer .= $data;
  88. // keep parsing while a newline has been found
  89. while (($newline = \strpos($this->buffer, "\n")) !== false && $newline <= $this->maxlength) {
  90. // read data up until newline and remove from buffer
  91. $data = (string)\substr($this->buffer, 0, $newline);
  92. $this->buffer = (string)\substr($this->buffer, $newline + 1);
  93. // decode data with options given in ctor
  94. // @codeCoverageIgnoreStart
  95. if ($this->options === 0) {
  96. $data = \json_decode($data, $this->assoc, $this->depth);
  97. } else {
  98. assert(\PHP_VERSION_ID >= 50400);
  99. $data = \json_decode($data, $this->assoc, $this->depth, $this->options);
  100. }
  101. // @codeCoverageIgnoreEnd
  102. // abort stream if decoding failed
  103. if ($data === null && \json_last_error() !== \JSON_ERROR_NONE) {
  104. // @codeCoverageIgnoreStart
  105. if (\PHP_VERSION_ID > 50500) {
  106. $errstr = \json_last_error_msg();
  107. } elseif (\json_last_error() === \JSON_ERROR_SYNTAX) {
  108. $errstr = 'Syntax error';
  109. } else {
  110. $errstr = 'Unknown error';
  111. }
  112. // @codeCoverageIgnoreEnd
  113. return $this->handleError(new \RuntimeException('Unable to decode JSON: ' . $errstr, \json_last_error()));
  114. }
  115. $this->emit('data', array($data));
  116. }
  117. if (isset($this->buffer[$this->maxlength])) {
  118. $this->handleError(new \OverflowException('Buffer size exceeded'));
  119. }
  120. }
  121. /** @internal */
  122. public function handleEnd()
  123. {
  124. if ($this->buffer !== '') {
  125. $this->handleData("\n");
  126. }
  127. if (!$this->closed) {
  128. $this->emit('end');
  129. $this->close();
  130. }
  131. }
  132. /** @internal */
  133. public function handleError(\Exception $error)
  134. {
  135. $this->emit('error', array($error));
  136. $this->close();
  137. }
  138. }