diff options
author | Wolfy-J <[email protected]> | 2020-12-14 13:27:35 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2020-12-14 13:27:35 +0300 |
commit | 00b42663891713f142a6cc67bcccdc31353daeb2 (patch) | |
tree | 1c3966f18c15f3815e47cb065917b314be23472c | |
parent | 8203dc4d76624f4fceddff49b8a1aba9d525fc73 (diff) |
- removed old RoadRunner code
- added new RR source code
35 files changed, 665 insertions, 700 deletions
@@ -24,7 +24,6 @@ uninstall: ## Uninstall locally installed RR rm -f /usr/local/bin/rr test: ## Run application tests - test -d ./vendor_php || composer update --prefer-dist --ansi go test -v -race -cover go test -v -race -cover ./util go test -v -race -cover ./service diff --git a/composer.json b/composer.json deleted file mode 100755 index e3017b97..00000000 --- a/composer.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "name": "spiral/roadrunner", - "type": "server", - "description": "High-performance PHP application server, load-balancer and process manager written in Golang", - "license": "MIT", - "authors": [ - { - "name": "Anton Titov / Wolfy-J", - "email": "[email protected]" - }, - { - "name": "RoadRunner Community", - "homepage": "https://github.com/spiral/roadrunner/graphs/contributors" - } - ], - "require": { - "php": "^7.4 || ^8.0", - "ext-json": "*", - "ext-curl": "*", - "spiral/goridge": "^2.4.2", - "psr/http-factory": "^1.0.1", - "psr/http-message": "^1.0.1", - "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0", - "laminas/laminas-diactoros": "^1.3.6 || ^2.0", - "composer/package-versions-deprecated": "^1.8" - }, - "config": { - "vendor-dir": "vendor_php" - }, - "require-dev": { - "phpstan/phpstan": "~0.12.34" - }, - "scripts": { - "analyze": "phpstan analyze -c ./phpstan.neon.dist --no-progress --ansi" - }, - "autoload": { - "psr-4": { - "Spiral\\RoadRunner\\": "src/" - } - }, - "bin": [ - "bin/rr" - ] -}
\ No newline at end of file diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php deleted file mode 100644 index 6a42f207..00000000 --- a/src/Diactoros/ServerRequestFactory.php +++ /dev/null @@ -1,28 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use Psr\Http\Message\ServerRequestFactoryInterface; -use Psr\Http\Message\ServerRequestInterface; -use Laminas\Diactoros\ServerRequest; - -final class ServerRequestFactory implements ServerRequestFactoryInterface -{ - /** - * @inheritdoc - * - * @param array<mixed> $serverParams Array of SAPI parameters with which to seed the generated request instance. - */ - public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface - { - $uploadedFiles = []; - return new ServerRequest($serverParams, $uploadedFiles, $uri, $method); - } -} diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php deleted file mode 100644 index 68a77e92..00000000 --- a/src/Diactoros/StreamFactory.php +++ /dev/null @@ -1,57 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use RuntimeException; -use Psr\Http\Message\StreamFactoryInterface; -use Psr\Http\Message\StreamInterface; -use Laminas\Diactoros\Stream; - -final class StreamFactory implements StreamFactoryInterface -{ - /** - * @inheritdoc - * @throws RuntimeException - */ - public function createStream(string $content = ''): StreamInterface - { - $resource = fopen('php://temp', 'rb+'); - - if (! \is_resource($resource)) { - throw new RuntimeException('Cannot create stream'); - } - - fwrite($resource, $content); - rewind($resource); - return $this->createStreamFromResource($resource); - } - - /** - * @inheritdoc - */ - public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface - { - $resource = fopen($file, $mode); - - if (! \is_resource($resource)) { - throw new RuntimeException('Cannot create stream'); - } - - return $this->createStreamFromResource($resource); - } - - /** - * @inheritdoc - */ - public function createStreamFromResource($resource): StreamInterface - { - return new Stream($resource); - } -} diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php deleted file mode 100644 index daa475c1..00000000 --- a/src/Diactoros/UploadedFileFactory.php +++ /dev/null @@ -1,36 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Diactoros; - -use Psr\Http\Message\StreamInterface; -use Psr\Http\Message\UploadedFileFactoryInterface; -use Psr\Http\Message\UploadedFileInterface; -use Laminas\Diactoros\UploadedFile; - -final class UploadedFileFactory implements UploadedFileFactoryInterface -{ - /** - * @inheritdoc - */ - public function createUploadedFile( - StreamInterface $stream, - int $size = null, - int $error = \UPLOAD_ERR_OK, - string $clientFilename = null, - string $clientMediaType = null - ): UploadedFileInterface { - if ($size === null) { - $size = (int) $stream->getSize(); - } - - /** @var resource $stream */ - return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType); - } -} diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php deleted file mode 100644 index d5b738b8..00000000 --- a/src/Exception/MetricException.php +++ /dev/null @@ -1,17 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Exception; - -use Spiral\Goridge\Exceptions\RPCException; - -class MetricException extends RPCException -{ -} diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php deleted file mode 100644 index 43967893..00000000 --- a/src/Exceptions/RoadRunnerException.php +++ /dev/null @@ -1,18 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner\Exceptions; - -/** - * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead - */ -class RoadRunnerException extends \RuntimeException -{ -} diff --git a/src/HttpClient.php b/src/HttpClient.php deleted file mode 100644 index 9b9048ca..00000000 --- a/src/HttpClient.php +++ /dev/null @@ -1,74 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Alex Bond - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -final class HttpClient -{ - /** @var Worker */ - private $worker; - - /** - * @param Worker $worker - */ - public function __construct(Worker $worker) - { - $this->worker = $worker; - } - - /** - * @return Worker - */ - public function getWorker(): Worker - { - return $this->worker; - } - - /** - * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string] - * or null if termination request or invalid context. - */ - public function acceptRequest(): ?array - { - $body = $this->getWorker()->receive($ctx); - if (empty($body) && empty($ctx)) { - // termination request - return null; - } - - $ctx = json_decode($ctx, true); - if ($ctx === null) { - // invalid context - return null; - } - - return ['ctx' => $ctx, 'body' => $body]; - } - - /** - * Send response to the application server. - * - * @param int $status Http status code - * @param string $body Body of response - * @param string[][] $headers An associative array of the message's headers. Each - * key MUST be a header name, and each value MUST be an array of strings - * for that header. - */ - public function respond(int $status, string $body, array $headers = []): void - { - $sendHeaders = empty($headers) - ? new \stdClass() // this is required to represent empty header set as map and not as array - : $headers; - - $this->getWorker()->send( - $body, - (string) json_encode(['status' => $status, 'headers' => $sendHeaders]) - ); - } -} diff --git a/src/Metrics.php b/src/Metrics.php deleted file mode 100644 index d6b6e1da..00000000 --- a/src/Metrics.php +++ /dev/null @@ -1,80 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\Goridge\Exceptions\RPCException; -use Spiral\Goridge\RPC; -use Spiral\RoadRunner\Exception\MetricException; - -/** - * Application metrics. - */ -final class Metrics implements MetricsInterface -{ - /** @var RPC */ - private $rpc; - - /** - * @param RPC $rpc - */ - public function __construct(RPC $rpc) - { - $this->rpc = $rpc; - } - - /** - * @inheritDoc - */ - public function add(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Add', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function sub(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function observe(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * @inheritDoc - */ - public function set(string $name, float $value, array $labels = []): void - { - try { - $this->rpc->call('metrics.Set', compact('name', 'value', 'labels')); - } catch (RPCException $e) { - throw new MetricException($e->getMessage(), $e->getCode(), $e); - } - } -} diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php deleted file mode 100644 index ec2009b0..00000000 --- a/src/MetricsInterface.php +++ /dev/null @@ -1,64 +0,0 @@ -<?php - -/** - * Spiral Framework. - * - * @license MIT - * @author Anton Titov (Wolfy-J) - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\RoadRunner\Exception\MetricException; - -interface MetricsInterface -{ - /** - * Add collector value. Fallback to appropriate method of related collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function add(string $collector, float $value, array $labels = []); - - /** - * Subtract the collector value, only for gauge collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function sub(string $collector, float $value, array $labels = []); - - /** - * Observe collector value, only for histogram and summary collectors. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function observe(string $collector, float $value, array $labels = []); - - /** - * Set collector value, only for gauge collector. - * - * @param string $collector - * @param float $value - * @param mixed[] $labels - * - * @throws MetricException - * @return void - */ - public function set(string $collector, float $value, array $labels = []); -} diff --git a/src/Worker.php b/src/Worker.php deleted file mode 100644 index d509562e..00000000 --- a/src/Worker.php +++ /dev/null @@ -1,178 +0,0 @@ -<?php - -/** - * High-performance PHP process supervisor and load balancer written in Go - * - * @author Wolfy-J - */ -declare(strict_types=1); - -namespace Spiral\RoadRunner; - -use Spiral\Goridge\Exceptions\GoridgeException; -use Spiral\Goridge\RelayInterface as Relay; -use Spiral\Goridge\SendPackageRelayInterface; -use Spiral\RoadRunner\Exception\RoadRunnerException; - -/** - * Accepts connection from RoadRunner server over given Goridge relay. - * - * Example: - * - * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - * while ($task = $worker->receive($context)) { - * $worker->send("DONE", json_encode($context)); - * } - */ -class Worker -{ - // Send as response context to request worker termination - public const STOP = '{"stop":true}'; - - /** @var Relay */ - private $relay; - - /** - * @param Relay $relay - */ - public function __construct(Relay $relay) - { - $this->relay = $relay; - } - - /** - * Receive packet of information to process, returns null when process must be stopped. Might - * return Error to wrap error message from server. - * - * @param mixed $header - * @return \Error|null|string - * - * @throws GoridgeException - */ - public function receive(&$header) - { - $body = $this->relay->receiveSync($flags); - - if ($flags & Relay::PAYLOAD_CONTROL) { - if ($this->handleControl($body, $header, $flags)) { - // wait for the next command - return $this->receive($header); - } - - // no context for the termination. - $header = null; - - // Expect process termination - return null; - } - - if ($flags & Relay::PAYLOAD_ERROR) { - return new \Error((string)$body); - } - - return $body; - } - - /** - * Respond to the server with result of task execution and execution context. - * - * Example: - * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); - * - * @param string|null $payload - * @param string|null $header - */ - public function send(string $payload = null, string $header = null): void - { - if (!$this->relay instanceof SendPackageRelayInterface) { - if ($header === null) { - $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); - } else { - $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); - } - - $this->relay->send((string)$payload, Relay::PAYLOAD_RAW); - } else { - $this->relay->sendPackage( - (string)$header, - Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW), - (string)$payload, - Relay::PAYLOAD_RAW - ); - } - } - - /** - * Respond to the server with an error. Error must be treated as TaskError and might not cause - * worker destruction. - * - * Example: - * - * $worker->error("invalid payload"); - * - * @param string $message - */ - public function error(string $message): void - { - $this->relay->send( - $message, - Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR - ); - } - - /** - * Terminate the process. Server must automatically pass task to the next available process. - * Worker will receive StopCommand context after calling this method. - * - * Attention, you MUST use continue; after invoking this method to let rr to properly - * stop worker. - * - * @throws GoridgeException - */ - public function stop(): void - { - $this->send(null, self::STOP); - } - - /** - * Handles incoming control command payload and executes it if required. - * - * @param string $body - * @param mixed $header Exported context (if any). - * @param int $flags - * @return bool True when continue processing. - * - * @throws RoadRunnerException - */ - private function handleControl(string $body = null, &$header = null, int $flags = 0): bool - { - $header = $body; - if ($body === null || $flags & Relay::PAYLOAD_RAW) { - // empty or raw prefix - return true; - } - - $p = json_decode($body, true); - if ($p === false) { - throw new RoadRunnerException('invalid task context, JSON payload is expected'); - } - - // PID negotiation (socket connections only) - if (!empty($p['pid'])) { - $this->relay->send( - sprintf('{"pid":%s}', getmypid()), - Relay::PAYLOAD_CONTROL - ); - } - - // termination request - if (!empty($p['stop'])) { - return false; - } - - // parsed header - $header = $p; - - return true; - } -} diff --git a/tests/broken.php b/tests/broken.php index 42b4e7c2..1f869b2d 100644 --- a/tests/broken.php +++ b/tests/broken.php @@ -8,7 +8,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { echo undefined_function(); - $rr->send((string)$in); + $rr->send((string)$in->body, null); } diff --git a/tests/client.php b/tests/client.php index 835b1c6c..c00cece1 100644 --- a/tests/client.php +++ b/tests/client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/composer.json b/tests/composer.json new file mode 100644 index 00000000..24702c37 --- /dev/null +++ b/tests/composer.json @@ -0,0 +1,12 @@ +{ + "minimum-stability": "beta", + "require": { + "nyholm/psr7": "^1.3", + "spiral/goridge": "^3.0@beta" + }, + "autoload": { + "psr-4": { + "Spiral\\RoadRunner\\": "src/" + } + } +} diff --git a/tests/delay.php b/tests/delay.php index bf9ecc12..f0435b05 100644 --- a/tests/delay.php +++ b/tests/delay.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - usleep($in * 1000); + usleep($in->body * 1000); $rr->send(''); } catch (\Throwable $e) { $rr->error((string)$e); diff --git a/tests/echo.php b/tests/echo.php index 1570e3df..83eec92e 100644 --- a/tests/echo.php +++ b/tests/echo.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send((string)$in); + $rr->send((string)$in->body); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/error.php b/tests/error.php index 8e1c8d0d..c77e6817 100644 --- a/tests/error.php +++ b/tests/error.php @@ -8,6 +8,6 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { - $rr->error((string)$in); +while ($in = $rr->waitPayload()) { + $rr->error((string)$in->body); } diff --git a/tests/head.php b/tests/head.php index 88ebd3f2..3c57258f 100644 --- a/tests/head.php +++ b/tests/head.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send("", (string)$ctx); + $rr->send("", (string)$in->header); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/memleak.php b/tests/memleak.php index b78a76c0..9a5376f0 100644 --- a/tests/memleak.php +++ b/tests/memleak.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); $mem = ''; -while($rr->receive($ctx)){ +while($rr->waitPayload()){ $mem .= str_repeat(" ", 1024*1024); $rr->send(""); -}
\ No newline at end of file +} diff --git a/tests/pid.php b/tests/pid.php index bf10a025..f8b2515d 100644 --- a/tests/pid.php +++ b/tests/pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { $rr->send((string)getmypid()); } catch (\Throwable $e) { diff --git a/tests/sleep.php b/tests/sleep.php index b3ea8235..e34a6834 100644 --- a/tests/sleep.php +++ b/tests/sleep.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); -while($rr->receive($ctx)){ +while($rr->waitPayload()){ sleep(3); $rr->send(""); -}
\ No newline at end of file +} diff --git a/tests/slow-client.php b/tests/slow-client.php index ece0a439..7737f0b1 100644 --- a/tests/slow-client.php +++ b/tests/slow-client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php index e2a01af2..900bb68a 100644 --- a/tests/slow-destroy.php +++ b/tests/slow-destroy.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-pid.php b/tests/slow-pid.php index 747e7e86..3660cb40 100644 --- a/tests/slow-pid.php +++ b/tests/slow-pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { sleep(1); $rr->send((string)getmypid()); diff --git a/tests/src/Environment.php b/tests/src/Environment.php new file mode 100644 index 00000000..9b306063 --- /dev/null +++ b/tests/src/Environment.php @@ -0,0 +1,82 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\EnvironmentException; + +class Environment implements EnvironmentInterface +{ + /** @var array */ + private array $env; + + /** + * @param array $env + */ + public function __construct(array $env) + { + $this->env = $env; + } + + /** + * Returns worker mode assigned to the PHP process. + * + * @return string + * @throws EnvironmentException + */ + public function getMode(): string + { + return $this->getValue('RR_MODE'); + } + + /** + * Address worker should be connected to (or pipes). + * + * @return string + * @throws EnvironmentException + */ + public function getRelayAddress(): string + { + return $this->getValue('RR_RELAY'); + } + + /** + * RPC address. + * + * @return string + * @throws EnvironmentException + */ + public function getRPCAddress(): string + { + return $this->getValue('RR_RPC'); + } + + /** + * @param string $name + * @return string + * @throws EnvironmentException + */ + private function getValue(string $name): string + { + if (!isset($this->env[$name])) { + throw new EnvironmentException(sprintf("Missing environment value `%s`", $name)); + } + + return (string) $this->env[$name]; + } + + /** + * @return EnvironmentInterface + */ + public static function fromGlobals(): EnvironmentInterface + { + return new static(array_merge($_SERVER, $_ENV)); + } +} diff --git a/tests/src/EnvironmentInterface.php b/tests/src/EnvironmentInterface.php new file mode 100644 index 00000000..bc0ae043 --- /dev/null +++ b/tests/src/EnvironmentInterface.php @@ -0,0 +1,43 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\RoadRunner\Exception\EnvironmentException; + +/** + * Provides base values to configure roadrunner worker. + */ +interface EnvironmentInterface +{ + /** + * Returns worker mode assigned to the PHP process. + * + * @return string + * @throws EnvironmentException + */ + public function getMode(): string; + + /** + * Address worker should be connected to (or pipes). + * + * @return string + * @throws EnvironmentException + */ + public function getRelayAddress(): string; + + /** + * RPC address. + * + * @return string + * @throws EnvironmentException + */ + public function getRPCAddress(): string; +} diff --git a/tests/src/Exception/EnvironmentException.php b/tests/src/Exception/EnvironmentException.php new file mode 100644 index 00000000..227507c5 --- /dev/null +++ b/tests/src/Exception/EnvironmentException.php @@ -0,0 +1,16 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner\Exception; + +class EnvironmentException extends RoadRunnerException +{ + +} diff --git a/src/Exception/RoadRunnerException.php b/tests/src/Exception/RoadRunnerException.php index f83c3dd4..2329370c 100644 --- a/src/Exception/RoadRunnerException.php +++ b/tests/src/Exception/RoadRunnerException.php @@ -1,14 +1,15 @@ <?php /** - * High-performance PHP process supervisor and load balancer written in Go + * High-performance PHP process supervisor and load balancer written in Go. * - * @author Wolfy-J + * @author Wolfy-J */ + declare(strict_types=1); namespace Spiral\RoadRunner\Exception; -class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException +class RoadRunnerException extends \RuntimeException { } diff --git a/tests/src/Http/HttpWorker.php b/tests/src/Http/HttpWorker.php new file mode 100644 index 00000000..13fd6c27 --- /dev/null +++ b/tests/src/Http/HttpWorker.php @@ -0,0 +1,103 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Alex Bond + */ +declare(strict_types=1); + +namespace Spiral\RoadRunner\Http; + +use Spiral\RoadRunner\WorkerInterface; + +class HttpWorker +{ + /** @var WorkerInterface */ + private WorkerInterface $worker; + + /** + * @param WorkerInterface $worker + */ + public function __construct(WorkerInterface $worker) + { + $this->worker = $worker; + } + + /** + * @return WorkerInterface + */ + public function getWorker(): WorkerInterface + { + return $this->worker; + } + + /** + * Wait for incoming http request. + * + * @return Request|null + */ + public function waitRequest(): ?Request + { + $payload = $this->getWorker()->waitPayload(); + if (empty($payload->body) && empty($payload->header)) { + // termination request + return null; + } + + $request = new Request(); + $request->body = $payload->body; + + $context = json_decode($payload->header, true); + if ($context === null) { + // invalid context + return null; + } + + $this->hydrateRequest($request, $context); + + return $request; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if ($headers === []) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } + + /** + * @param Request $request + * @param array $context + */ + private function hydrateRequest(Request $request, array $context): void + { + $request->remoteAddr = $context['remoteAddr']; + $request->protocol = $context['protocol']; + $request->method = $context['method']; + $request->uri = $context['uri']; + $request->attributes = $context['attributes']; + $request->headers = $context['headers']; + $request->cookies = $context['cookies']; + $request->uploads = $context['uploads']; + parse_str($context['rawQuery'], $request->query); + + // indicates that body was parsed + $request->parsed = $context['parsed']; + } +} diff --git a/src/PSR7Client.php b/tests/src/Http/PSR7Worker.php index 777dd891..b985d288 100644 --- a/src/PSR7Client.php +++ b/tests/src/Http/PSR7Worker.php @@ -7,7 +7,7 @@ */ declare(strict_types=1); -namespace Spiral\RoadRunner; +namespace Spiral\RoadRunner\Http; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\ServerRequestFactoryInterface; @@ -15,100 +15,64 @@ use Psr\Http\Message\ServerRequestInterface; use Psr\Http\Message\StreamFactoryInterface; use Psr\Http\Message\UploadedFileFactoryInterface; use Psr\Http\Message\UploadedFileInterface; +use Spiral\RoadRunner\WorkerInterface; /** * Manages PSR-7 request and response. */ -class PSR7Client +class PSR7Worker { - /** @var HttpClient */ - private $httpClient; - - /** @var ServerRequestFactoryInterface */ - private $requestFactory; - - /** @var StreamFactoryInterface */ - private $streamFactory; - - /** @var UploadedFileFactoryInterface */ - private $uploadsFactory; + private HttpWorker $httpWorker; + private ServerRequestFactoryInterface $requestFactory; + private StreamFactoryInterface $streamFactory; + private UploadedFileFactoryInterface $uploadsFactory; /** @var mixed[] */ - private $originalServer = []; + private array $originalServer = []; /** @var string[] Valid values for HTTP protocol version */ - private static $allowedVersions = ['1.0', '1.1', '2',]; + private static array $allowedVersions = ['1.0', '1.1', '2',]; /** - * @param Worker $worker - * @param ServerRequestFactoryInterface|null $requestFactory - * @param StreamFactoryInterface|null $streamFactory - * @param UploadedFileFactoryInterface|null $uploadsFactory + * @param WorkerInterface $worker + * @param ServerRequestFactoryInterface $requestFactory + * @param StreamFactoryInterface $streamFactory + * @param UploadedFileFactoryInterface $uploadsFactory */ public function __construct( - Worker $worker, - ServerRequestFactoryInterface $requestFactory = null, - StreamFactoryInterface $streamFactory = null, - UploadedFileFactoryInterface $uploadsFactory = null + WorkerInterface $worker, + ServerRequestFactoryInterface $requestFactory, + StreamFactoryInterface $streamFactory, + UploadedFileFactoryInterface $uploadsFactory ) { - $this->httpClient = new HttpClient($worker); - $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory(); - $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory(); - $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory(); + $this->httpWorker = new HttpWorker($worker); + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->uploadsFactory = $uploadsFactory; $this->originalServer = $_SERVER; } /** - * @return Worker + * @return WorkerInterface */ - public function getWorker(): Worker + public function getWorker(): WorkerInterface { - return $this->httpClient->getWorker(); + return $this->httpWorker->getWorker(); } /** * @return ServerRequestInterface|null */ - public function acceptRequest(): ?ServerRequestInterface + public function waitRequest(): ?ServerRequestInterface { - $rawRequest = $this->httpClient->acceptRequest(); - if ($rawRequest === null) { + $httpRequest = $this->httpWorker->waitRequest(); + if ($httpRequest === null) { return null; } - $_SERVER = $this->configureServer($rawRequest['ctx']); - - $request = $this->requestFactory->createServerRequest( - $rawRequest['ctx']['method'], - $rawRequest['ctx']['uri'], - $_SERVER - ); - - parse_str($rawRequest['ctx']['rawQuery'], $query); - - $request = $request - ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol'])) - ->withCookieParams($rawRequest['ctx']['cookies']) - ->withQueryParams($query) - ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads'])); + $_SERVER = $this->configureServer($httpRequest['ctx']); - foreach ($rawRequest['ctx']['attributes'] as $name => $value) { - $request = $request->withAttribute($name, $value); - } - - foreach ($rawRequest['ctx']['headers'] as $name => $value) { - $request = $request->withHeader($name, $value); - } - - if ($rawRequest['ctx']['parsed']) { - return $request->withParsedBody(json_decode($rawRequest['body'], true)); - } - - if ($rawRequest['body'] !== null) { - return $request->withBody($this->streamFactory->createStream($rawRequest['body'])); - } - - return $request; + return $this->mapRequest($httpRequest, $_SERVER); } /** @@ -118,7 +82,7 @@ class PSR7Client */ public function respond(ResponseInterface $response): void { - $this->httpClient->respond( + $this->httpWorker->respond( $response->getStatusCode(), $response->getBody()->__toString(), $response->getHeaders() @@ -129,21 +93,21 @@ class PSR7Client * Returns altered copy of _SERVER variable. Sets ip-address, * request-time and other values. * - * @param mixed[] $ctx + * @param Request $request * @return mixed[] */ - protected function configureServer(array $ctx): array + protected function configureServer(Request $request): array { $server = $this->originalServer; - $server['REQUEST_URI'] = $ctx['uri']; + $server['REQUEST_URI'] = $request->uri; $server['REQUEST_TIME'] = time(); $server['REQUEST_TIME_FLOAT'] = microtime(true); - $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; - $server['REQUEST_METHOD'] = $ctx['method']; + $server['REMOTE_ADDR'] = $request->getRemoteAddr(); + $server['REQUEST_METHOD'] = $request->method; $server['HTTP_USER_AGENT'] = ''; - foreach ($ctx['headers'] as $key => $value) { + foreach ($request->headers as $key => $value) { $key = strtoupper(str_replace('-', '_', $key)); if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { $server[$key] = implode(', ', $value); @@ -156,18 +120,51 @@ class PSR7Client } /** + * @param Request $httpRequest + * @param array $server + * @return ServerRequestInterface + */ + protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface + { + $request = $this->requestFactory->createServerRequest( + $httpRequest->method, + $httpRequest->uri, + $_SERVER + ); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($httpRequest->protocol)) + ->withCookieParams($httpRequest->cookies) + ->withQueryParams($httpRequest->query) + ->withUploadedFiles($this->wrapUploads($httpRequest->uploads)); + + foreach ($httpRequest->attributes as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($httpRequest->headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($httpRequest->parsed) { + return $request->withParsedBody($httpRequest->getParsedBody()); + } + + if ($httpRequest->body !== null) { + return $request->withBody($this->streamFactory->createStream($httpRequest->body)); + } + + return $request; + } + + /** * Wraps all uploaded files with UploadedFile. * * @param array[] $files - * * @return UploadedFileInterface[]|mixed[] */ - private function wrapUploads($files): array + protected function wrapUploads(array $files): array { - if (empty($files)) { - return []; - } - $result = []; foreach ($files as $index => $f) { if (!isset($f['name'])) { diff --git a/tests/src/Http/Request.php b/tests/src/Http/Request.php new file mode 100644 index 00000000..ef67e28d --- /dev/null +++ b/tests/src/Http/Request.php @@ -0,0 +1,48 @@ +<?php + +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner\Http; + +final class Request +{ + + public string $remoteAddr; + public string $protocol; + public string $method; + public string $uri; + public array $headers; + public array $cookies; + public array $uploads; + public array $attributes; + public array $query; + public ?string $body; + public bool $parsed; + + /** + * @return string + */ + public function getRemoteAddr(): string + { + return $this->attributes['ipAddress'] ?? $this->remoteAddr ?? '127.0.0.1'; + } + + /** + * @return array|null + */ + public function getParsedBody(): ?array + { + if ($this->parsed) { + return json_decode($this->body, true); + } + + return null; + } +} diff --git a/tests/src/Payload.php b/tests/src/Payload.php new file mode 100644 index 00000000..c9b8c198 --- /dev/null +++ b/tests/src/Payload.php @@ -0,0 +1,43 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +/** + * Class Payload + * + * @package Spiral\RoadRunner + */ +final class Payload +{ + /** + * Execution payload (binary). + * + * @var string|null + */ + public ?string $body; + + /** + * Execution context (binary). + * + * @var string|null + */ + public ?string $header; + + /** + * @param string|null $body + * @param string|null $header + */ + public function __construct(?string $body, ?string $header = null) + { + $this->body = $body; + $this->header = $header; + } +} diff --git a/tests/src/Worker.php b/tests/src/Worker.php new file mode 100644 index 00000000..53cf6cef --- /dev/null +++ b/tests/src/Worker.php @@ -0,0 +1,162 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exception\GoridgeException; +use Spiral\Goridge\Frame; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exception\EnvironmentException; +use Spiral\RoadRunner\Exception\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * $worker = Worker::create(); + * while ($p = $worker->waitPayload()) { + * $worker->send(new Payload("DONE", json_encode($context))); + * } + */ +class Worker implements WorkerInterface +{ + // Request graceful worker termination. + private const STOP_REQUEST = '{"stop":true}'; + + private Relay $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Wait for incoming payload from the server. Must return null when worker stopped. + * + * @return Payload|null + * @throws GoridgeException + * @throws RoadRunnerException + */ + public function waitPayload(): ?Payload + { + $frame = $this->relay->waitFrame(); + + if ($frame->hasFlag(Frame::CONTROL)) { + $continue = $this->handleControl($frame->payload); + + if ($continue) { + return $this->waitPayload(); + } else { + return null; + } + } + + return new Payload( + substr($frame->payload, $frame->options[0]), + substr($frame->payload, 0, $frame->options[0]) + ); + } + + /** + * Respond to the server with the processing result. + * + * @param Payload $payload + * @throws GoridgeException + */ + public function respond(Payload $payload): void + { + $this->send($payload->body, $payload->header); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message): void + { + $this->relay->send(new Frame($message, [], Frame::ERROR)); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. + * + * @throws GoridgeException + */ + public function stop(): void + { + $this->send("", self::STOP_REQUEST); + } + + /** + * @param string $body + * @param string|null $context + * @throws GoridgeException + */ + public function send(string $body, string $context = null): void + { + $this->relay->send(new Frame( + (string) $context . $body, + [strlen((string) $context)] + )); + } + + /** + * Return true if continue. + * + * @param string $header + * @return bool + * + * @throws RoadRunnerException + */ + private function handleControl(string $header): bool + { + $command = json_decode($header, true); + if ($command === false) { + throw new RoadRunnerException('Invalid task header, JSON payload is expected'); + } + + switch (true) { + case !empty($command['pid']): + $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL)); + return true; + + case !empty($command['stop']): + return false; + + default: + throw new RoadRunnerException('Invalid task header, undefined control package'); + } + } + + /** + * Create Worker using global environment configuration. + * + * @return WorkerInterface + * @throws EnvironmentException + */ + public static function create(): WorkerInterface + { + $env = Environment::fromGlobals(); + + return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress())); + } +} diff --git a/tests/src/WorkerInterface.php b/tests/src/WorkerInterface.php new file mode 100644 index 00000000..bf0b6e06 --- /dev/null +++ b/tests/src/WorkerInterface.php @@ -0,0 +1,55 @@ +<?php + +/** + * High-performance PHP process supervisor and load balancer written in Go. + * + * @author Wolfy-J + */ + +declare(strict_types=1); + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exception\GoridgeException; +use Spiral\RoadRunner\Exception\RoadRunnerException; + +interface WorkerInterface +{ + /** + * Wait for incoming payload from the server. Must return null when worker stopped. + * + * @return Payload|null + * @throws GoridgeException + * @throws RoadRunnerException + */ + public function waitPayload(): ?Payload; + + /** + * Respond to the server with the processing result. + * + * @param Payload $payload + * @throws GoridgeException + */ + public function respond(Payload $payload): void; + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $error + * @throws GoridgeException + */ + public function error(string $error): void; + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive stop command after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly stop worker. + */ + public function stop(): void; +} diff --git a/tests/stop.php b/tests/stop.php index 0100ad0f..f83d3f29 100644 --- a/tests/stop.php +++ b/tests/stop.php @@ -9,7 +9,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); $used = false; -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { if ($used) { // kill on second attempt |