diff options
author | Valery Piashchynski <[email protected]> | 2020-11-18 10:02:58 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-18 10:02:58 +0300 |
commit | a2ba8de5eb519f73044a9b1c66f087a5b65e3d45 (patch) | |
tree | dbcb1649a5ea8f7b50706ffc1cae775f1ef65b42 /src | |
parent | 3cbdd3d3e44b3b4e72565d666391e3b732950774 (diff) |
Add Reset
Diffstat (limited to 'src')
-rw-r--r-- | src/Exception/RoadRunnerException.php | 2 | ||||
-rw-r--r-- | src/Http/HttpClient.php | 75 | ||||
-rw-r--r-- | src/Http/PSR7Client.php | 217 | ||||
-rw-r--r-- | src/Logger/.empty | 0 | ||||
-rw-r--r-- | src/Metrics/Metrics.php | 80 | ||||
-rw-r--r-- | src/Metrics/MetricsInterface.php | 64 | ||||
-rw-r--r-- | src/WorkerInterface.php | 0 |
7 files changed, 437 insertions, 1 deletions
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php index f83c3dd4..cd657502 100644 --- a/src/Exception/RoadRunnerException.php +++ b/src/Exception/RoadRunnerException.php @@ -9,6 +9,6 @@ declare(strict_types=1); namespace Spiral\RoadRunner\Exception; -class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException +class RoadRunnerException extends \RuntimeException { } diff --git a/src/Http/HttpClient.php b/src/Http/HttpClient.php new file mode 100644 index 00000000..4ca152c8 --- /dev/null +++ b/src/Http/HttpClient.php @@ -0,0 +1,75 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Alex Bond + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +final class HttpClient +{ + /** @var Worker */ + private $worker; + + /** + * @param Worker $worker + */ + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] + * or null if termination request or invalid context. + */ + public function acceptRequest(): ?array + { + $body = $this->getWorker()->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + $ctx = json_decode($ctx, true); + if ($ctx === null) { + // invalid context + return null; + } + + return ['ctx' => $ctx, 'body' => $body]; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } +} diff --git a/src/Http/PSR7Client.php b/src/Http/PSR7Client.php new file mode 100644 index 00000000..777dd891 --- /dev/null +++ b/src/Http/PSR7Client.php @@ -0,0 +1,217 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestFactoryInterface; +use Psr\Http\Message\ServerRequestInterface; +use Psr\Http\Message\StreamFactoryInterface; +use Psr\Http\Message\UploadedFileFactoryInterface; +use Psr\Http\Message\UploadedFileInterface; + +/** + * Manages PSR-7 request and response. + */ +class PSR7Client +{ + /** @var HttpClient */ + private $httpClient; + + /** @var ServerRequestFactoryInterface */ + private $requestFactory; + + /** @var StreamFactoryInterface */ + private $streamFactory; + + /** @var UploadedFileFactoryInterface */ + private $uploadsFactory; + + /** @var mixed[] */ + private $originalServer = []; + + /** @var string[] Valid values for HTTP protocol version */ + private static $allowedVersions = ['1.0', '1.1', '2',]; + + /** + * @param Worker $worker + * @param ServerRequestFactoryInterface|null $requestFactory + * @param StreamFactoryInterface|null $streamFactory + * @param UploadedFileFactoryInterface|null $uploadsFactory + */ + public function __construct( + Worker $worker, + ServerRequestFactoryInterface $requestFactory = null, + StreamFactoryInterface $streamFactory = null, + UploadedFileFactoryInterface $uploadsFactory = null + ) { + $this->httpClient = new HttpClient($worker); + $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); + $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); + $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->originalServer = $_SERVER; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->httpClient->getWorker(); + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest(): ?ServerRequestInterface + { + $rawRequest = $this->httpClient->acceptRequest(); + if ($rawRequest === null) { + return null; + } + + $_SERVER = $this->configureServer($rawRequest['ctx']); + + $request = $this->requestFactory->createServerRequest( + $rawRequest['ctx']['method'], + $rawRequest['ctx']['uri'], + $_SERVER + ); + + parse_str($rawRequest['ctx']['rawQuery'], $query); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) + ->withCookieParams($rawRequest['ctx']['cookies']) + ->withQueryParams($query) + ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); + + foreach ($rawRequest['ctx']['attributes'] as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($rawRequest['ctx']['headers'] as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($rawRequest['ctx']['parsed']) { + return $request->withParsedBody(json_decode($rawRequest['body'], true)); + } + + if ($rawRequest['body'] !== null) { + return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); + } + + return $request; + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response): void + { + $this->httpClient->respond( + $response->getStatusCode(), + $response->getBody()->__toString(), + $response->getHeaders() + ); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param mixed[] $ctx + * @return mixed[] + */ + protected function configureServer(array $ctx): array + { + $server = $this->originalServer; + + $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + $server['REQUEST_METHOD'] = $ctx['method']; + + $server['HTTP_USER_AGENT'] = ''; + foreach ($ctx['headers'] as $key => $value) { + $key = strtoupper(str_replace('-', '_', $key)); + if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { + $server[$key] = implode(', ', $value); + } else { + $server['HTTP_' . $key] = implode(', ', $value); + } + } + + return $server; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array[] $files + * + * @return UploadedFileInterface[]|mixed[] + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + if (UPLOAD_ERR_OK === $f['error']) { + $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); + } else { + $stream = $this->streamFactory->createStream(); + } + + $result[$index] = $this->uploadsFactory->createUploadedFile( + $stream, + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } + + /** + * Normalize HTTP protocol version to valid values + * + * @param string $version + * @return string + */ + private static function fetchProtocolVersion(string $version): string + { + $v = substr($version, 5); + + if ($v === '2.0') { + return '2'; + } + + // Fallback for values outside of valid protocol versions + if (!in_array($v, static::$allowedVersions, true)) { + return '1.1'; + } + + return $v; + } +} diff --git a/src/Logger/.empty b/src/Logger/.empty new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/Logger/.empty diff --git a/src/Metrics/Metrics.php b/src/Metrics/Metrics.php new file mode 100644 index 00000000..d6b6e1da --- /dev/null +++ b/src/Metrics/Metrics.php @@ -0,0 +1,80 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\RPCException; +use Spiral\Goridge\RPC; +use Spiral\RoadRunner\Exception\MetricException; + +/** + * Application metrics. + */ +final class Metrics implements MetricsInterface +{ + /** @var RPC */ + private $rpc; + + /** + * @param RPC $rpc + */ + public function __construct(RPC $rpc) + { + $this->rpc = $rpc; + } + + /** + * @inheritDoc + */ + public function add(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function sub(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function observe(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } + + /** + * @inheritDoc + */ + public function set(string $name, float $value, array $labels = []): void + { + try { + $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); + } catch (RPCException $e) { + throw new MetricException($e->getMessage(), $e->getCode(), $e); + } + } +} diff --git a/src/Metrics/MetricsInterface.php b/src/Metrics/MetricsInterface.php new file mode 100644 index 00000000..ec2009b0 --- /dev/null +++ b/src/Metrics/MetricsInterface.php @@ -0,0 +1,64 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\MetricException; + +interface MetricsInterface +{ + /** + * Add collector value. Fallback to appropriate method of related collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function add(string $collector, float $value, array $labels = []); + + /** + * Subtract the collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function sub(string $collector, float $value, array $labels = []); + + /** + * Observe collector value, only for histogram and summary collectors. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function observe(string $collector, float $value, array $labels = []); + + /** + * Set collector value, only for gauge collector. + * + * @param string $collector + * @param float $value + * @param mixed[] $labels + * + * @throws MetricException + * @return void + */ + public function set(string $collector, float $value, array $labels = []); +} diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/src/WorkerInterface.php |