diff options
author | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2017-12-26 19:14:53 +0300 |
commit | e229d83dea4bbe9d0cfe6569c8fbe239690aafb9 (patch) | |
tree | 2d4887ffdb167d660b705415f0617458490d0b9f /source/Worker.php |
init
Diffstat (limited to 'source/Worker.php')
-rw-r--r-- | source/Worker.php | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/source/Worker.php b/source/Worker.php new file mode 100644 index 00000000..d31b48bd --- /dev/null +++ b/source/Worker.php @@ -0,0 +1,162 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\GoridgeException; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exceptions\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * Example: + * + * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + * while ($task = $worker->receive($context)) { + * $worker->send("DONE", json_encode($context)); + * } + */ +class Worker +{ + // Must be set as context value in order to perform controlled demolition of worker + const TERMINATE = "TERMINATE"; + + // Must be set as context value in order to represent content as an error + const ERROR = "ERROR"; + + /** @var Relay */ + private $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Receive packet of information to process, returns null when process must be stopped. Might + * return Error to wrap error message from server. + * + * @param array $context Contains parsed context array send by the server. + * + * @return \Error|null|string + * @throws GoridgeException + */ + public function receive(&$context) + { + $body = $this->relay->receiveSync($flags); + + if ($flags & Relay::PAYLOAD_CONTROL) { + if ($this->handleControl($body, $context)) { + // wait for the next command + return $this->receive($context); + } + + // Expect process termination + return null; + } + + if ($flags & Relay::PAYLOAD_ERROR) { + return new \Error($body); + } + + return $body; + } + + /** + * Respond to the server with result of task execution and execution context. + * + * Example: + * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); + * + * @param string $payload + * @param string $context + */ + public function send(string $payload, string $context = null) + { + if (is_null($context)) { + $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); + } else { + $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + } + + $this->relay->send($payload, Relay::PAYLOAD_RAW); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message) + { + $this->relay->send( + $message, + Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR + ); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive TerminateCommand context after calling this method. + * + * @throws GoridgeException + */ + public function terminate() + { + $this->send(null, self::TERMINATE); + } + + /** + * Handles incoming control command payload and executes it if required. + * + * @param string $body + * @param array $context Exported context (if any). + * + * @returns bool True when continue processing. + * + * @throws RoadRunnerException + */ + private function handleControl(string $body = null, &$context = null): bool + { + if (is_null($body)) { + // empty prefix + return true; + } + + $parsed = json_decode($body, true); + if ($parsed === false) { + throw new RoadRunnerException("invalid task context, JSON payload is expected"); + } + + // PID negotiation (socket connections only) + if (!empty($parsed['pid'])) { + $this->relay->send(json_encode([ + 'pid' => getmypid(), + 'parent' => $parsed['pid'], + ]), Relay::PAYLOAD_CONTROL); + } + + // termination request + if (!empty($parsed['terminate'])) { + return false; + } + + // not a command but execution context + $context = $parsed; + + return true; + } +}
\ No newline at end of file |