PoolHandler.php 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Guzzle;
  12. use Exception;
  13. use GuzzleHttp\Exception\ConnectException;
  14. use GuzzleHttp\Promise\Create;
  15. use GuzzleHttp\Promise\FulfilledPromise;
  16. use Hyperf\Pool\SimplePool\PoolFactory;
  17. use Psr\Http\Message\RequestInterface;
  18. use Psr\Http\Message\UriInterface;
  /**
   * Guzzle handler that sends each request through a client borrowed from a
   * connection pool.
   *
   * The pool is looked up in the injected factory under the name returned by
   * getPoolName(), which is built from the host, port and scheme of the
   * request URI.
   */
  class PoolHandler extends CoroutineHandler
  {
      /**
       * @param PoolFactory $factory factory used to fetch (or lazily create) the pool for a request
       * @param array $option passed unchanged as the third argument of PoolFactory::get()
       */
      public function __construct(protected PoolFactory $factory, protected array $option = [])
      {
      }

      /**
       * Sends the request with a pooled client and wraps the outcome in a promise.
       *
       * @param RequestInterface $request request to send
       * @param array $options request options, forwarded to initHeaders(), getSettings() and getResponse()
       * @return \GuzzleHttp\Promise\PromiseInterface fulfilled with the value returned by getResponse(),
       *                                              or rejected with a ConnectException when the client request throws
       */
      public function __invoke(RequestInterface $request, array $options)
      {
          $uri = $request->getUri();
          $host = $uri->getHost();
          $ssl = $uri->getScheme() === 'https';

          // A missing (or zero) port falls back to the scheme's default.
          $port = $uri->getPort();
          if (empty($port)) {
              $port = $ssl ? 443 : 80;
          }

          // Build the request target: path plus optional query string.
          // Compare against '' rather than using empty(), so that a path of
          // '0' is not mistaken for a missing path.
          $path = $uri->getPath();
          if ($path === '') {
              $path = '/';
          }
          $query = $uri->getQuery();
          if ($query !== '') {
              $path .= '?' . $query;
          }

          // The callback is handed to the factory together with the pool
          // options; it builds a client for this host/port/scheme.
          $pool = $this->factory->get(
              $this->getPoolName($uri),
              fn () => $this->makeClient($host, $port, $ssl),
              $this->option,
          );

          $connection = $pool->get();
          try {
              $client = $connection->getConnection();
              $headers = $this->initHeaders($request, $options);
              $settings = $this->getSettings($request, $options);
              if (! empty($settings)) {
                  $client->set($settings);
              }

              $startedAt = microtime(true);

              try {
                  $raw = $client->request($request->getMethod(), $path, $headers, (string) $request->getBody());
              } catch (Exception $exception) {
                  // Close the connection before the finally block releases it
                  // (presumably so a broken client is not handed out again).
                  $connection->close();

                  // Keep the original exception as $previous so its type and
                  // stack trace remain available to whoever handles the rejection.
                  return Create::rejectionFor(new ConnectException(
                      $exception->getMessage(),
                      $request,
                      $exception,
                      ['errCode' => $exception->getCode()],
                  ));
              }

              // The last argument is the elapsed time of the request in seconds.
              $response = $this->getResponse($raw, $request, $options, microtime(true) - $startedAt);
          } finally {
              // Always release the connection, on success, rejection or exception.
              $connection->release();
          }

          return new FulfilledPromise($response);
      }

      /**
       * Builds the name under which the pool for the given URI is registered.
       *
       * Note that the port is taken directly from the URI: when getPort()
       * returns null, "%d" formats it as 0.
       *
       * @param UriInterface $uri URI of the outgoing request
       * @return string name of the form "guzzle.handler.<host>.<port>.<scheme>"
       */
      protected function getPoolName(UriInterface $uri)
      {
          return sprintf('guzzle.handler.%s.%d.%s', $uri->getHost(), $uri->getPort(), $uri->getScheme());
      }
  }