diff options
author | Wolfy-J <[email protected]> | 2018-09-23 14:00:08 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-09-23 14:00:08 +0300 |
commit | f7d64d8e8592951ed5a0f0b06db608bc73786ea2 (patch) | |
tree | 1bdef1618288221495f1f38bb528e71cca907e2e /src | |
parent | 4815ebb760672c7fa541c941d7f47cd316656020 (diff) |
- new directory structure
- singular exception
- starting exception deprecation
Diffstat (limited to 'src')
-rw-r--r-- | src/Exception/RoadRunnerException.php | 13 | ||||
-rw-r--r-- | src/Exceptions/RoadRunnerException.php | 17 | ||||
-rw-r--r-- | src/PSR7Client.php | 158 | ||||
-rw-r--r-- | src/Worker.php | 166 |
4 files changed, 354 insertions, 0 deletions
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php new file mode 100644 index 00000000..ee99bb2b --- /dev/null +++ b/src/Exception/RoadRunnerException.php @@ -0,0 +1,13 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner\Exception; + +class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException +{ + +}
\ No newline at end of file diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php new file mode 100644 index 00000000..1a5da18c --- /dev/null +++ b/src/Exceptions/RoadRunnerException.php @@ -0,0 +1,17 @@ +<?php +/** + * Spiral Framework. + * + * @license MIT + * @author Anton Titov (Wolfy-J) + */ + +namespace Spiral\RoadRunner\Exceptions; + +/** + * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead + */ +class RoadRunnerException extends \RuntimeException +{ + +}
\ No newline at end of file diff --git a/src/PSR7Client.php b/src/PSR7Client.php new file mode 100644 index 00000000..e8d93fe8 --- /dev/null +++ b/src/PSR7Client.php @@ -0,0 +1,158 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\ServerRequestInterface; +use Zend\Diactoros; + +/** + * Manages PSR-7 request and response. + */ +class PSR7Client +{ + /** + * @varWorker + */ + private $worker; + + /** + * @param Worker $worker + */ + public function __construct(Worker $worker) + { + $this->worker = $worker; + } + + /** + * @return Worker + */ + public function getWorker(): Worker + { + return $this->worker; + } + + /** + * @return ServerRequestInterface|null + */ + public function acceptRequest() + { + $body = $this->worker->receive($ctx); + if (empty($body) && empty($ctx)) { + // termination request + return null; + } + + if (empty($ctx = json_decode($ctx, true))) { + // invalid context + return null; + } + + parse_str($ctx['rawQuery'], $query); + + $bodyStream = 'php://input'; + $parsedBody = null; + if ($ctx['parsed']) { + $parsedBody = json_decode($body, true); + } elseif ($body != null) { + $bodyStream = new Diactoros\Stream("php://memory", "rwb"); + $bodyStream->write($body); + } + + $_SERVER = $this->configureServer($ctx); + + $request = new Diactoros\ServerRequest( + $_SERVER, + $this->wrapUploads($ctx['uploads']), + $ctx['uri'], + $ctx['method'], + $bodyStream, + $ctx['headers'], + $ctx['cookies'], + $query, + $parsedBody, + $ctx['protocol'] + ); + + if (!empty($ctx['attributes'])) { + foreach ($ctx['attributes'] as $key => $value) { + $request = $request->withAttribute($key, $value); + } + } + + return $request; + } + + /** + * Send response to the application server. + * + * @param ResponseInterface $response + */ + public function respond(ResponseInterface $response) + { + $headers = $response->getHeaders(); + if (empty($headers)) { + // this is required to represent empty header set as map and not as array + $headers = new \stdClass(); + } + + $this->worker->send($response->getBody(), json_encode([ + 'status' => $response->getStatusCode(), + 'headers' => $headers + ])); + } + + /** + * Returns altered copy of _SERVER variable. Sets ip-address, + * request-time and other values. + * + * @param array $ctx + * @return array + */ + protected function configureServer(array $ctx): array + { + $server = $_SERVER; + $server['REQUEST_TIME'] = time(); + $server['REQUEST_TIME_FLOAT'] = microtime(true); + $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1'; + + return $server; + } + + /** + * Wraps all uploaded files with UploadedFile. + * + * @param array $files + * + * @return array + */ + private function wrapUploads($files): array + { + if (empty($files)) { + return []; + } + + $result = []; + foreach ($files as $index => $f) { + if (!isset($f['name'])) { + $result[$index] = $this->wrapUploads($f); + continue; + } + + $result[$index] = new Diactoros\UploadedFile( + $f['tmpName'], + $f['size'], + $f['error'], + $f['name'], + $f['mime'] + ); + } + + return $result; + } +}
\ No newline at end of file diff --git a/src/Worker.php b/src/Worker.php new file mode 100644 index 00000000..4405cd70 --- /dev/null +++ b/src/Worker.php @@ -0,0 +1,166 @@ +<?php +/** + * High-performance PHP process supervisor and load balancer written in Go + * + * @author Wolfy-J + */ + +namespace Spiral\RoadRunner; + +use Spiral\Goridge\Exceptions\GoridgeException; +use Spiral\Goridge\RelayInterface as Relay; +use Spiral\RoadRunner\Exception\RoadRunnerException; + +/** + * Accepts connection from RoadRunner server over given Goridge relay. + * + * Example: + * + * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT)); + * while ($task = $worker->receive($context)) { + * $worker->send("DONE", json_encode($context)); + * } + */ +class Worker +{ + // Send as response context to request worker termination + const STOP = '{"stop":true}'; + + /** @var Relay */ + private $relay; + + /** + * @param Relay $relay + */ + public function __construct(Relay $relay) + { + $this->relay = $relay; + } + + /** + * Receive packet of information to process, returns null when process must be stopped. Might + * return Error to wrap error message from server. + * + * @param mixed $header + * + * @return \Error|null|string + * @throws GoridgeException + */ + public function receive(&$header) + { + $body = $this->relay->receiveSync($flags); + + if ($flags & Relay::PAYLOAD_CONTROL) { + if ($this->handleControl($body, $header, $flags)) { + // wait for the next command + return $this->receive($header); + } + + // no context for the termination. + $header = null; + + // Expect process termination + return null; + } + + if ($flags & Relay::PAYLOAD_ERROR) { + return new \Error($body); + } + + return $body; + } + + /** + * Respond to the server with result of task execution and execution context. + * + * Example: + * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); + * + * @param string|null $payload + * @param string|null $header + */ + public function send(string $payload = null, string $header = null) + { + if (is_null($header)) { + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); + } else { + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + } + + $this->relay->send($payload, Relay::PAYLOAD_RAW); + } + + /** + * Respond to the server with an error. Error must be treated as TaskError and might not cause + * worker destruction. + * + * Example: + * + * $worker->error("invalid payload"); + * + * @param string $message + */ + public function error(string $message) + { + $this->relay->send( + $message, + Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR + ); + } + + /** + * Terminate the process. Server must automatically pass task to the next available process. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. + * + * @throws GoridgeException + */ + public function stop() + { + $this->send(null, self::STOP); + } + + /** + * Handles incoming control command payload and executes it if required. + * + * @param string $body + * @param mixed $header Exported context (if any). + * @param int $flags + * + * @returns bool True when continue processing. + * + * @throws RoadRunnerException + */ + private function handleControl(string $body = null, &$header = null, int $flags): bool + { + $header = $body; + if (is_null($body) || $flags & Relay::PAYLOAD_RAW) { + // empty or raw prefix + return true; + } + + $p = json_decode($body, true); + if ($p === false) { + throw new RoadRunnerException("invalid task context, JSON payload is expected"); + } + + // PID negotiation (socket connections only) + if (!empty($p['pid'])) { + $this->relay->send( + sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL + ); + } + + // termination request + if (!empty($p['stop'])) { + return false; + } + + // parsed header + $header = $p; + + return true; + } +}
\ No newline at end of file |