diff options
author | Wolfy-J <[email protected]> | 2018-05-29 13:03:34 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-05-29 13:03:34 +0300 |
commit | b8bc792b263a3891e125757a35cc563bb85f1a0b (patch) | |
tree | f7a9e6f2568220491a26f6544e4acf0ed62012bb /php | |
parent | 50f820833eeef8518b3b978b33c6f20391225162 (diff) |
nested observers
Diffstat (limited to 'php')
-rw-r--r-- | php/Exceptions/RoadRunnerException.php | 13 | ||||
-rw-r--r-- | php/Worker.php | 163 | ||||
-rw-r--r-- | php/tests/broken.php | 14 | ||||
-rw-r--r-- | php/tests/client.php | 35 | ||||
-rw-r--r-- | php/tests/delay.php | 18 | ||||
-rw-r--r-- | php/tests/echo.php | 17 | ||||
-rw-r--r-- | php/tests/error.php | 13 | ||||
-rw-r--r-- | php/tests/failboot.php | 3 | ||||
-rw-r--r-- | php/tests/head.php | 17 | ||||
-rw-r--r-- | php/tests/pid.php | 17 | ||||
-rw-r--r-- | php/tests/slow-client.php | 38 | ||||
-rw-r--r-- | php/tests/stop.php | 25 |
12 files changed, 373 insertions, 0 deletions
diff --git a/php/Exceptions/RoadRunnerException.php b/php/Exceptions/RoadRunnerException.php new file mode 100644 index 00000000..fa7b8da3 --- /dev/null +++ b/php/Exceptions/RoadRunnerException.php @@ -0,0 +1,13 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner\Exceptions; + +class RoadRunnerException extends \RuntimeException +{ + +}
\ No newline at end of file diff --git a/php/Worker.php b/php/Worker.php new file mode 100644 index 00000000..5835baf2 --- /dev/null +++ b/php/Worker.php @@ -0,0 +1,163 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\GoridgeException; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exceptions\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * Example: + * + * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + * while ($task = $worker->receive($context)) { + * $worker->send("DONE", json_encode($context)); + * } + */ +class Worker +{ + // Send as response context to request worker termination + const STOP = '{"stop":true}'; + + /** @var Relay */ + private $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Receive packet of information to process, returns null when process must be stopped. Might + * return Error to wrap error message from server. + * + * @param mixed $header + * + * @return \Error|null|string + * @throws GoridgeException + */ + public function receive(&$header) + { + $body = $this->relay->receiveSync($flags); + + if ($flags & Relay::PAYLOAD_CONTROL) { + if ($this->handleControl($body, $header, $flags)) { + // wait for the next command + return $this->receive($header); + } + + // Expect process termination + return null; + } + + if ($flags & Relay::PAYLOAD_ERROR) { + return new \Error($body); + } + + return $body; + } + + /** + * Respond to the server with result of task execution and execution context. + * + * Example: + * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); + * + * @param string|null $payload + * @param string|null $header + */ + public function send(string $payload = null, string $header = null) + { + if (is_null($header)) { + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); + } else { + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + } + + $this->relay->send($payload, Relay::PAYLOAD_RAW); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message) + { + $this->relay->send( + $message, + Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR + ); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. + * + * @throws GoridgeException + */ + public function stop() + { + $this->send(null, self::STOP); + } + + /** + * Handles incoming control command payload and executes it if required. + * + * @param string $body + * @param mixed $header Exported context (if any). + * @param int $flags + * + * @returns bool True when continue processing. + * + * @throws RoadRunnerException + */ + private function handleControl(string $body = null, &$header = null, int $flags): bool + { + $header = $body; + if (is_null($body) || $flags & Relay::PAYLOAD_RAW) { + // empty or raw prefix + return true; + } + + $p = json_decode($body, true); + if ($p === false) { + throw new RoadRunnerException("invalid task context, JSON payload is expected"); + } + + // PID negotiation (socket connections only) + if (!empty($p['pid'])) { + $this->relay->send( + sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL + ); + } + + // termination request + if (!empty($p['stop'])) { + return false; + } + + // parsed header + $header = $p; + + return true; + } +}
\ No newline at end of file diff --git a/php/tests/broken.php b/php/tests/broken.php new file mode 100644 index 00000000..b1a3839e --- /dev/null +++ b/php/tests/broken.php @@ -0,0 +1,14 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + echo undefined_function(); + $rr->send((string)$in); +}
\ No newline at end of file diff --git a/php/tests/client.php b/php/tests/client.php new file mode 100644 index 00000000..31caa410 --- /dev/null +++ b/php/tests/client.php @@ -0,0 +1,35 @@ +<?php + +use Spiral\Goridge; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge) = [$argv[1], $argv[2]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + break; + + default: + die("invalid protocol selection"); +} + +require_once sprintf("%s/%s.php", __DIR__, $test); diff --git a/php/tests/delay.php b/php/tests/delay.php new file mode 100644 index 00000000..bfde2fc4 --- /dev/null +++ b/php/tests/delay.php @@ -0,0 +1,18 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + usleep($in * 1000); + $rr->send(''); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/php/tests/echo.php b/php/tests/echo.php new file mode 100644 index 00000000..ba58ff30 --- /dev/null +++ b/php/tests/echo.php @@ -0,0 +1,17 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send((string)$in); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/php/tests/error.php b/php/tests/error.php new file mode 100644 index 00000000..ebd3418b --- /dev/null +++ b/php/tests/error.php @@ -0,0 +1,13 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + $rr->error((string)$in); +}
\ No newline at end of file diff --git a/php/tests/failboot.php b/php/tests/failboot.php new file mode 100644 index 00000000..fa8b96f6 --- /dev/null +++ b/php/tests/failboot.php @@ -0,0 +1,3 @@ +<?php +ini_set('display_errors', 'stderr'); +throw new Error("failboot error");
\ No newline at end of file diff --git a/php/tests/head.php b/php/tests/head.php new file mode 100644 index 00000000..4f4e4061 --- /dev/null +++ b/php/tests/head.php @@ -0,0 +1,17 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send("", (string)$ctx); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/php/tests/pid.php b/php/tests/pid.php new file mode 100644 index 00000000..a8cfa229 --- /dev/null +++ b/php/tests/pid.php @@ -0,0 +1,17 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file diff --git a/php/tests/slow-client.php b/php/tests/slow-client.php new file mode 100644 index 00000000..f09142b5 --- /dev/null +++ b/php/tests/slow-client.php @@ -0,0 +1,38 @@ +<?php + +use Spiral\Goridge; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge, $bootDelay, $shutdownDelay) = [$argv[1], $argv[2], $argv[3], $argv[4]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + + break; + + default: + die("invalid protocol selection"); +} + +usleep($bootDelay * 1000); +require_once sprintf("%s/%s.php", __DIR__, $test); +usleep($shutdownDelay * 1000);
\ No newline at end of file diff --git a/php/tests/stop.php b/php/tests/stop.php new file mode 100644 index 00000000..caa485d6 --- /dev/null +++ b/php/tests/stop.php @@ -0,0 +1,25 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +$used = false; +while ($in = $rr->receive($ctx)) { + try { + if ($used) { + // kill on second attempt + $rr->stop(); + continue; + } + + $used = true; + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file |