HappyEyeBallsConnectionBuilder.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. <?php
  2. namespace React\Socket;
  3. use React\Dns\Model\Message;
  4. use React\Dns\Resolver\ResolverInterface;
  5. use React\EventLoop\LoopInterface;
  6. use React\EventLoop\TimerInterface;
  7. use React\Promise;
  8. use React\Promise\PromiseInterface;
  /**
   * Establishes a single connection to a host name using the "Happy Eyeballs"
   * approach: AAAA and A records are looked up concurrently, the resulting
   * addresses are interleaved into one queue and connection attempts are
   * started one after another (with a short delay in between) until the first
   * one succeeds or all of them have failed.
   *
   * The members below are public, but like the class itself they are internal.
   *
   * @internal
   */
  final class HappyEyeBallsConnectionBuilder
  {
      /**
       * Delay in seconds between starting two consecutive connection attempts.
       *
       * As long as we haven't connected yet, keep taking the next IP address off the connect queue until one of
       * them succeeds or they all fail. We wait 100ms between connection attempts as per RFC.
       *
       * @link https://tools.ietf.org/html/rfc8305#section-5
       */
      const CONNECTION_ATTEMPT_DELAY = 0.1;

      /**
       * Delay in seconds applied to the result of the `A` lookup.
       *
       * Processing the `A` lookup result (and thus connecting to IPv4 addresses) is delayed by 50ms when the
       * IPv6 records haven't resolved yet, as per RFC.
       *
       * @link https://tools.ietf.org/html/rfc8305#section-3
       */
      const RESOLUTION_DELAY = 0.05;

      /** @var LoopInterface event loop used for the resolution delay and next-attempt timers */
      public $loop;

      /** @var ConnectorInterface connector used for each individual connection attempt */
      public $connector;

      /** @var ResolverInterface resolver used for the AAAA and A lookups */
      public $resolver;

      /** @var string target URI, only used in error messages here */
      public $uri;

      /** @var string host name that is passed to the resolver */
      public $host;

      /** @var array<int,bool> DNS query type => whether that lookup has settled (successfully or not) */
      public $resolved = array(
          Message::TYPE_A    => false,
          Message::TYPE_AAAA => false,
      );

      /** @var array<int,PromiseInterface> pending lookup promises, keyed by DNS query type */
      public $resolverPromises = array();

      /** @var array<int,PromiseInterface> pending connection attempts */
      public $connectionPromises = array();

      /** @var string[] IP addresses that have not been attempted yet, in attempt order */
      public $connectQueue = array();

      /** @var TimerInterface|null timer that starts the next attempt, null when none is scheduled */
      public $nextAttemptTimer;

      /** @var mixed handed to Connector::uri() for every attempt; presumably the parsed URI parts (confirm with caller) */
      public $parts;

      /** @var int total number of IP addresses that have been added to the connect queue so far */
      public $ipsCount = 0;

      /** @var int number of connection attempts that have failed so far */
      public $failureCount = 0;

      /** @var mixed declared but neither read nor written anywhere in this class */
      public $resolve;

      /** @var mixed declared but neither read nor written anywhere in this class */
      public $reject;

      /** @var int|null address family (4 or 6) of the most recently recorded error */
      public $lastErrorFamily;

      /** @var string|null most recent error message recorded for IPv6 (lookup or connection) */
      public $lastError6;

      /** @var string|null most recent error message recorded for IPv4 (lookup or connection) */
      public $lastError4;

      /**
       * @param LoopInterface      $loop
       * @param ConnectorInterface $connector
       * @param ResolverInterface  $resolver
       * @param string             $uri   target URI, used in error messages
       * @param string             $host  host name to resolve
       * @param mixed              $parts passed through to Connector::uri() for every attempt
       */
      public function __construct(LoopInterface $loop, ConnectorInterface $connector, ResolverInterface $resolver, $uri, $host, $parts)
      {
          $this->loop = $loop;
          $this->connector = $connector;
          $this->resolver = $resolver;
          $this->uri = $uri;
          $this->host = $host;
          $this->parts = $parts;
      }

      /**
       * Starts both DNS lookups and, as addresses come in, the connection attempts.
       *
       * @return PromiseInterface Fulfils with whatever the first successful connection attempt
       *     fulfils with. Rejects with a RuntimeException when both lookups fail, when every
       *     connection attempt fails, or when the returned promise is cancelled.
       */
      public function connect()
      {
          $that = $this;

          return new Promise\Promise(function ($resolve, $reject) use ($that) {
              // factory for the handler that feeds the addresses of one lookup into the connect queue
              $lookupResolve = function ($type) use ($that, $resolve, $reject) {
                  return function (array $ips) use ($that, $type, $resolve, $reject) {
                      unset($that->resolverPromises[$type]);
                      $that->resolved[$type] = true;

                      $that->mixIpsIntoConnectQueue($ips);

                      // start next connection attempt if not already awaiting next
                      if ($that->nextAttemptTimer === null && $that->connectQueue) {
                          $that->check($resolve, $reject);
                      }
                  };
              };

              $that->resolverPromises[Message::TYPE_AAAA] = $that->resolve(Message::TYPE_AAAA, $reject)->then($lookupResolve(Message::TYPE_AAAA));
              $that->resolverPromises[Message::TYPE_A] = $that->resolve(Message::TYPE_A, $reject)->then(function (array $ips) use ($that) {
                  // happy path: IPv6 has resolved already (or could not resolve), continue with IPv4 addresses
                  if ($that->resolved[Message::TYPE_AAAA] === true || !$ips) {
                      return $ips;
                  }

                  // Otherwise delay processing IPv4 lookup until short timer passes or IPv6 resolves in the meantime
                  $deferred = new Promise\Deferred(function () use (&$ips) {
                      // discard all IPv4 addresses if cancelled
                      $ips = array();
                  });

                  // $ips is captured by reference on purpose: if the lookup has been cancelled in the
                  // meantime, the timer must hand over the emptied list and not a stale copy of the
                  // addresses, otherwise new connection attempts would start after cancellation.
                  $timer = $that->loop->addTimer($that::RESOLUTION_DELAY, function () use ($deferred, &$ips) {
                      $deferred->resolve($ips);
                  });

                  // IPv6 lookup settled before the delay passed: stop waiting and continue right away
                  $that->resolverPromises[Message::TYPE_AAAA]->then(function () use ($that, $timer, $deferred, &$ips) {
                      $that->loop->cancelTimer($timer);
                      $deferred->resolve($ips);
                  });

                  return $deferred->promise();
              })->then($lookupResolve(Message::TYPE_A));
          }, function ($_, $reject) use ($that) {
              // no connection attempt pending means we are still waiting for DNS
              $reject(new \RuntimeException(
                  'Connection to ' . $that->uri . ' cancelled' . (!$that->connectionPromises ? ' during DNS lookup' : '') . ' (ECONNABORTED)',
                  \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103
              ));
              $_ = $reject = null;

              $that->cleanUp();
          });
      }

      /**
       * Looks up all records of the given type for the host.
       *
       * A failed lookup is recorded as the last error of its address family and converted into
       * an empty result. If by then both lookups have settled without yielding any address,
       * `$reject` is invoked with a RuntimeException wrapping the lookup error.
       *
       * @internal
       * @param int      $type   DNS query type
       * @param callable $reject
       * @return \React\Promise\PromiseInterface<string[]> Returns a promise that
       *     always resolves with a list of IP addresses on success or an empty
       *     list on error.
       */
      public function resolve($type, $reject)
      {
          $that = $this;

          return $that->resolver->resolveAll($that->host, $type)->then(null, function (\Exception $e) use ($type, $reject, $that) {
              unset($that->resolverPromises[$type]);
              $that->resolved[$type] = true;

              if ($type === Message::TYPE_A) {
                  $that->lastError4 = $e->getMessage();
                  $that->lastErrorFamily = 4;
              } else {
                  $that->lastError6 = $e->getMessage();
                  $that->lastErrorFamily = 6;
              }

              // cancel next attempt timer when there are no more IPs to connect to anymore
              if ($that->nextAttemptTimer !== null && !$that->connectQueue) {
                  $that->loop->cancelTimer($that->nextAttemptTimer);
                  $that->nextAttemptTimer = null;
              }

              if ($that->hasBeenResolved() && $that->ipsCount === 0) {
                  $reject(new \RuntimeException(
                      $that->error(),
                      0,
                      $e
                  ));
              }

              // Exception already handled above, so don't throw an unhandled rejection here
              return array();
          });
      }

      /**
       * Takes the next IP address off the connect queue and starts a connection attempt for it.
       *
       * On success all other pending work is cleaned up and `$resolve` is invoked with the
       * connection. On failure the next queued address is attempted immediately; once both
       * lookups have settled and every address has failed, `$reject` is invoked.
       * Does nothing when the connect queue is empty.
       *
       * @internal
       * @param callable $resolve
       * @param callable $reject
       * @return void
       */
      public function check($resolve, $reject)
      {
          // nothing queued: never start an attempt without an address
          if (!$this->connectQueue) {
              return;
          }

          $ip = \array_shift($this->connectQueue);

          // start connection attempt and remember array position to later unset again
          $this->connectionPromises[] = $this->attemptConnection($ip);
          \end($this->connectionPromises);
          $index = \key($this->connectionPromises);

          $that = $this;
          $that->connectionPromises[$index]->then(function ($connection) use ($that, $index, $resolve) {
              unset($that->connectionPromises[$index]);

              $that->cleanUp();

              $resolve($connection);
          }, function (\Exception $e) use ($that, $index, $ip, $resolve, $reject) {
              unset($that->connectionPromises[$index]);

              $that->failureCount++;

              // drop the "hostname=..." query parameter from the URI quoted in the error message
              $message = \preg_replace('/^(Connection to [^ ]+)[&?]hostname=[^ &]+/', '$1', $e->getMessage());

              // addresses without a colon are treated as IPv4, everything else as IPv6
              if (\strpos($ip, ':') === false) {
                  $that->lastError4 = $message;
                  $that->lastErrorFamily = 4;
              } else {
                  $that->lastError6 = $message;
                  $that->lastErrorFamily = 6;
              }

              // start next connection attempt immediately on error
              if ($that->connectQueue) {
                  if ($that->nextAttemptTimer !== null) {
                      $that->loop->cancelTimer($that->nextAttemptTimer);
                      $that->nextAttemptTimer = null;
                  }

                  $that->check($resolve, $reject);
              }

              // a pending lookup may still add more addresses, so this is not final yet
              if ($that->hasBeenResolved() === false) {
                  return;
              }

              if ($that->ipsCount === $that->failureCount) {
                  $that->cleanUp();

                  $reject(new \RuntimeException(
                      $that->error(),
                      $e->getCode(),
                      $e
                  ));
              }
          });

          // Allow next connection attempt in 100ms: https://tools.ietf.org/html/rfc8305#section-5
          // Only start timer when more IPs are queued or when DNS query is still pending (might add more IPs)
          if ($this->nextAttemptTimer === null && (\count($this->connectQueue) > 0 || $this->resolved[Message::TYPE_A] === false || $this->resolved[Message::TYPE_AAAA] === false)) {
              $this->nextAttemptTimer = $this->loop->addTimer(self::CONNECTION_ATTEMPT_DELAY, function () use ($that, $resolve, $reject) {
                  $that->nextAttemptTimer = null;

                  if ($that->connectQueue) {
                      $that->check($resolve, $reject);
                  }
              });
          }
      }

      /**
       * Starts a connection attempt to the given IP address via the underlying connector.
       *
       * @internal
       * @param string $ip
       * @return PromiseInterface whatever the connector returns for the URI built for this IP
       */
      public function attemptConnection($ip)
      {
          $uri = Connector::uri($this->parts, $this->host, $ip);

          return $this->connector->connect($uri);
      }

      /**
       * Empties the connect queue, cancels all pending connection attempts and lookups and
       * cancels the next-attempt timer.
       *
       * @internal
       * @return void
       */
      public function cleanUp()
      {
          // clear list of outstanding IPs to avoid creating new connections
          $this->connectQueue = array();

          // cancel pending connection attempts
          foreach ($this->connectionPromises as $connectionPromise) {
              if ($connectionPromise instanceof PromiseInterface && \method_exists($connectionPromise, 'cancel')) {
                  $connectionPromise->cancel();
              }
          }

          // cancel pending DNS resolution (cancel IPv4 first in case it is awaiting IPv6 resolution delay)
          foreach (\array_reverse($this->resolverPromises) as $resolverPromise) {
              if ($resolverPromise instanceof PromiseInterface && \method_exists($resolverPromise, 'cancel')) {
                  $resolverPromise->cancel();
              }
          }

          if ($this->nextAttemptTimer instanceof TimerInterface) {
              $this->loop->cancelTimer($this->nextAttemptTimer);
              $this->nextAttemptTimer = null;
          }
      }

      /**
       * Checks whether every DNS lookup has settled, no matter if it succeeded or failed.
       *
       * @internal
       * @return bool
       */
      public function hasBeenResolved()
      {
          foreach ($this->resolved as $typeHasBeenResolved) {
              if ($typeHasBeenResolved === false) {
                  return false;
              }
          }

          return true;
      }

      /**
       * Mixes an array of IP addresses into the connect queue in such a way they alternate when attempting to connect.
       * The goal behind it is first attempt to connect to IPv6, then to IPv4, then to IPv6 again until one of those
       * attempts succeeds.
       *
       * The given addresses are shuffled first. In the resulting queue each new address is placed
       * in front of the next address that was already queued.
       *
       * @link https://tools.ietf.org/html/rfc8305#section-4
       *
       * @internal
       * @param string[] $ips
       * @return void
       */
      public function mixIpsIntoConnectQueue(array $ips)
      {
          \shuffle($ips);
          $this->ipsCount += \count($ips);
          $connectQueueStash = $this->connectQueue;
          $this->connectQueue = array();
          while (\count($connectQueueStash) > 0 || \count($ips) > 0) {
              if (\count($ips) > 0) {
                  $this->connectQueue[] = \array_shift($ips);
              }
              if (\count($connectQueueStash) > 0) {
                  $this->connectQueue[] = \array_shift($connectQueueStash);
              }
          }
      }

      /**
       * Builds the message for the final rejection from the last errors recorded per address family.
       *
       * When both families recorded the same message, or only one family recorded an error at
       * all, just that message is reported. Otherwise both are reported, the most recent first.
       *
       * @internal
       * @return string
       */
      public function error()
      {
          // no address at all after both lookups settled means the failure happened during DNS
          $duringDns = $this->hasBeenResolved() && $this->ipsCount === 0;

          if ($this->lastError4 === $this->lastError6 || $this->lastError4 === null || $this->lastError6 === null) {
              // avoid a dangling "Previous error for ..." clause when there is only one message to report
              $message = $this->lastError6 !== null ? $this->lastError6 : $this->lastError4;

              return 'Connection to ' . $this->uri . ' failed' . ($duringDns ? ' during DNS lookup: ' : ': ') . $message;
          }

          if ($this->lastErrorFamily === 6) {
              $message = 'Last error for IPv6: ' . $this->lastError6 . '. Previous error for IPv4: ' . $this->lastError4;
          } else {
              $message = 'Last error for IPv4: ' . $this->lastError4 . '. Previous error for IPv6: ' . $this->lastError6;
          }

          return 'Connection to ' . $this->uri . ' failed' . ($duringDns ? ' during DNS lookup. ' : ': ') . $message;
      }
  }