summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Exception/RoadRunnerException.php13
-rw-r--r--src/Exceptions/RoadRunnerException.php17
-rw-r--r--src/PSR7Client.php158
-rw-r--r--src/Worker.php166
4 files changed, 354 insertions, 0 deletions
diff --git a/src/Exception/RoadRunnerException.php b/src/Exception/RoadRunnerException.php
new file mode 100644
index 00000000..ee99bb2b
--- /dev/null
+++ b/src/Exception/RoadRunnerException.php
@@ -0,0 +1,13 @@
+<?php
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+
+namespace Spiral\RoadRunner\Exception;
+
+class RoadRunnerException extends \Spiral\RoadRunner\Exceptions\RoadRunnerException
+{
+
+} \ No newline at end of file
diff --git a/src/Exceptions/RoadRunnerException.php b/src/Exceptions/RoadRunnerException.php
new file mode 100644
index 00000000..1a5da18c
--- /dev/null
+++ b/src/Exceptions/RoadRunnerException.php
@@ -0,0 +1,17 @@
+<?php
+/**
+ * Spiral Framework.
+ *
+ * @license MIT
+ * @author Anton Titov (Wolfy-J)
+ */
+
+namespace Spiral\RoadRunner\Exceptions;
+
+/**
+ * @deprecated use \Spiral\RoadRunner\Exception\RoadRunnerException instead
+ */
+class RoadRunnerException extends \RuntimeException
+{
+
+} \ No newline at end of file
diff --git a/src/PSR7Client.php b/src/PSR7Client.php
new file mode 100644
index 00000000..e8d93fe8
--- /dev/null
+++ b/src/PSR7Client.php
@@ -0,0 +1,158 @@
+<?php
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+
+namespace Spiral\RoadRunner;
+
+use Psr\Http\Message\ResponseInterface;
+use Psr\Http\Message\ServerRequestInterface;
+use Zend\Diactoros;
+
+/**
+ * Manages PSR-7 request and response.
+ */
+class PSR7Client
+{
+ /**
+ * @varWorker
+ */
+ private $worker;
+
+ /**
+ * @param Worker $worker
+ */
+ public function __construct(Worker $worker)
+ {
+ $this->worker = $worker;
+ }
+
+ /**
+ * @return Worker
+ */
+ public function getWorker(): Worker
+ {
+ return $this->worker;
+ }
+
+ /**
+ * @return ServerRequestInterface|null
+ */
+ public function acceptRequest()
+ {
+ $body = $this->worker->receive($ctx);
+ if (empty($body) && empty($ctx)) {
+ // termination request
+ return null;
+ }
+
+ if (empty($ctx = json_decode($ctx, true))) {
+ // invalid context
+ return null;
+ }
+
+ parse_str($ctx['rawQuery'], $query);
+
+ $bodyStream = 'php://input';
+ $parsedBody = null;
+ if ($ctx['parsed']) {
+ $parsedBody = json_decode($body, true);
+ } elseif ($body != null) {
+ $bodyStream = new Diactoros\Stream("php://memory", "rwb");
+ $bodyStream->write($body);
+ }
+
+ $_SERVER = $this->configureServer($ctx);
+
+ $request = new Diactoros\ServerRequest(
+ $_SERVER,
+ $this->wrapUploads($ctx['uploads']),
+ $ctx['uri'],
+ $ctx['method'],
+ $bodyStream,
+ $ctx['headers'],
+ $ctx['cookies'],
+ $query,
+ $parsedBody,
+ $ctx['protocol']
+ );
+
+ if (!empty($ctx['attributes'])) {
+ foreach ($ctx['attributes'] as $key => $value) {
+ $request = $request->withAttribute($key, $value);
+ }
+ }
+
+ return $request;
+ }
+
+ /**
+ * Send response to the application server.
+ *
+ * @param ResponseInterface $response
+ */
+ public function respond(ResponseInterface $response)
+ {
+ $headers = $response->getHeaders();
+ if (empty($headers)) {
+ // this is required to represent empty header set as map and not as array
+ $headers = new \stdClass();
+ }
+
+ $this->worker->send($response->getBody(), json_encode([
+ 'status' => $response->getStatusCode(),
+ 'headers' => $headers
+ ]));
+ }
+
+ /**
+ * Returns altered copy of _SERVER variable. Sets ip-address,
+ * request-time and other values.
+ *
+ * @param array $ctx
+ * @return array
+ */
+ protected function configureServer(array $ctx): array
+ {
+ $server = $_SERVER;
+ $server['REQUEST_TIME'] = time();
+ $server['REQUEST_TIME_FLOAT'] = microtime(true);
+ $server['REMOTE_ADDR'] = $ctx['attributes']['ipAddress'] ?? $ctx['remoteAddr'] ?? '127.0.0.1';
+
+ return $server;
+ }
+
+ /**
+ * Wraps all uploaded files with UploadedFile.
+ *
+ * @param array $files
+ *
+ * @return array
+ */
+ private function wrapUploads($files): array
+ {
+ if (empty($files)) {
+ return [];
+ }
+
+ $result = [];
+ foreach ($files as $index => $f) {
+ if (!isset($f['name'])) {
+ $result[$index] = $this->wrapUploads($f);
+ continue;
+ }
+
+ $result[$index] = new Diactoros\UploadedFile(
+ $f['tmpName'],
+ $f['size'],
+ $f['error'],
+ $f['name'],
+ $f['mime']
+ );
+ }
+
+ return $result;
+ }
+} \ No newline at end of file
diff --git a/src/Worker.php b/src/Worker.php
new file mode 100644
index 00000000..4405cd70
--- /dev/null
+++ b/src/Worker.php
@@ -0,0 +1,166 @@
+<?php
+/**
+ * High-performance PHP process supervisor and load balancer written in Go
+ *
+ * @author Wolfy-J
+ */
+
+namespace Spiral\RoadRunner;
+
+use Spiral\Goridge\Exceptions\GoridgeException;
+use Spiral\Goridge\RelayInterface as Relay;
+use Spiral\RoadRunner\Exception\RoadRunnerException;
+
+/**
+ * Accepts connection from RoadRunner server over given Goridge relay.
+ *
+ * Example:
+ *
+ * $worker = new Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+ * while ($task = $worker->receive($context)) {
+ * $worker->send("DONE", json_encode($context));
+ * }
+ */
+class Worker
+{
+ // Send as response context to request worker termination
+ const STOP = '{"stop":true}';
+
+ /** @var Relay */
+ private $relay;
+
+ /**
+ * @param Relay $relay
+ */
+ public function __construct(Relay $relay)
+ {
+ $this->relay = $relay;
+ }
+
+ /**
+ * Receive packet of information to process, returns null when process must be stopped. Might
+ * return Error to wrap error message from server.
+ *
+ * @param mixed $header
+ *
+ * @return \Error|null|string
+ * @throws GoridgeException
+ */
+ public function receive(&$header)
+ {
+ $body = $this->relay->receiveSync($flags);
+
+ if ($flags & Relay::PAYLOAD_CONTROL) {
+ if ($this->handleControl($body, $header, $flags)) {
+ // wait for the next command
+ return $this->receive($header);
+ }
+
+ // no context for the termination.
+ $header = null;
+
+ // Expect process termination
+ return null;
+ }
+
+ if ($flags & Relay::PAYLOAD_ERROR) {
+ return new \Error($body);
+ }
+
+ return $body;
+ }
+
+ /**
+ * Respond to the server with result of task execution and execution context.
+ *
+ * Example:
+ * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
+ *
+ * @param string|null $payload
+ * @param string|null $header
+ */
+ public function send(string $payload = null, string $header = null)
+ {
+ if (is_null($header)) {
+ $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
+ } else {
+ $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
+ }
+
+ $this->relay->send($payload, Relay::PAYLOAD_RAW);
+ }
+
+ /**
+ * Respond to the server with an error. Error must be treated as TaskError and might not cause
+ * worker destruction.
+ *
+ * Example:
+ *
+ * $worker->error("invalid payload");
+ *
+ * @param string $message
+ */
+ public function error(string $message)
+ {
+ $this->relay->send(
+ $message,
+ Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW | Relay::PAYLOAD_ERROR
+ );
+ }
+
+ /**
+ * Terminate the process. Server must automatically pass task to the next available process.
+ * Worker will receive StopCommand context after calling this method.
+ *
+ * Attention, you MUST use continue; after invoking this method to let rr to properly
+ * stop worker.
+ *
+ * @throws GoridgeException
+ */
+ public function stop()
+ {
+ $this->send(null, self::STOP);
+ }
+
+ /**
+ * Handles incoming control command payload and executes it if required.
+ *
+ * @param string $body
+ * @param mixed $header Exported context (if any).
+ * @param int $flags
+ *
+ * @returns bool True when continue processing.
+ *
+ * @throws RoadRunnerException
+ */
+ private function handleControl(string $body = null, &$header = null, int $flags): bool
+ {
+ $header = $body;
+ if (is_null($body) || $flags & Relay::PAYLOAD_RAW) {
+ // empty or raw prefix
+ return true;
+ }
+
+ $p = json_decode($body, true);
+ if ($p === false) {
+ throw new RoadRunnerException("invalid task context, JSON payload is expected");
+ }
+
+ // PID negotiation (socket connections only)
+ if (!empty($p['pid'])) {
+ $this->relay->send(
+ sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL
+ );
+ }
+
+ // termination request
+ if (!empty($p['stop'])) {
+ return false;
+ }
+
+ // parsed header
+ $header = $p;
+
+ return true;
+ }
+} \ No newline at end of file