CoroutineHandler.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. <?php
  2. declare(strict_types=1);
  3. /**
  4. * This file is part of Hyperf.
  5. *
  6. * @link https://www.hyperf.io
  7. * @document https://hyperf.wiki
  8. * @contact group@hyperf.io
  9. * @license https://github.com/hyperf/hyperf/blob/master/LICENSE
  10. */
  11. namespace Hyperf\Guzzle;
  12. use Exception;
  13. use GuzzleHttp;
  14. use GuzzleHttp\Exception\ConnectException;
  15. use GuzzleHttp\Promise\Create;
  16. use GuzzleHttp\Promise\FulfilledPromise;
  17. use GuzzleHttp\Promise\PromiseInterface;
  18. use GuzzleHttp\Psr7;
  19. use GuzzleHttp\Psr7\Uri;
  20. use GuzzleHttp\Psr7\Utils;
  21. use GuzzleHttp\RequestOptions;
  22. use GuzzleHttp\TransferStats;
  23. use Hyperf\Engine\Http\Client;
  24. use Hyperf\Engine\Http\RawResponse;
  25. use InvalidArgumentException;
  26. use Psr\Http\Message\RequestInterface;
  27. use Psr\Http\Message\StreamInterface;
  28. use Psr\Http\Message\UriInterface;
  29. /**
  30. * Http handler that uses Swoole/Swow Coroutine as a transport layer.
  31. */
  32. class CoroutineHandler
  33. {
  34. /**
  35. * @see \GuzzleHttp\Psr7\Uri::$defaultPorts
  36. */
  37. private static array $defaultPorts = [
  38. 'http' => 80,
  39. 'https' => 443,
  40. ];
  41. /**
  42. * @return PromiseInterface
  43. */
  44. public function __invoke(RequestInterface $request, array $options)
  45. {
  46. $uri = $request->getUri();
  47. $host = $uri->getHost();
  48. $port = $this->getPort($uri);
  49. $ssl = $uri->getScheme() === 'https';
  50. $path = $uri->getPath();
  51. $query = $uri->getQuery();
  52. if (empty($path)) {
  53. $path = '/';
  54. }
  55. if ($query !== '') {
  56. $path .= '?' . $query;
  57. }
  58. $client = $this->makeClient($host, $port, $ssl);
  59. // Init Headers
  60. $headers = $this->initHeaders($request, $options);
  61. // Init Settings
  62. $settings = $this->getSettings($request, $options);
  63. if (! empty($settings)) {
  64. $client->set($settings);
  65. }
  66. $ms = microtime(true);
  67. try {
  68. $raw = $client->request($request->getMethod(), $path, $headers, (string) $request->getBody());
  69. } catch (Exception $exception) {
  70. $exception = new ConnectException($exception->getMessage(), $request, null, [
  71. 'errCode' => $exception->getCode(),
  72. ]);
  73. return Create::rejectionFor($exception);
  74. }
  75. $response = $this->getResponse($raw, $request, $options, microtime(true) - $ms);
  76. return new FulfilledPromise($response);
  77. }
  78. protected function makeClient(string $host, int $port, bool $ssl): Client
  79. {
  80. return new Client($host, $port, $ssl);
  81. }
  82. protected function initHeaders(RequestInterface $request, array $options): array
  83. {
  84. $headers = $request->getHeaders();
  85. $userInfo = $request->getUri()->getUserInfo();
  86. if ($userInfo) {
  87. $headers['Authorization'] = sprintf('Basic %s', base64_encode($userInfo));
  88. }
  89. return $this->rewriteHeaders($headers);
  90. }
  91. protected function rewriteHeaders(array $headers): array
  92. {
  93. // Unknown reason, Content-Length will cause 400 some time.
  94. // Expect header is not supported by \Swoole\Coroutine\Http\Client.
  95. unset($headers['Content-Length'], $headers['Expect']);
  96. return $headers;
  97. }
  98. protected function getSettings(RequestInterface $request, array $options): array
  99. {
  100. $settings = [];
  101. if (isset($options['delay']) && $options['delay'] > 0) {
  102. usleep(intval($options['delay'] * 1000));
  103. }
  104. // 验证服务端证书
  105. if (isset($options['verify'])) {
  106. $settings['ssl_verify_peer'] = false;
  107. if ($options['verify'] !== false) {
  108. $settings['ssl_verify_peer'] = true;
  109. $settings['ssl_allow_self_signed'] = true;
  110. $settings['ssl_host_name'] = $request->getUri()->getHost();
  111. if (is_string($options['verify'])) {
  112. // Throw an error if the file/folder/link path is not valid or doesn't exist.
  113. if (! file_exists($options['verify'])) {
  114. throw new InvalidArgumentException("SSL CA bundle not found: {$options['verify']}");
  115. }
  116. // If it's a directory or a link to a directory use CURLOPT_CAPATH.
  117. // If not, it's probably a file, or a link to a file, so use CURLOPT_CAINFO.
  118. if (is_dir($options['verify'])
  119. || (is_link($options['verify']) && is_dir(readlink($options['verify'])))) {
  120. $settings['ssl_capath'] = $options['verify'];
  121. } else {
  122. $settings['ssl_cafile'] = $options['verify'];
  123. }
  124. }
  125. }
  126. }
  127. // 超时
  128. if (isset($options['timeout']) && $options['timeout'] > 0) {
  129. $settings['timeout'] = $options['timeout'];
  130. }
  131. // Proxy
  132. if (! empty($options['proxy'])) {
  133. $uri = null;
  134. if (is_array($options['proxy'])) {
  135. $scheme = $request->getUri()->getScheme();
  136. if (isset($options['proxy'][$scheme])) {
  137. $host = $request->getUri()->getHost();
  138. if (! isset($options['proxy']['no']) || ! GuzzleHttp\Utils::isHostInNoProxy($host, $options['proxy']['no'])) {
  139. $uri = new Uri($options['proxy'][$scheme]);
  140. }
  141. }
  142. } else {
  143. $uri = new Uri($options['proxy']);
  144. }
  145. if ($uri) {
  146. $settings['http_proxy_host'] = $uri->getHost();
  147. $settings['http_proxy_port'] = $this->getPort($uri);
  148. if ($uri->getUserInfo()) {
  149. [$user, $password] = explode(':', $uri->getUserInfo());
  150. $settings['http_proxy_user'] = $user;
  151. if (! empty($password)) {
  152. $settings['http_proxy_password'] = $password;
  153. }
  154. }
  155. }
  156. }
  157. // SSL KEY
  158. isset($options['ssl_key']) && $settings['ssl_key_file'] = $options['ssl_key'];
  159. isset($options['cert']) && $settings['ssl_cert_file'] = $options['cert'];
  160. // Swoole Setting
  161. if (isset($options['swoole']) && is_array($options['swoole'])) {
  162. $settings = array_replace($settings, $options['swoole']);
  163. }
  164. return $settings;
  165. }
  166. protected function getResponse(RawResponse $raw, RequestInterface $request, array $options, float $transferTime)
  167. {
  168. $body = $raw->body;
  169. $sink = $options['sink'] ?? null;
  170. if (isset($sink) && (is_string($sink) || is_resource($sink))) {
  171. $body = $this->createSink($body, $sink);
  172. }
  173. $response = new Psr7\Response(
  174. $raw->statusCode,
  175. $raw->headers,
  176. $body
  177. );
  178. if ($callback = $options[RequestOptions::ON_STATS] ?? null) {
  179. $stats = new TransferStats(
  180. $request,
  181. $response,
  182. $transferTime,
  183. $raw->statusCode,
  184. []
  185. );
  186. $callback($stats);
  187. }
  188. return $response;
  189. }
  190. protected function createStream(string $body): StreamInterface
  191. {
  192. return Utils::streamFor($body);
  193. }
  194. /**
  195. * @param resource|string $stream
  196. */
  197. protected function createSink(string $body, $stream)
  198. {
  199. if (is_string($stream)) {
  200. $stream = fopen($stream, 'w+');
  201. }
  202. if ($body !== '') {
  203. fwrite($stream, $body);
  204. }
  205. return $stream;
  206. }
  207. /**
  208. * @throws InvalidArgumentException
  209. */
  210. protected function getPort(UriInterface $uri): int
  211. {
  212. if ($port = $uri->getPort()) {
  213. return $port;
  214. }
  215. if (isset(self::$defaultPorts[$uri->getScheme()])) {
  216. return self::$defaultPorts[$uri->getScheme()];
  217. }
  218. throw new InvalidArgumentException("Unsupported scheme from the URI {$uri->__toString()}");
  219. }
  220. }