summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-11-18 10:02:58 +0300
committerValery Piashchynski <[email protected]>2020-11-18 10:02:58 +0300
commita2ba8de5eb519f73044a9b1c66f087a5b65e3d45 (patch)
treedbcb1649a5ea8f7b50706ffc1cae775f1ef65b42 /src
parent3cbdd3d3e44b3b4e72565d666391e3b732950774 (diff)
Add Reset
Diffstat (limited to 'src')
-rw-r--r--src/Exception/RoadRunnerException.php2
-rw-r--r--src/Http/HttpClient.php75
-rw-r--r--src/Http/PSR7Client.php217
-rw-r--r--src/Logger/.empty0
-rw-r--r--src/Metrics/Metrics.php80
-rw-r--r--src/Metrics/MetricsInterface.php64
-rw-r--r--src/WorkerInterface.php0
7 files changed, 437 insertions, 1 deletions
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php
index f83c3dd4..cd657502 100644
--- a/src/Exception/RoadRunnerException.php
+++ b/src/Exception/RoadRunnerException.php
@@ -9,6 +9,6 @@ declare(strict_types=1);
namespace Spiral\RoadRunner\Exception;
-class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException
+class RoadRunnerException extends \RuntimeException
{
}
diff --git a/src/Http/HttpClient.php b/src/Http/HttpClient.php
new file mode 100644
index 00000000..4ca152c8
--- /dev/null
+++ b/src/Http/HttpClient.php
@@ -0,0 +1,75 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Alex Bond
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+final class HttpClient
+{
+ /** @var Worker */
+ private $worker;
+
+ /**
+ * @param Worker $worker
+ */
+ public function __construct(Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->worker;
+ }
+
+ /**
+ * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string]
+ * or null if termination request or invalid context.
+ */
+ public function acceptRequest(): ?array
+ {
+ $body = $this->getWorker()->receive($ctx);
+ if (empty($body) && empty($ctx)) {
+ // termination request
+ return null;
+ }
+
+ $ctx = json_decode($ctx, true);
+ if ($ctx === null) {
+ // invalid context
+ return null;
+ }
+
+ return ['ctx' => $ctx, 'body' => $body];
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param int $status Http status code
+ * @param string $body Body of response
+ * @param string[][] $headers An associative array of the message's headers. Each
+ * key MUST be a header name, and each value MUST be an array of strings
+ * for that header.
+ */
+ public function respond(int $status, string $body, array $headers = []): void
+ {
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->getWorker()->send(
+ $body,
+ (string) json_encode(['status' => $status, 'headers' => $headers])
+ );
+ }
+}
diff --git a/src/Http/PSR7Client.php b/src/Http/PSR7Client.php
new file mode 100644
index 00000000..777dd891
--- /dev/null
+++ b/src/Http/PSR7Client.php
@@ -0,0 +1,217 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestFactoryInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Psr\Http\Message\StreamFactoryInterface;
+use Psr\Http\Message\UploadedFileFactoryInterface;
+use Psr\Http\Message\UploadedFileInterface;
+
+/**
+ * Manages PSR-7 request and response.
+ */
+class PSR7Client
+{
+ /** @var HttpClient */
+ private $httpClient;
+
+ /** @var ServerRequestFactoryInterface */
+ private $requestFactory;
+
+ /** @var StreamFactoryInterface */
+ private $streamFactory;
+
+ /** @var UploadedFileFactoryInterface */
+ private $uploadsFactory;
+
+ /** @var mixed[] */
+ private $originalServer = [];
+
+ /** @var string[] Valid values for HTTP protocol version */
+ private static $allowedVersions = ['1.0', '1.1', '2',];
+
+ /**
+ * @param Worker $worker
+ * @param ServerRequestFactoryInterface|null $requestFactory
+ * @param StreamFactoryInterface|null $streamFactory
+ * @param UploadedFileFactoryInterface|null $uploadsFactory
+ */
+ public function __construct(
+ Worker $worker,
+ ServerRequestFactoryInterface $requestFactory = null,
+ StreamFactoryInterface $streamFactory = null,
+ UploadedFileFactoryInterface $uploadsFactory = null
+ ) {
+ $this->httpClient = new HttpClient($worker);
+ $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory();
+ $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory();
+ $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory();
+ $this->originalServer = $_SERVER;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->httpClient->getWorker();
+ }
+
+ /**
+ * @return ServerRequestInterface|null
+ */
+ public function acceptRequest(): ?ServerRequestInterface
+ {
+ $rawRequest = $this->httpClient->acceptRequest();
+ if ($rawRequest === null) {
+ return null;
+ }
+
+ $_SERVER = $this->configureServer($rawRequest['ctx']);
+
+ $request = $this->requestFactory->createServerRequest(
+ $rawRequest['ctx']['method'],
+ $rawRequest['ctx']['uri'],
+ $_SERVER
+ );
+
+ parse_str($rawRequest['ctx']['rawQuery'], $query);
+
+ $request = $request
+ ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol']))
+ ->withCookieParams($rawRequest['ctx']['cookies'])
+ ->withQueryParams($query)
+ ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads']));
+
+ foreach ($rawRequest['ctx']['attributes'] as $name => $value) {
+ $request = $request->withAttribute($name, $value);
+ }
+
+ foreach ($rawRequest['ctx']['headers'] as $name => $value) {
+ $request = $request->withHeader($name, $value);
+ }
+
+ if ($rawRequest['ctx']['parsed']) {
+ return $request->withParsedBody(json_decode($rawRequest['body'], true));
+ }
+
+ if ($rawRequest['body'] !== null) {
+ return $request->withBody($this->streamFactory->createStream($rawRequest['body']));
+ }
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param ResponseInterface $response
+ */
+ public function respond(ResponseInterface $response): void
+ {
+ $this->httpClient->respond(
+ $response->getStatusCode(),
+ $response->getBody()->__toString(),
+ $response->getHeaders()
+ );
+ }
+
+ /**
+ * Returns altered copy of _SERVER variable. Sets ip-address,
+ * request-time and other values.
+ *
+ * @param mixed[] $ctx
+ * @return mixed[]
+ */
+ protected function configureServer(array $ctx): array
+ {
+ $server = $this->originalServer;
+
+ $server['REQUEST_URI'] = $ctx['uri'];
+ $server['REQUEST_TIME'] = time();
+ $server['REQUEST_TIME_FLOAT'] = microtime(true);
+ $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
+ $server['REQUEST_METHOD'] = $ctx['method'];
+
+ $server['HTTP_USER_AGENT'] = '';
+ foreach ($ctx['headers'] as $key => $value) {
+ $key = strtoupper(str_replace('-', '_', $key));
+ if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
+ $server[$key] = implode(', ', $value);
+ } else {
+ $server['HTTP_' . $key] = implode(', ', $value);
+ }
+ }
+
+ return $server;
+ }
+
+ /**
+ * Wraps all uploaded files with UploadedFile.
+ *
+ * @param array[] $files
+ *
+ * @return UploadedFileInterface[]|mixed[]
+ */
+ private function wrapUploads($files): array
+ {
+ if (empty($files)) {
+ return [];
+ }
+
+ $result = [];
+ foreach ($files as $index => $f) {
+ if (!isset($f['name'])) {
+ $result[$index] = $this->wrapUploads($f);
+ continue;
+ }
+
+ if (UPLOAD_ERR_OK === $f['error']) {
+ $stream = $this->streamFactory->createStreamFromFile($f['tmpName']);
+ } else {
+ $stream = $this->streamFactory->createStream();
+ }
+
+ $result[$index] = $this->uploadsFactory->createUploadedFile(
+ $stream,
+ $f['size'],
+ $f['error'],
+ $f['name'],
+ $f['mime']
+ );
+ }
+
+ return $result;
+ }
+
+ /**
+ * Normalize HTTP protocol version to valid values
+ *
+ * @param string $version
+ * @return string
+ */
+ private static function fetchProtocolVersion(string $version): string
+ {
+ $v = substr($version, 5);
+
+ if ($v === '2.0') {
+ return '2';
+ }
+
+ // Fallback for values outside of valid protocol versions
+ if (!in_array($v, static::$allowedVersions, true)) {
+ return '1.1';
+ }
+
+ return $v;
+ }
+}
diff --git a/src/Logger/.empty b/src/Logger/.empty
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/Logger/.empty
diff --git a/src/Metrics/Metrics.php b/src/Metrics/Metrics.php
new file mode 100644
index 00000000..d6b6e1da
--- /dev/null
+++ b/src/Metrics/Metrics.php
@@ -0,0 +1,80 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exceptions\RPCException;
+use Spiral\Goridge\RPC;
+use Spiral\RoadRunner\Exception\MetricException;
+
+/**
+ * Application metrics.
+ */
+final class Metrics implements MetricsInterface
+{
+ /** @var RPC */
+ private $rpc;
+
+ /**
+ * @param RPC $rpc
+ */
+ public function __construct(RPC $rpc)
+ {
+ $this->rpc = $rpc;
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function add(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function sub(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function observe(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+
+ /**
+ * @inheritDoc
+ */
+ public function set(string $name, float $value, array $labels = []): void
+ {
+ try {
+ $this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
+ } catch (RPCException $e) {
+ throw new MetricException($e->getMessage(), $e->getCode(), $e);
+ }
+ }
+}
diff --git a/src/Metrics/MetricsInterface.php b/src/Metrics/MetricsInterface.php
new file mode 100644
index 00000000..ec2009b0
--- /dev/null
+++ b/src/Metrics/MetricsInterface.php
@@ -0,0 +1,64 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\MetricException;
+
+interface MetricsInterface
+{
+ /**
+ * Add collector value. Fallback to appropriate method of related collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function add(string $collector, float $value, array $labels = []);
+
+ /**
+ * Subtract the collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function sub(string $collector, float $value, array $labels = []);
+
+ /**
+ * Observe collector value, only for histogram and summary collectors.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function observe(string $collector, float $value, array $labels = []);
+
+ /**
+ * Set collector value, only for gauge collector.
+ *
+ * @param string $collector
+ * @param float $value
+ * @param mixed[] $labels
+ *
+ * @throws MetricException
+ * @return void
+ */
+ public function set(string $collector, float $value, array $labels = []);
+}
diff --git a/src/WorkerInterface.php b/src/WorkerInterface.php
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/src/WorkerInterface.php