From 00b42663891713f142a6cc67bcccdc31353daeb2 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Mon, 14 Dec 2020 13:27:35 +0300 Subject: - removed old RoadRunner code - added new RR source code --- tests/broken.php | 4 +- tests/client.php | 2 +- tests/composer.json | 12 ++ tests/delay.php | 4 +- tests/echo.php | 4 +- tests/error.php | 4 +- tests/head.php | 4 +- tests/memleak.php | 6 +- tests/pid.php | 2 +- tests/sleep.php | 6 +- tests/slow-client.php | 2 +- tests/slow-destroy.php | 2 +- tests/slow-pid.php | 2 +- tests/src/Environment.php | 82 ++++++++++ tests/src/EnvironmentInterface.php | 43 ++++++ tests/src/Exception/EnvironmentException.php | 16 ++ tests/src/Exception/RoadRunnerException.php | 15 ++ tests/src/Http/HttpWorker.php | 103 +++++++++++++ tests/src/Http/PSR7Worker.php | 214 +++++++++++++++++++++++++++ tests/src/Http/Request.php | 48 ++++++ tests/src/Payload.php | 43 ++++++ tests/src/Worker.php | 162 ++++++++++++++++++++ tests/src/WorkerInterface.php | 55 +++++++ tests/stop.php | 2 +- 24 files changed, 815 insertions(+), 22 deletions(-) create mode 100644 tests/composer.json create mode 100644 tests/src/Environment.php create mode 100644 tests/src/EnvironmentInterface.php create mode 100644 tests/src/Exception/EnvironmentException.php create mode 100644 tests/src/Exception/RoadRunnerException.php create mode 100644 tests/src/Http/HttpWorker.php create mode 100644 tests/src/Http/PSR7Worker.php create mode 100644 tests/src/Http/Request.php create mode 100644 tests/src/Payload.php create mode 100644 tests/src/Worker.php create mode 100644 tests/src/WorkerInterface.php (limited to 'tests') diff --git a/tests/broken.php b/tests/broken.php index 42b4e7c2..1f869b2d 100644 --- a/tests/broken.php +++ b/tests/broken.php @@ -8,7 +8,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { echo undefined_function(); - $rr->send((string)$in); + $rr->send((string)$in->body, null); } diff --git a/tests/client.php b/tests/client.php index 835b1c6c..c00cece1 100644 --- a/tests/client.php +++ b/tests/client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/composer.json b/tests/composer.json new file mode 100644 index 00000000..24702c37 --- /dev/null +++ b/tests/composer.json @@ -0,0 +1,12 @@ +{ + "minimum-stability": "beta", + "require": { + "nyholm/psr7": "^1.3", + "spiral/goridge": "^3.0@beta" + }, + "autoload": { + "psr-4": { + "Spiral\\RoadRunner\\": "src/" + } + } +} diff --git a/tests/delay.php b/tests/delay.php index bf9ecc12..f0435b05 100644 --- a/tests/delay.php +++ b/tests/delay.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - usleep($in * 1000); + usleep($in->body * 1000); $rr->send(''); } catch (\Throwable $e) { $rr->error((string)$e); diff --git a/tests/echo.php b/tests/echo.php index 1570e3df..83eec92e 100644 --- a/tests/echo.php +++ b/tests/echo.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send((string)$in); + $rr->send((string)$in->body); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/error.php b/tests/error.php index 8e1c8d0d..c77e6817 100644 --- a/tests/error.php +++ b/tests/error.php @@ -8,6 +8,6 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { - $rr->error((string)$in); +while ($in = $rr->waitPayload()) { + $rr->error((string)$in->body); } diff --git a/tests/head.php b/tests/head.php index 88ebd3f2..3c57258f 100644 --- a/tests/head.php +++ b/tests/head.php @@ -8,9 +8,9 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { - $rr->send("", (string)$ctx); + $rr->send("", (string)$in->header); } catch (\Throwable $e) { $rr->error((string)$e); } diff --git a/tests/memleak.php b/tests/memleak.php index b78a76c0..9a5376f0 100644 --- a/tests/memleak.php +++ b/tests/memleak.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); $mem = ''; -while($rr->receive($ctx)){ +while($rr->waitPayload()){ $mem .= str_repeat(" ", 1024*1024); $rr->send(""); -} \ No newline at end of file +} diff --git a/tests/pid.php b/tests/pid.php index bf10a025..f8b2515d 100644 --- a/tests/pid.php +++ b/tests/pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { $rr->send((string)getmypid()); } catch (\Throwable $e) { diff --git a/tests/sleep.php b/tests/sleep.php index b3ea8235..e34a6834 100644 --- a/tests/sleep.php +++ b/tests/sleep.php @@ -5,11 +5,11 @@ declare(strict_types=1); use Spiral\Goridge\StreamRelay; use Spiral\RoadRunner\Worker as RoadRunner; -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; $rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT)); -while($rr->receive($ctx)){ +while($rr->waitPayload()){ sleep(3); $rr->send(""); -} \ No newline at end of file +} diff --git a/tests/slow-client.php b/tests/slow-client.php index ece0a439..7737f0b1 100644 --- a/tests/slow-client.php +++ b/tests/slow-client.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php index e2a01af2..900bb68a 100644 --- a/tests/slow-destroy.php +++ b/tests/slow-destroy.php @@ -3,7 +3,7 @@ use Spiral\Goridge; ini_set('display_errors', 'stderr'); -require dirname(__DIR__) . "/vendor_php/autoload.php"; +require __DIR__ . "/vendor/autoload.php"; if (count($argv) < 3) { die("need 2 arguments"); diff --git a/tests/slow-pid.php b/tests/slow-pid.php index 747e7e86..3660cb40 100644 --- a/tests/slow-pid.php +++ b/tests/slow-pid.php @@ -8,7 +8,7 @@ $rr = new RoadRunner\Worker($relay); - while ($in = $rr->receive($ctx)) { + while ($in = $rr->waitPayload()) { try { sleep(1); $rr->send((string)getmypid()); diff --git a/tests/src/Environment.php b/tests/src/Environment.php new file mode 100644 index 00000000..9b306063 --- /dev/null +++ b/tests/src/Environment.php @@ -0,0 +1,82 @@ +env = $env; + } + + /** + * Returns worker mode assigned to the PHP process. + * + * @return string + * @throws EnvironmentException + */ + public function getMode(): string + { + return $this->getValue('RR_MODE'); + } + + /** + * Address worker should be connected to (or pipes). + * + * @return string + * @throws EnvironmentException + */ + public function getRelayAddress(): string + { + return $this->getValue('RR_RELAY'); + } + + /** + * RPC address. + * + * @return string + * @throws EnvironmentException + */ + public function getRPCAddress(): string + { + return $this->getValue('RR_RPC'); + } + + /** + * @param string $name + * @return string + * @throws EnvironmentException + */ + private function getValue(string $name): string + { + if (!isset($this->env[$name])) { + throw new EnvironmentException(sprintf("Missing environment value `%s`", $name)); + } + + return (string) $this->env[$name]; + } + + /** + * @return EnvironmentInterface + */ + public static function fromGlobals(): EnvironmentInterface + { + return new static(array_merge($_SERVER, $_ENV)); + } +} diff --git a/tests/src/EnvironmentInterface.php b/tests/src/EnvironmentInterface.php new file mode 100644 index 00000000..bc0ae043 --- /dev/null +++ b/tests/src/EnvironmentInterface.php @@ -0,0 +1,43 @@ +worker = $worker; + } + + /** + * @return WorkerInterface + */ + public function getWorker(): WorkerInterface + { + return $this->worker; + } + + /** + * Wait for incoming http request. + * + * @return Request|null + */ + public function waitRequest(): ?Request + { + $payload = $this->getWorker()->waitPayload(); + if (empty($payload->body) && empty($payload->header)) { + // termination request + return null; + } + + $request = new Request(); + $request->body = $payload->body; + + $context = json_decode($payload->header, true); + if ($context === null) { + // invalid context + return null; + } + + $this->hydrateRequest($request, $context); + + return $request; + } + + /** + * Send response to the application server. + * + * @param int $status Http status code + * @param string $body Body of response + * @param string[][] $headers An associative array of the message's headers. Each + * key MUST be a header name, and each value MUST be an array of strings + * for that header. + */ + public function respond(int $status, string $body, array $headers = []): void + { + if ($headers === []) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->getWorker()->send( + $body, + (string) json_encode(['status' => $status, 'headers' => $headers]) + ); + } + + /** + * @param Request $request + * @param array $context + */ + private function hydrateRequest(Request $request, array $context): void + { + $request->remoteAddr = $context['remoteAddr']; + $request->protocol = $context['protocol']; + $request->method = $context['method']; + $request->uri = $context['uri']; + $request->attributes = $context['attributes']; + $request->headers = $context['headers']; + $request->cookies = $context['cookies']; + $request->uploads = $context['uploads']; + parse_str($context['rawQuery'], $request->query); + + // indicates that body was parsed + $request->parsed = $context['parsed']; + } +} diff --git a/tests/src/Http/PSR7Worker.php b/tests/src/Http/PSR7Worker.php new file mode 100644 index 00000000..b985d288 --- /dev/null +++ b/tests/src/Http/PSR7Worker.php @@ -0,0 +1,214 @@ +httpWorker = new HttpWorker($worker); + $this->requestFactory = $requestFactory; + $this->streamFactory = $streamFactory; + $this->uploadsFactory = $uploadsFactory; + $this->originalServer = $_SERVER; + } + + /** + * @return WorkerInterface + */ + public function getWorker(): WorkerInterface + { + return $this->httpWorker->getWorker(); + } + + /** + * @return ServerRequestInterface|null + */ + public function waitRequest(): ?ServerRequestInterface + { + $httpRequest = $this->httpWorker->waitRequest(); + if ($httpRequest === null) { + return null; + } + + $_SERVER = $this->configureServer($httpRequest['ctx']); + + return $this->mapRequest($httpRequest, $_SERVER); + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response): void + { + $this->httpWorker->respond( + $response->getStatusCode(), + $response->getBody()->__toString(), + $response->getHeaders() + ); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param Request $request + * @return mixed[] + */ + protected function configureServer(Request $request): array + { + $server = $this->originalServer; + + $server['REQUEST_URI'] = $request->uri; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $request->getRemoteAddr(); + $server['REQUEST_METHOD'] = $request->method; + + $server['HTTP_USER_AGENT'] = ''; + foreach ($request->headers as $key => $value) { + $key = strtoupper(str_replace('-', '_', $key)); + if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) { + $server[$key] = implode(', ', $value); + } else { + $server['HTTP_' . $key] = implode(', ', $value); + } + } + + return $server; + } + + /** + * @param Request $httpRequest + * @param array $server + * @return ServerRequestInterface + */ + protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface + { + $request = $this->requestFactory->createServerRequest( + $httpRequest->method, + $httpRequest->uri, + $_SERVER + ); + + $request = $request + ->withProtocolVersion(static::fetchProtocolVersion($httpRequest->protocol)) + ->withCookieParams($httpRequest->cookies) + ->withQueryParams($httpRequest->query) + ->withUploadedFiles($this->wrapUploads($httpRequest->uploads)); + + foreach ($httpRequest->attributes as $name => $value) { + $request = $request->withAttribute($name, $value); + } + + foreach ($httpRequest->headers as $name => $value) { + $request = $request->withHeader($name, $value); + } + + if ($httpRequest->parsed) { + return $request->withParsedBody($httpRequest->getParsedBody()); + } + + if ($httpRequest->body !== null) { + return $request->withBody($this->streamFactory->createStream($httpRequest->body)); + } + + return $request; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array[] $files + * @return UploadedFileInterface[]|mixed[] + */ + protected function wrapUploads(array $files): array + { + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + if (UPLOAD_ERR_OK === $f['error']) { + $stream = $this->streamFactory->createStreamFromFile($f['tmpName']); + } else { + $stream = $this->streamFactory->createStream(); + } + + $result[$index] = $this->uploadsFactory->createUploadedFile( + $stream, + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } + + /** + * Normalize HTTP protocol version to valid values + * + * @param string $version + * @return string + */ + private static function fetchProtocolVersion(string $version): string + { + $v = substr($version, 5); + + if ($v === '2.0') { + return '2'; + } + + // Fallback for values outside of valid protocol versions + if (!in_array($v, static::$allowedVersions, true)) { + return '1.1'; + } + + return $v; + } +} diff --git a/tests/src/Http/Request.php b/tests/src/Http/Request.php new file mode 100644 index 00000000..ef67e28d --- /dev/null +++ b/tests/src/Http/Request.php @@ -0,0 +1,48 @@ +attributes['ipAddress'] ?? $this->remoteAddr ?? '127.0.0.1'; + } + + /** + * @return array|null + */ + public function getParsedBody(): ?array + { + if ($this->parsed) { + return json_decode($this->body, true); + } + + return null; + } +} diff --git a/tests/src/Payload.php b/tests/src/Payload.php new file mode 100644 index 00000000..c9b8c198 --- /dev/null +++ b/tests/src/Payload.php @@ -0,0 +1,43 @@ +body = $body; + $this->header = $header; + } +} diff --git a/tests/src/Worker.php b/tests/src/Worker.php new file mode 100644 index 00000000..53cf6cef --- /dev/null +++ b/tests/src/Worker.php @@ -0,0 +1,162 @@ +waitPayload()) { + * $worker->send(new Payload("DONE", json_encode($context))); + * } + */ +class Worker implements WorkerInterface +{ + // Request graceful worker termination. + private const STOP_REQUEST = '{"stop":true}'; + + private Relay $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Wait for incoming payload from the server. Must return null when worker stopped. + * + * @return Payload|null + * @throws GoridgeException + * @throws RoadRunnerException + */ + public function waitPayload(): ?Payload + { + $frame = $this->relay->waitFrame(); + + if ($frame->hasFlag(Frame::CONTROL)) { + $continue = $this->handleControl($frame->payload); + + if ($continue) { + return $this->waitPayload(); + } else { + return null; + } + } + + return new Payload( + substr($frame->payload, $frame->options[0]), + substr($frame->payload, 0, $frame->options[0]) + ); + } + + /** + * Respond to the server with the processing result. + * + * @param Payload $payload + * @throws GoridgeException + */ + public function respond(Payload $payload): void + { + $this->send($payload->body, $payload->header); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message): void + { + $this->relay->send(new Frame($message, [], Frame::ERROR)); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. + * + * @throws GoridgeException + */ + public function stop(): void + { + $this->send("", self::STOP_REQUEST); + } + + /** + * @param string $body + * @param string|null $context + * @throws GoridgeException + */ + public function send(string $body, string $context = null): void + { + $this->relay->send(new Frame( + (string) $context . $body, + [strlen((string) $context)] + )); + } + + /** + * Return true if continue. + * + * @param string $header + * @return bool + * + * @throws RoadRunnerException + */ + private function handleControl(string $header): bool + { + $command = json_decode($header, true); + if ($command === false) { + throw new RoadRunnerException('Invalid task header, JSON payload is expected'); + } + + switch (true) { + case !empty($command['pid']): + $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL)); + return true; + + case !empty($command['stop']): + return false; + + default: + throw new RoadRunnerException('Invalid task header, undefined control package'); + } + } + + /** + * Create Worker using global environment configuration. + * + * @return WorkerInterface + * @throws EnvironmentException + */ + public static function create(): WorkerInterface + { + $env = Environment::fromGlobals(); + + return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress())); + } +} diff --git a/tests/src/WorkerInterface.php b/tests/src/WorkerInterface.php new file mode 100644 index 00000000..bf0b6e06 --- /dev/null +++ b/tests/src/WorkerInterface.php @@ -0,0 +1,55 @@ +error("invalid payload"); + * + * @param string $error + * @throws GoridgeException + */ + public function error(string $error): void; + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive stop command after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly stop worker. + */ + public function stop(): void; +} diff --git a/tests/stop.php b/tests/stop.php index 0100ad0f..f83d3f29 100644 --- a/tests/stop.php +++ b/tests/stop.php @@ -9,7 +9,7 @@ use Spiral\RoadRunner; $rr = new RoadRunner\Worker($relay); $used = false; -while ($in = $rr->receive($ctx)) { +while ($in = $rr->waitPayload()) { try { if ($used) { // kill on second attempt -- cgit v1.2.3