summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2020-12-14 13:27:35 +0300
committerWolfy-J <[email protected]>2020-12-14 13:27:35 +0300
commit00b42663891713f142a6cc67bcccdc31353daeb2 (patch)
tree1c3966f18c15f3815e47cb065917b314be23472c
parent8203dc4d76624f4fceddff49b8a1aba9d525fc73 (diff)
- removed old RoadRunner code
- added new RR source code
-rwxr-xr-xMakefile1
-rwxr-xr-xcomposer.json44
-rw-r--r--src/Diactoros/ServerRequestFactory.php28
-rw-r--r--src/Diactoros/StreamFactory.php57
-rw-r--r--src/Diactoros/UploadedFileFactory.php36
-rw-r--r--src/Exception/MetricException.php17
-rw-r--r--src/Exceptions/RoadRunnerException.php18
-rw-r--r--src/HttpClient.php74
-rw-r--r--src/Metrics.php80
-rw-r--r--src/MetricsInterface.php64
-rw-r--r--src/Worker.php178
-rw-r--r--tests/broken.php4
-rw-r--r--tests/client.php2
-rw-r--r--tests/composer.json12
-rw-r--r--tests/delay.php4
-rw-r--r--tests/echo.php4
-rw-r--r--tests/error.php4
-rw-r--r--tests/head.php4
-rw-r--r--tests/memleak.php6
-rw-r--r--tests/pid.php2
-rw-r--r--tests/sleep.php6
-rw-r--r--tests/slow-client.php2
-rw-r--r--tests/slow-destroy.php2
-rw-r--r--tests/slow-pid.php2
-rw-r--r--tests/src/Environment.php82
-rw-r--r--tests/src/EnvironmentInterface.php43
-rw-r--r--tests/src/Exception/EnvironmentException.php16
-rw-r--r--tests/src/Exception/RoadRunnerException.php (renamed from src/Exception/RoadRunnerException.php)7
-rw-r--r--tests/src/Http/HttpWorker.php103
-rw-r--r--tests/src/Http/PSR7Worker.php (renamed from src/PSR7Client.php)153
-rw-r--r--tests/src/Http/Request.php48
-rw-r--r--tests/src/Payload.php43
-rw-r--r--tests/src/Worker.php162
-rw-r--r--tests/src/WorkerInterface.php55
-rw-r--r--tests/stop.php2
35 files changed, 665 insertions, 700 deletions
diff --git a/Makefile b/Makefile
index 9ad158ba..bb1e1975 100755
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,6 @@ uninstall: ## Uninstall locally installed RR
rm -f /usr/local/bin/rr
test: ## Run application tests
- test -d ./vendor_php || composer update --prefer-dist --ansi
go test -v -race -cover
go test -v -race -cover ./util
go test -v -race -cover ./service
diff --git a/composer.json b/composer.json
deleted file mode 100755
index e3017b97..00000000
--- a/composer.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
- "name": "spiral/roadrunner",
- "type": "server",
- "description": "High-performance PHP application server, load-balancer and process manager written in Golang",
- "license": "MIT",
- "authors": [
- {
- "name": "Anton Titov / Wolfy-J",
- "email": "[email protected]"
- },
- {
- "name": "RoadRunner Community",
- "homepage": "https://github.com/spiral/roadrunner/graphs/contributors"
- }
- ],
- "require": {
- "php": "^7.4 || ^8.0",
- "ext-json": "*",
- "ext-curl": "*",
- "spiral/goridge": "^2.4.2",
- "psr/http-factory": "^1.0.1",
- "psr/http-message": "^1.0.1",
- "symfony/console": "^2.5.0 || ^3.0.0 || ^4.0.0 || ^5.0.0",
- "laminas/laminas-diactoros": "^1.3.6 || ^2.0",
- "composer/package-versions-deprecated": "^1.8"
- },
- "config": {
- "vendor-dir": "vendor_php"
- },
- "require-dev": {
- "phpstan/phpstan": "~0.12.34"
- },
- "scripts": {
- "analyze": "phpstan analyze -c ./phpstan.neon.dist --no-progress --ansi"
- },
- "autoload": {
- "psr-4": {
- "Spiral\\RoadRunner\\": "src/"
- }
- },
- "bin": [
- "bin/rr"
- ]
-} \ No newline at end of file
diff --git a/src/Diactoros/ServerRequestFactory.php b/src/Diactoros/ServerRequestFactory.php
deleted file mode 100644
index 6a42f207..00000000
--- a/src/Diactoros/ServerRequestFactory.php
+++ /dev/null
@@ -1,28 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use Psr\Http\Message\ServerRequestFactoryInterface;
-use Psr\Http\Message\ServerRequestInterface;
-use Laminas\Diactoros\ServerRequest;
-
-final class ServerRequestFactory implements ServerRequestFactoryInterface
-{
- /**
- * @inheritdoc
- *
- * @param array<mixed> $serverParams Array of SAPI parameters with which to seed the generated request instance.
- */
- public function createServerRequest(string $method, $uri, array $serverParams = []): ServerRequestInterface
- {
- $uploadedFiles = [];
- return new ServerRequest($serverParams, $uploadedFiles, $uri, $method);
- }
-}
diff --git a/src/Diactoros/StreamFactory.php b/src/Diactoros/StreamFactory.php
deleted file mode 100644
index 68a77e92..00000000
--- a/src/Diactoros/StreamFactory.php
+++ /dev/null
@@ -1,57 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use RuntimeException;
-use Psr\Http\Message\StreamFactoryInterface;
-use Psr\Http\Message\StreamInterface;
-use Laminas\Diactoros\Stream;
-
-final class StreamFactory implements StreamFactoryInterface
-{
- /**
- * @inheritdoc
- * @throws RuntimeException
- */
- public function createStream(string $content = ''): StreamInterface
- {
- $resource = fopen('php://temp', 'rb+');
-
- if (! \is_resource($resource)) {
- throw new RuntimeException('Cannot create stream');
- }
-
- fwrite($resource, $content);
- rewind($resource);
- return $this->createStreamFromResource($resource);
- }
-
- /**
- * @inheritdoc
- */
- public function createStreamFromFile(string $file, string $mode = 'rb'): StreamInterface
- {
- $resource = fopen($file, $mode);
-
- if (! \is_resource($resource)) {
- throw new RuntimeException('Cannot create stream');
- }
-
- return $this->createStreamFromResource($resource);
- }
-
- /**
- * @inheritdoc
- */
- public function createStreamFromResource($resource): StreamInterface
- {
- return new Stream($resource);
- }
-}
diff --git a/src/Diactoros/UploadedFileFactory.php b/src/Diactoros/UploadedFileFactory.php
deleted file mode 100644
index daa475c1..00000000
--- a/src/Diactoros/UploadedFileFactory.php
+++ /dev/null
@@ -1,36 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Diactoros;
-
-use Psr\Http\Message\StreamInterface;
-use Psr\Http\Message\UploadedFileFactoryInterface;
-use Psr\Http\Message\UploadedFileInterface;
-use Laminas\Diactoros\UploadedFile;
-
-final class UploadedFileFactory implements UploadedFileFactoryInterface
-{
- /**
- * @inheritdoc
- */
- public function createUploadedFile(
- StreamInterface $stream,
- int $size = null,
- int $error = \UPLOAD_ERR_OK,
- string $clientFilename = null,
- string $clientMediaType = null
- ): UploadedFileInterface {
- if ($size === null) {
- $size = (int) $stream->getSize();
- }
-
- /** @var resource $stream */
- return new UploadedFile($stream, $size, $error, $clientFilename, $clientMediaType);
- }
-}
diff --git a/src/Exception/MetricException.php b/src/Exception/MetricException.php
deleted file mode 100644
index d5b738b8..00000000
--- a/src/Exception/MetricException.php
+++ /dev/null
@@ -1,17 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Exception;
-
-use Spiral\Goridge\Exceptions\RPCException;
-
-class MetricException extends RPCException
-{
-}
diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php
deleted file mode 100644
index 43967893..00000000
--- a/src/Exceptions/RoadRunnerException.php
+++ /dev/null
@@ -1,18 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner\Exceptions;
-
-/**
- * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead
- */
-class RoadRunnerException extends \RuntimeException
-{
-}
diff --git a/src/HttpClient.php b/src/HttpClient.php
deleted file mode 100644
index 9b9048ca..00000000
--- a/src/HttpClient.php
+++ /dev/null
@@ -1,74 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Alex Bond
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-final class HttpClient
-{
- /** @var Worker */
- private $worker;
-
- /**
- * @param Worker $worker
- */
- public function __construct(Worker $worker)
- {
- $this->worker = $worker;
- }
-
- /**
- * @return Worker
- */
- public function getWorker(): Worker
- {
- return $this->worker;
- }
-
- /**
- * @return mixed[]|null Request information as ['ctx'=>[], 'body'=>string]
- * or null if termination request or invalid context.
- */
- public function acceptRequest(): ?array
- {
- $body = $this->getWorker()->receive($ctx);
- if (empty($body) && empty($ctx)) {
- // termination request
- return null;
- }
-
- $ctx = json_decode($ctx, true);
- if ($ctx === null) {
- // invalid context
- return null;
- }
-
- return ['ctx' => $ctx, 'body' => $body];
- }
-
- /**
- * Send response to the application server.
- *
- * @param int $status Http status code
- * @param string $body Body of response
- * @param string[][] $headers An associative array of the message's headers. Each
- * key MUST be a header name, and each value MUST be an array of strings
- * for that header.
- */
- public function respond(int $status, string $body, array $headers = []): void
- {
- $sendHeaders = empty($headers)
- ? new \stdClass() // this is required to represent empty header set as map and not as array
- : $headers;
-
- $this->getWorker()->send(
- $body,
- (string) json_encode(['status' => $status, 'headers' => $sendHeaders])
- );
- }
-}
diff --git a/src/Metrics.php b/src/Metrics.php
deleted file mode 100644
index d6b6e1da..00000000
--- a/src/Metrics.php
+++ /dev/null
@@ -1,80 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\Goridge\Exceptions\RPCException;
-use Spiral\Goridge\RPC;
-use Spiral\RoadRunner\Exception\MetricException;
-
-/**
- * Application metrics.
- */
-final class Metrics implements MetricsInterface
-{
- /** @var RPC */
- private $rpc;
-
- /**
- * @param RPC $rpc
- */
- public function __construct(RPC $rpc)
- {
- $this->rpc = $rpc;
- }
-
- /**
- * @inheritDoc
- */
- public function add(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function sub(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function observe(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-
- /**
- * @inheritDoc
- */
- public function set(string $name, float $value, array $labels = []): void
- {
- try {
- $this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
- } catch (RPCException $e) {
- throw new MetricException($e->getMessage(), $e->getCode(), $e);
- }
- }
-}
diff --git a/src/MetricsInterface.php b/src/MetricsInterface.php
deleted file mode 100644
index ec2009b0..00000000
--- a/src/MetricsInterface.php
+++ /dev/null
@@ -1,64 +0,0 @@
-<?php
-
-/**
- * Spiral Framework.
- *
- * @license MIT
- * @author Anton Titov (Wolfy-J)
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\RoadRunner\Exception\MetricException;
-
-interface MetricsInterface
-{
- /**
- * Add collector value. Fallback to appropriate method of related collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function add(string $collector, float $value, array $labels = []);
-
- /**
- * Subtract the collector value, only for gauge collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function sub(string $collector, float $value, array $labels = []);
-
- /**
- * Observe collector value, only for histogram and summary collectors.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function observe(string $collector, float $value, array $labels = []);
-
- /**
- * Set collector value, only for gauge collector.
- *
- * @param string $collector
- * @param float $value
- * @param mixed[] $labels
- *
- * @throws MetricException
- * @return void
- */
- public function set(string $collector, float $value, array $labels = []);
-}
diff --git a/src/Worker.php b/src/Worker.php
deleted file mode 100644
index d509562e..00000000
--- a/src/Worker.php
+++ /dev/null
@@ -1,178 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go
- *
- * @author Wolfy-J
- */
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\Goridge\Exceptions\GoridgeException;
-use Spiral\Goridge\RelayInterface as Relay;
-use Spiral\Goridge\SendPackageRelayInterface;
-use Spiral\RoadRunner\Exception\RoadRunnerException;
-
-/**
- * Accepts connection from RoadRunner server over given Goridge relay.
- *
- * Example:
- *
- * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
- * while ($task = $worker->receive($context)) {
- * $worker->send("DONE", json_encode($context));
- * }
- */
-class Worker
-{
- // Send as response context to request worker termination
- public const STOP = '{"stop":true}';
-
- /** @var Relay */
- private $relay;
-
- /**
- * @param Relay $relay
- */
- public function __construct(Relay $relay)
- {
- $this->relay = $relay;
- }
-
- /**
- * Receive packet of information to process, returns null when process must be stopped. Might
- * return Error to wrap error message from server.
- *
- * @param mixed $header
- * @return \Error|null|string
- *
- * @throws GoridgeException
- */
- public function receive(&$header)
- {
- $body = $this->relay->receiveSync($flags);
-
- if ($flags & Relay::PAYLOAD_CONTROL) {
- if ($this->handleControl($body, $header, $flags)) {
- // wait for the next command
- return $this->receive($header);
- }
-
- // no context for the termination.
- $header = null;
-
- // Expect process termination
- return null;
- }
-
- if ($flags & Relay::PAYLOAD_ERROR) {
- return new \Error((string)$body);
- }
-
- return $body;
- }
-
- /**
- * Respond to the server with result of task execution and execution context.
- *
- * Example:
- * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
- *
- * @param string|null $payload
- * @param string|null $header
- */
- public function send(string $payload = null, string $header = null): void
- {
- if (!$this->relay instanceof SendPackageRelayInterface) {
- if ($header === null) {
- $this->relay->send('', Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
- } else {
- $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
- }
-
- $this->relay->send((string)$payload, Relay::PAYLOAD_RAW);
- } else {
- $this->relay->sendPackage(
- (string)$header,
- Relay::PAYLOAD_CONTROL | ($header === null ? Relay::PAYLOAD_NONE : Relay::PAYLOAD_RAW),
- (string)$payload,
- Relay::PAYLOAD_RAW
- );
- }
- }
-
- /**
- * Respond to the server with an error. Error must be treated as TaskError and might not cause
- * worker destruction.
- *
- * Example:
- *
- * $worker->error("invalid payload");
- *
- * @param string $message
- */
- public function error(string $message): void
- {
- $this->relay->send(
- $message,
- Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
- );
- }
-
- /**
- * Terminate the process. Server must automatically pass task to the next available process.
- * Worker will receive StopCommand context after calling this method.
- *
- * Attention, you MUST use continue; after invoking this method to let rr to properly
- * stop worker.
- *
- * @throws GoridgeException
- */
- public function stop(): void
- {
- $this->send(null, self::STOP);
- }
-
- /**
- * Handles incoming control command payload and executes it if required.
- *
- * @param string $body
- * @param mixed $header Exported context (if any).
- * @param int $flags
- * @return bool True when continue processing.
- *
- * @throws RoadRunnerException
- */
- private function handleControl(string $body = null, &$header = null, int $flags = 0): bool
- {
- $header = $body;
- if ($body === null || $flags & Relay::PAYLOAD_RAW) {
- // empty or raw prefix
- return true;
- }
-
- $p = json_decode($body, true);
- if ($p === false) {
- throw new RoadRunnerException('invalid task context, JSON payload is expected');
- }
-
- // PID negotiation (socket connections only)
- if (!empty($p['pid'])) {
- $this->relay->send(
- sprintf('{"pid":%s}', getmypid()),
- Relay::PAYLOAD_CONTROL
- );
- }
-
- // termination request
- if (!empty($p['stop'])) {
- return false;
- }
-
- // parsed header
- $header = $p;
-
- return true;
- }
-}
diff --git a/tests/broken.php b/tests/broken.php
index 42b4e7c2..1f869b2d 100644
--- a/tests/broken.php
+++ b/tests/broken.php
@@ -8,7 +8,7 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
echo undefined_function();
- $rr->send((string)$in);
+ $rr->send((string)$in->body, null);
}
diff --git a/tests/client.php b/tests/client.php
index 835b1c6c..c00cece1 100644
--- a/tests/client.php
+++ b/tests/client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/composer.json b/tests/composer.json
new file mode 100644
index 00000000..24702c37
--- /dev/null
+++ b/tests/composer.json
@@ -0,0 +1,12 @@
+{
+ "minimum-stability": "beta",
+ "require": {
+ "nyholm/psr7": "^1.3",
+ "spiral/goridge": "^3.0@beta"
+ },
+ "autoload": {
+ "psr-4": {
+ "Spiral\\RoadRunner\\": "src/"
+ }
+ }
+}
diff --git a/tests/delay.php b/tests/delay.php
index bf9ecc12..f0435b05 100644
--- a/tests/delay.php
+++ b/tests/delay.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- usleep($in * 1000);
+ usleep($in->body * 1000);
$rr->send('');
} catch (\Throwable $e) {
$rr->error((string)$e);
diff --git a/tests/echo.php b/tests/echo.php
index 1570e3df..83eec92e 100644
--- a/tests/echo.php
+++ b/tests/echo.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send((string)$in);
+ $rr->send((string)$in->body);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
diff --git a/tests/error.php b/tests/error.php
index 8e1c8d0d..c77e6817 100644
--- a/tests/error.php
+++ b/tests/error.php
@@ -8,6 +8,6 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
- $rr->error((string)$in);
+while ($in = $rr->waitPayload()) {
+ $rr->error((string)$in->body);
}
diff --git a/tests/head.php b/tests/head.php
index 88ebd3f2..3c57258f 100644
--- a/tests/head.php
+++ b/tests/head.php
@@ -8,9 +8,9 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
- $rr->send("", (string)$ctx);
+ $rr->send("", (string)$in->header);
} catch (\Throwable $e) {
$rr->error((string)$e);
}
diff --git a/tests/memleak.php b/tests/memleak.php
index b78a76c0..9a5376f0 100644
--- a/tests/memleak.php
+++ b/tests/memleak.php
@@ -5,11 +5,11 @@ declare(strict_types=1);
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Worker as RoadRunner;
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
$mem = '';
-while($rr->receive($ctx)){
+while($rr->waitPayload()){
$mem .= str_repeat(" ", 1024*1024);
$rr->send("");
-} \ No newline at end of file
+}
diff --git a/tests/pid.php b/tests/pid.php
index bf10a025..f8b2515d 100644
--- a/tests/pid.php
+++ b/tests/pid.php
@@ -8,7 +8,7 @@
$rr = new RoadRunner\Worker($relay);
- while ($in = $rr->receive($ctx)) {
+ while ($in = $rr->waitPayload()) {
try {
$rr->send((string)getmypid());
} catch (\Throwable $e) {
diff --git a/tests/sleep.php b/tests/sleep.php
index b3ea8235..e34a6834 100644
--- a/tests/sleep.php
+++ b/tests/sleep.php
@@ -5,11 +5,11 @@ declare(strict_types=1);
use Spiral\Goridge\StreamRelay;
use Spiral\RoadRunner\Worker as RoadRunner;
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
$rr = new RoadRunner(new StreamRelay(\STDIN, \STDOUT));
-while($rr->receive($ctx)){
+while($rr->waitPayload()){
sleep(3);
$rr->send("");
-} \ No newline at end of file
+}
diff --git a/tests/slow-client.php b/tests/slow-client.php
index ece0a439..7737f0b1 100644
--- a/tests/slow-client.php
+++ b/tests/slow-client.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/slow-destroy.php b/tests/slow-destroy.php
index e2a01af2..900bb68a 100644
--- a/tests/slow-destroy.php
+++ b/tests/slow-destroy.php
@@ -3,7 +3,7 @@
use Spiral\Goridge;
ini_set('display_errors', 'stderr');
-require dirname(__DIR__) . "/vendor_php/autoload.php";
+require __DIR__ . "/vendor/autoload.php";
if (count($argv) < 3) {
die("need 2 arguments");
diff --git a/tests/slow-pid.php b/tests/slow-pid.php
index 747e7e86..3660cb40 100644
--- a/tests/slow-pid.php
+++ b/tests/slow-pid.php
@@ -8,7 +8,7 @@
$rr = new RoadRunner\Worker($relay);
- while ($in = $rr->receive($ctx)) {
+ while ($in = $rr->waitPayload()) {
try {
sleep(1);
$rr->send((string)getmypid());
diff --git a/tests/src/Environment.php b/tests/src/Environment.php
new file mode 100644
index 00000000..9b306063
--- /dev/null
+++ b/tests/src/Environment.php
@@ -0,0 +1,82 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\EnvironmentException;
+
+class Environment implements EnvironmentInterface
+{
+ /** @var array */
+ private array $env;
+
+ /**
+ * @param array $env
+ */
+ public function __construct(array $env)
+ {
+ $this->env = $env;
+ }
+
+ /**
+ * Returns worker mode assigned to the PHP process.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getMode(): string
+ {
+ return $this->getValue('RR_MODE');
+ }
+
+ /**
+ * Address worker should be connected to (or pipes).
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRelayAddress(): string
+ {
+ return $this->getValue('RR_RELAY');
+ }
+
+ /**
+ * RPC address.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRPCAddress(): string
+ {
+ return $this->getValue('RR_RPC');
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ * @throws EnvironmentException
+ */
+ private function getValue(string $name): string
+ {
+ if (!isset($this->env[$name])) {
+ throw new EnvironmentException(sprintf("Missing environment value `%s`", $name));
+ }
+
+ return (string) $this->env[$name];
+ }
+
+ /**
+ * @return EnvironmentInterface
+ */
+ public static function fromGlobals(): EnvironmentInterface
+ {
+ return new static(array_merge($_SERVER, $_ENV));
+ }
+}
diff --git a/tests/src/EnvironmentInterface.php b/tests/src/EnvironmentInterface.php
new file mode 100644
index 00000000..bc0ae043
--- /dev/null
+++ b/tests/src/EnvironmentInterface.php
@@ -0,0 +1,43 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\RoadRunner\Exception\EnvironmentException;
+
+/**
+ * Provides base values to configure roadrunner worker.
+ */
+interface EnvironmentInterface
+{
+ /**
+ * Returns worker mode assigned to the PHP process.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getMode(): string;
+
+ /**
+ * Address worker should be connected to (or pipes).
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRelayAddress(): string;
+
+ /**
+ * RPC address.
+ *
+ * @return string
+ * @throws EnvironmentException
+ */
+ public function getRPCAddress(): string;
+}
diff --git a/tests/src/Exception/EnvironmentException.php b/tests/src/Exception/EnvironmentException.php
new file mode 100644
index 00000000..227507c5
--- /dev/null
+++ b/tests/src/Exception/EnvironmentException.php
@@ -0,0 +1,16 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Exception;
+
+class EnvironmentException extends RoadRunnerException
+{
+
+}
diff --git a/src/Exception/RoadRunnerException.php b/tests/src/Exception/RoadRunnerException.php
index f83c3dd4..2329370c 100644
--- a/src/Exception/RoadRunnerException.php
+++ b/tests/src/Exception/RoadRunnerException.php
@@ -1,14 +1,15 @@
<?php
/**
- * High-performance PHP process supervisor and load balancer written in Go
+ * High-performance PHP process supervisor and load balancer written in Go.
*
- * @author Wolfy-J
+ * @author Wolfy-J
*/
+
declare(strict_types=1);
namespace Spiral\RoadRunner\Exception;
-class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException
+class RoadRunnerException extends \RuntimeException
{
}
diff --git a/tests/src/Http/HttpWorker.php b/tests/src/Http/HttpWorker.php
new file mode 100644
index 00000000..13fd6c27
--- /dev/null
+++ b/tests/src/Http/HttpWorker.php
@@ -0,0 +1,103 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Alex Bond
+ */
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Http;
+
+use Spiral\RoadRunner\WorkerInterface;
+
+class HttpWorker
+{
+ /** @var WorkerInterface */
+ private WorkerInterface $worker;
+
+ /**
+ * @param WorkerInterface $worker
+ */
+ public function __construct(WorkerInterface $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return WorkerInterface
+ */
+ public function getWorker(): WorkerInterface
+ {
+ return $this->worker;
+ }
+
+ /**
+ * Wait for incoming http request.
+ *
+ * @return Request|null
+ */
+ public function waitRequest(): ?Request
+ {
+ $payload = $this->getWorker()->waitPayload();
+ if (empty($payload->body) && empty($payload->header)) {
+ // termination request
+ return null;
+ }
+
+ $request = new Request();
+ $request->body = $payload->body;
+
+ $context = json_decode($payload->header, true);
+ if ($context === null) {
+ // invalid context
+ return null;
+ }
+
+ $this->hydrateRequest($request, $context);
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param int $status Http status code
+ * @param string $body Body of response
+ * @param string[][] $headers An associative array of the message's headers. Each
+ * key MUST be a header name, and each value MUST be an array of strings
+ * for that header.
+ */
+ public function respond(int $status, string $body, array $headers = []): void
+ {
+ if ($headers === []) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->getWorker()->send(
+ $body,
+ (string) json_encode(['status' => $status, 'headers' => $headers])
+ );
+ }
+
+ /**
+ * @param Request $request
+ * @param array $context
+ */
+ private function hydrateRequest(Request $request, array $context): void
+ {
+ $request->remoteAddr = $context['remoteAddr'];
+ $request->protocol = $context['protocol'];
+ $request->method = $context['method'];
+ $request->uri = $context['uri'];
+ $request->attributes = $context['attributes'];
+ $request->headers = $context['headers'];
+ $request->cookies = $context['cookies'];
+ $request->uploads = $context['uploads'];
+ parse_str($context['rawQuery'], $request->query);
+
+ // indicates that body was parsed
+ $request->parsed = $context['parsed'];
+ }
+}
diff --git a/src/PSR7Client.php b/tests/src/Http/PSR7Worker.php
index 777dd891..b985d288 100644
--- a/src/PSR7Client.php
+++ b/tests/src/Http/PSR7Worker.php
@@ -7,7 +7,7 @@
*/
declare(strict_types=1);
-namespace Spiral\RoadRunner;
+namespace Spiral\RoadRunner\Http;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestFactoryInterface;
@@ -15,100 +15,64 @@ use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\UploadedFileFactoryInterface;
use Psr\Http\Message\UploadedFileInterface;
+use Spiral\RoadRunner\WorkerInterface;
/**
* Manages PSR-7 request and response.
*/
-class PSR7Client
+class PSR7Worker
{
- /** @var HttpClient */
- private $httpClient;
-
- /** @var ServerRequestFactoryInterface */
- private $requestFactory;
-
- /** @var StreamFactoryInterface */
- private $streamFactory;
-
- /** @var UploadedFileFactoryInterface */
- private $uploadsFactory;
+ private HttpWorker $httpWorker;
+ private ServerRequestFactoryInterface $requestFactory;
+ private StreamFactoryInterface $streamFactory;
+ private UploadedFileFactoryInterface $uploadsFactory;
/** @var mixed[] */
- private $originalServer = [];
+ private array $originalServer = [];
/** @var string[] Valid values for HTTP protocol version */
- private static $allowedVersions = ['1.0', '1.1', '2',];
+ private static array $allowedVersions = ['1.0', '1.1', '2',];
/**
- * @param Worker $worker
- * @param ServerRequestFactoryInterface|null $requestFactory
- * @param StreamFactoryInterface|null $streamFactory
- * @param UploadedFileFactoryInterface|null $uploadsFactory
+ * @param WorkerInterface $worker
+ * @param ServerRequestFactoryInterface $requestFactory
+ * @param StreamFactoryInterface $streamFactory
+ * @param UploadedFileFactoryInterface $uploadsFactory
*/
public function __construct(
- Worker $worker,
- ServerRequestFactoryInterface $requestFactory = null,
- StreamFactoryInterface $streamFactory = null,
- UploadedFileFactoryInterface $uploadsFactory = null
+ WorkerInterface $worker,
+ ServerRequestFactoryInterface $requestFactory,
+ StreamFactoryInterface $streamFactory,
+ UploadedFileFactoryInterface $uploadsFactory
) {
- $this->httpClient = new HttpClient($worker);
- $this->requestFactory = $requestFactory ?? new Diactoros\ServerRequestFactory();
- $this->streamFactory = $streamFactory ?? new Diactoros\StreamFactory();
- $this->uploadsFactory = $uploadsFactory ?? new Diactoros\UploadedFileFactory();
+ $this->httpWorker = new HttpWorker($worker);
+ $this->requestFactory = $requestFactory;
+ $this->streamFactory = $streamFactory;
+ $this->uploadsFactory = $uploadsFactory;
$this->originalServer = $_SERVER;
}
/**
- * @return Worker
+ * @return WorkerInterface
*/
- public function getWorker(): Worker
+ public function getWorker(): WorkerInterface
{
- return $this->httpClient->getWorker();
+ return $this->httpWorker->getWorker();
}
/**
* @return ServerRequestInterface|null
*/
- public function acceptRequest(): ?ServerRequestInterface
+ public function waitRequest(): ?ServerRequestInterface
{
- $rawRequest = $this->httpClient->acceptRequest();
- if ($rawRequest === null) {
+ $httpRequest = $this->httpWorker->waitRequest();
+ if ($httpRequest === null) {
return null;
}
- $_SERVER = $this->configureServer($rawRequest['ctx']);
-
- $request = $this->requestFactory->createServerRequest(
- $rawRequest['ctx']['method'],
- $rawRequest['ctx']['uri'],
- $_SERVER
- );
-
- parse_str($rawRequest['ctx']['rawQuery'], $query);
-
- $request = $request
- ->withProtocolVersion(static::fetchProtocolVersion($rawRequest['ctx']['protocol']))
- ->withCookieParams($rawRequest['ctx']['cookies'])
- ->withQueryParams($query)
- ->withUploadedFiles($this->wrapUploads($rawRequest['ctx']['uploads']));
+ $_SERVER = $this->configureServer($httpRequest['ctx']);
- foreach ($rawRequest['ctx']['attributes'] as $name => $value) {
- $request = $request->withAttribute($name, $value);
- }
-
- foreach ($rawRequest['ctx']['headers'] as $name => $value) {
- $request = $request->withHeader($name, $value);
- }
-
- if ($rawRequest['ctx']['parsed']) {
- return $request->withParsedBody(json_decode($rawRequest['body'], true));
- }
-
- if ($rawRequest['body'] !== null) {
- return $request->withBody($this->streamFactory->createStream($rawRequest['body']));
- }
-
- return $request;
+ return $this->mapRequest($httpRequest, $_SERVER);
}
/**
@@ -118,7 +82,7 @@ class PSR7Client
*/
public function respond(ResponseInterface $response): void
{
- $this->httpClient->respond(
+ $this->httpWorker->respond(
$response->getStatusCode(),
$response->getBody()->__toString(),
$response->getHeaders()
@@ -129,21 +93,21 @@ class PSR7Client
* Returns altered copy of _SERVER variable. Sets ip-address,
* request-time and other values.
*
- * @param mixed[] $ctx
+ * @param Request $request
* @return mixed[]
*/
- protected function configureServer(array $ctx): array
+ protected function configureServer(Request $request): array
{
$server = $this->originalServer;
- $server['REQUEST_URI'] = $ctx['uri'];
+ $server['REQUEST_URI'] = $request->uri;
$server['REQUEST_TIME'] = time();
$server['REQUEST_TIME_FLOAT'] = microtime(true);
- $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
- $server['REQUEST_METHOD'] = $ctx['method'];
+ $server['REMOTE_ADDR'] = $request->getRemoteAddr();
+ $server['REQUEST_METHOD'] = $request->method;
$server['HTTP_USER_AGENT'] = '';
- foreach ($ctx['headers'] as $key => $value) {
+ foreach ($request->headers as $key => $value) {
$key = strtoupper(str_replace('-', '_', $key));
if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
$server[$key] = implode(', ', $value);
@@ -156,18 +120,51 @@ class PSR7Client
}
/**
+ * @param Request $httpRequest
+ * @param array $server
+ * @return ServerRequestInterface
+ */
+ protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface
+ {
+ $request = $this->requestFactory->createServerRequest(
+ $httpRequest->method,
+ $httpRequest->uri,
+ $_SERVER
+ );
+
+ $request = $request
+ ->withProtocolVersion(static::fetchProtocolVersion($httpRequest->protocol))
+ ->withCookieParams($httpRequest->cookies)
+ ->withQueryParams($httpRequest->query)
+ ->withUploadedFiles($this->wrapUploads($httpRequest->uploads));
+
+ foreach ($httpRequest->attributes as $name => $value) {
+ $request = $request->withAttribute($name, $value);
+ }
+
+ foreach ($httpRequest->headers as $name => $value) {
+ $request = $request->withHeader($name, $value);
+ }
+
+ if ($httpRequest->parsed) {
+ return $request->withParsedBody($httpRequest->getParsedBody());
+ }
+
+ if ($httpRequest->body !== null) {
+ return $request->withBody($this->streamFactory->createStream($httpRequest->body));
+ }
+
+ return $request;
+ }
+
+ /**
* Wraps all uploaded files with UploadedFile.
*
* @param array[] $files
- *
* @return UploadedFileInterface[]|mixed[]
*/
- private function wrapUploads($files): array
+ protected function wrapUploads(array $files): array
{
- if (empty($files)) {
- return [];
- }
-
$result = [];
foreach ($files as $index => $f) {
if (!isset($f['name'])) {
diff --git a/tests/src/Http/Request.php b/tests/src/Http/Request.php
new file mode 100644
index 00000000..ef67e28d
--- /dev/null
+++ b/tests/src/Http/Request.php
@@ -0,0 +1,48 @@
+<?php
+
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner\Http;
+
+final class Request
+{
+
+ public string $remoteAddr;
+ public string $protocol;
+ public string $method;
+ public string $uri;
+ public array $headers;
+ public array $cookies;
+ public array $uploads;
+ public array $attributes;
+ public array $query;
+ public ?string $body;
+ public bool $parsed;
+
+ /**
+ * @return string
+ */
+ public function getRemoteAddr(): string
+ {
+ return $this->attributes['ipAddress'] ?? $this->remoteAddr ?? '127.0.0.1';
+ }
+
+ /**
+ * @return array|null
+ */
+ public function getParsedBody(): ?array
+ {
+ if ($this->parsed) {
+ return json_decode($this->body, true);
+ }
+
+ return null;
+ }
+}
diff --git a/tests/src/Payload.php b/tests/src/Payload.php
new file mode 100644
index 00000000..c9b8c198
--- /dev/null
+++ b/tests/src/Payload.php
@@ -0,0 +1,43 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+/**
+ * Class Payload
+ *
+ * @package Spiral\RoadRunner
+ */
+final class Payload
+{
+ /**
+ * Execution payload (binary).
+ *
+ * @var string|null
+ */
+ public ?string $body;
+
+ /**
+ * Execution context (binary).
+ *
+ * @var string|null
+ */
+ public ?string $header;
+
+ /**
+ * @param string|null $body
+ * @param string|null $header
+ */
+ public function __construct(?string $body, ?string $header = null)
+ {
+ $this->body = $body;
+ $this->header = $header;
+ }
+}
diff --git a/tests/src/Worker.php b/tests/src/Worker.php
new file mode 100644
index 00000000..53cf6cef
--- /dev/null
+++ b/tests/src/Worker.php
@@ -0,0 +1,162 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exception\GoridgeException;
+use Spiral\Goridge\Frame;
+use Spiral\Goridge\RelayInterface as Relay;
+use Spiral\RoadRunner\Exception\EnvironmentException;
+use Spiral\RoadRunner\Exception\RoadRunnerException;
+
+/**
+ * Accepts connection from RoadRunner server over given Goridge relay.
+ *
+ * $worker = Worker::create();
+ * while ($p = $worker->waitPayload()) {
+ * $worker->send(new Payload("DONE", json_encode($context)));
+ * }
+ */
+class Worker implements WorkerInterface
+{
+ // Request graceful worker termination.
+ private const STOP_REQUEST = '{"stop":true}';
+
+ private Relay $relay;
+
+ /**
+ * @param Relay $relay
+ */
+ public function __construct(Relay $relay)
+ {
+ $this->relay = $relay;
+ }
+
+ /**
+ * Wait for incoming payload from the server. Must return null when worker stopped.
+ *
+ * @return Payload|null
+ * @throws GoridgeException
+ * @throws RoadRunnerException
+ */
+ public function waitPayload(): ?Payload
+ {
+ $frame = $this->relay->waitFrame();
+
+ if ($frame->hasFlag(Frame::CONTROL)) {
+ $continue = $this->handleControl($frame->payload);
+
+ if ($continue) {
+ return $this->waitPayload();
+ } else {
+ return null;
+ }
+ }
+
+ return new Payload(
+ substr($frame->payload, $frame->options[0]),
+ substr($frame->payload, 0, $frame->options[0])
+ );
+ }
+
+ /**
+ * Respond to the server with the processing result.
+ *
+ * @param Payload $payload
+ * @throws GoridgeException
+ */
+ public function respond(Payload $payload): void
+ {
+ $this->send($payload->body, $payload->header);
+ }
+
+ /**
+ * Respond to the server with an error. Error must be treated as TaskError and might not cause
+ * worker destruction.
+ *
+ * Example:
+ *
+ * $worker->error("invalid payload");
+ *
+ * @param string $message
+ */
+ public function error(string $message): void
+ {
+ $this->relay->send(new Frame($message, [], Frame::ERROR));
+ }
+
+ /**
+ * Terminate the process. Server must automatically pass task to the next available process.
+ * Worker will receive StopCommand context after calling this method.
+ *
+ * Attention, you MUST use continue; after invoking this method to let rr to properly
+ * stop worker.
+ *
+ * @throws GoridgeException
+ */
+ public function stop(): void
+ {
+ $this->send("", self::STOP_REQUEST);
+ }
+
+ /**
+ * @param string $body
+ * @param string|null $context
+ * @throws GoridgeException
+ */
+ public function send(string $body, string $context = null): void
+ {
+ $this->relay->send(new Frame(
+ (string) $context . $body,
+ [strlen((string) $context)]
+ ));
+ }
+
+ /**
+ * Return true if continue.
+ *
+ * @param string $header
+ * @return bool
+ *
+ * @throws RoadRunnerException
+ */
+ private function handleControl(string $header): bool
+ {
+ $command = json_decode($header, true);
+ if ($command === false) {
+ throw new RoadRunnerException('Invalid task header, JSON payload is expected');
+ }
+
+ switch (true) {
+ case !empty($command['pid']):
+ $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL));
+ return true;
+
+ case !empty($command['stop']):
+ return false;
+
+ default:
+ throw new RoadRunnerException('Invalid task header, undefined control package');
+ }
+ }
+
+ /**
+ * Create Worker using global environment configuration.
+ *
+ * @return WorkerInterface
+ * @throws EnvironmentException
+ */
+ public static function create(): WorkerInterface
+ {
+ $env = Environment::fromGlobals();
+
+ return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress()));
+ }
+}
diff --git a/tests/src/WorkerInterface.php b/tests/src/WorkerInterface.php
new file mode 100644
index 00000000..bf0b6e06
--- /dev/null
+++ b/tests/src/WorkerInterface.php
@@ -0,0 +1,55 @@
+<?php
+
+/**
+ * High-performance PHP process supervisor and load balancer written in Go.
+ *
+ * @author Wolfy-J
+ */
+
+declare(strict_types=1);
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exception\GoridgeException;
+use Spiral\RoadRunner\Exception\RoadRunnerException;
+
+interface WorkerInterface
+{
+ /**
+ * Wait for incoming payload from the server. Must return null when worker stopped.
+ *
+ * @return Payload|null
+ * @throws GoridgeException
+ * @throws RoadRunnerException
+ */
+ public function waitPayload(): ?Payload;
+
+ /**
+ * Respond to the server with the processing result.
+ *
+ * @param Payload $payload
+ * @throws GoridgeException
+ */
+ public function respond(Payload $payload): void;
+
+ /**
+ * Respond to the server with an error. Error must be treated as TaskError and might not cause
+ * worker destruction.
+ *
+ * Example:
+ *
+ * $worker->error("invalid payload");
+ *
+ * @param string $error
+ * @throws GoridgeException
+ */
+ public function error(string $error): void;
+
+ /**
+ * Terminate the process. Server must automatically pass task to the next available process.
+ * Worker will receive stop command after calling this method.
+ *
+ * Attention, you MUST use continue; after invoking this method to let rr to properly stop worker.
+ */
+ public function stop(): void;
+}
diff --git a/tests/stop.php b/tests/stop.php
index 0100ad0f..f83d3f29 100644
--- a/tests/stop.php
+++ b/tests/stop.php
@@ -9,7 +9,7 @@ use Spiral\RoadRunner;
$rr = new RoadRunner\Worker($relay);
$used = false;
-while ($in = $rr->receive($ctx)) {
+while ($in = $rr->waitPayload()) {
try {
if ($used) {
// kill on second attempt