channel = new Channel($limit); }

    /**
     * Forwards the allow-listed state checks `isFull` and `isEmpty` to the
     * underlying channel.
     *
     * @param string $name Name of the method that was invoked.
     * @param array $arguments Arguments passed to that method.
     * @return mixed Whatever the channel method returns.
     * @throws InvalidArgumentException When $name is not one of the forwarded methods.
     */
    public function __call($name, $arguments)
    {
        // Strict comparison: only these exact method names may be proxied.
        if (in_array($name, ['isFull', 'isEmpty'], true)) {
            return $this->channel->{$name}(...$arguments);
        }

        throw new InvalidArgumentException(sprintf('The method %s is not supported.', $name));
    }

    /**
     * Returns the limit this instance was configured with.
     */
    public function getLimit(): int
    {
        return $this->limit;
    }

    /**
     * Returns the current length of the underlying channel.
     */
    public function length(): int
    {
        return $this->channel->getLength();
    }

    /**
     * Returns the current length of the underlying channel.
     */
    public function getLength(): int
    {
        return $this->channel->getLength();
    }

    /**
     * Alias of getLength().
     *
     * create() pushes one item before starting a coroutine and that coroutine
     * pops one item when it finishes, so the channel length tracks the tasks
     * started through create() that have not finished yet.
     */
    public function getRunningCoroutineCount(): int
    {
        return $this->getLength();
    }

    /**
     * Returns the underlying channel instance.
     */
    public function getChannel(): Channel
    {
        return $this->channel;
    }

    /**
     * Runs $callable in a new coroutine, using one channel slot for as long as
     * it runs.
     *
     * A value is pushed onto the channel before the coroutine is created and
     * popped again when the callable finishes, whether it returned or threw.
     * NOTE(review): the push presumably suspends the caller while the channel
     * is at capacity, which is what enforces the limit — confirm against the
     * Channel implementation.
     *
     * Any Throwable raised by $callable is caught inside the coroutine. It is
     * logged only when an application container exists and provides both
     * StdoutLoggerInterface and FormatterInterface; otherwise it is discarded
     * without any report.
     *
     * @param callable $callable Task to execute; it is invoked without arguments.
     */
    public function create(callable $callable): void
    {
        // Take a slot before spawning the coroutine.
        $this->channel->push(true);

        Coroutine::create(function () use ($callable) {
            try {
                $callable();
            } catch (Throwable $exception) {
                if (ApplicationContext::hasContainer()) {
                    $container = ApplicationContext::getContainer();
                    if ($container->has(StdoutLoggerInterface::class) && $container->has(FormatterInterface::class)) {
                        $logger = $container->get(StdoutLoggerInterface::class);
                        $formatter = $container->get(FormatterInterface::class);
                        $logger->error($formatter->format($exception));
                    }
                }
            } finally {
                // Always give the slot back, even when the task failed.
                $this->channel->pop();
            }
        });
    }
}