summaryrefslogtreecommitdiff
path: root/tests/src/Worker.php
diff options
context:
space:
mode:
Diffstat (limited to 'tests/src/Worker.php')
-rw-r--r--tests/src/Worker.php162
1 files changed, 0 insertions, 162 deletions
diff --git a/tests/src/Worker.php b/tests/src/Worker.php
deleted file mode 100644
index 53cf6cef..00000000
--- a/tests/src/Worker.php
+++ /dev/null
@@ -1,162 +0,0 @@
-<?php
-
-/**
- * High-performance PHP process supervisor and load balancer written in Go.
- *
- * @author Wolfy-J
- */
-
-declare(strict_types=1);
-
-namespace Spiral\RoadRunner;
-
-use Spiral\Goridge\Exception\GoridgeException;
-use Spiral\Goridge\Frame;
-use Spiral\Goridge\RelayInterface as Relay;
-use Spiral\RoadRunner\Exception\EnvironmentException;
-use Spiral\RoadRunner\Exception\RoadRunnerException;
-
-/**
- * Accepts connection from RoadRunner server over given Goridge relay.
- *
- * $worker = Worker::create();
- * while ($p = $worker->waitPayload()) {
- * $worker->send(new Payload("DONE", json_encode($context)));
- * }
- */
-class Worker implements WorkerInterface
-{
- // Request graceful worker termination.
- private const STOP_REQUEST = '{"stop":true}';
-
- private Relay $relay;
-
- /**
- * @param Relay $relay
- */
- public function __construct(Relay $relay)
- {
- $this->relay = $relay;
- }
-
- /**
- * Wait for incoming payload from the server. Must return null when worker stopped.
- *
- * @return Payload|null
- * @throws GoridgeException
- * @throws RoadRunnerException
- */
- public function waitPayload(): ?Payload
- {
- $frame = $this->relay->waitFrame();
-
- if ($frame->hasFlag(Frame::CONTROL)) {
- $continue = $this->handleControl($frame->payload);
-
- if ($continue) {
- return $this->waitPayload();
- } else {
- return null;
- }
- }
-
- return new Payload(
- substr($frame->payload, $frame->options[0]),
- substr($frame->payload, 0, $frame->options[0])
- );
- }
-
- /**
- * Respond to the server with the processing result.
- *
- * @param Payload $payload
- * @throws GoridgeException
- */
- public function respond(Payload $payload): void
- {
- $this->send($payload->body, $payload->header);
- }
-
- /**
- * Respond to the server with an error. Error must be treated as TaskError and might not cause
- * worker destruction.
- *
- * Example:
- *
- * $worker->error("invalid payload");
- *
- * @param string $message
- */
- public function error(string $message): void
- {
- $this->relay->send(new Frame($message, [], Frame::ERROR));
- }
-
- /**
- * Terminate the process. Server must automatically pass task to the next available process.
- * Worker will receive StopCommand context after calling this method.
- *
- * Attention, you MUST use continue; after invoking this method to let rr to properly
- * stop worker.
- *
- * @throws GoridgeException
- */
- public function stop(): void
- {
- $this->send("", self::STOP_REQUEST);
- }
-
- /**
- * @param string $body
- * @param string|null $context
- * @throws GoridgeException
- */
- public function send(string $body, string $context = null): void
- {
- $this->relay->send(new Frame(
- (string) $context . $body,
- [strlen((string) $context)]
- ));
- }
-
- /**
- * Return true if continue.
- *
- * @param string $header
- * @return bool
- *
- * @throws RoadRunnerException
- */
- private function handleControl(string $header): bool
- {
- $command = json_decode($header, true);
- if ($command === false) {
- throw new RoadRunnerException('Invalid task header, JSON payload is expected');
- }
-
- switch (true) {
- case !empty($command['pid']):
- $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL));
- return true;
-
- case !empty($command['stop']):
- return false;
-
- default:
- throw new RoadRunnerException('Invalid task header, undefined control package');
- }
- }
-
- /**
- * Create Worker using global environment configuration.
- *
- * @return WorkerInterface
- * @throws EnvironmentException
- */
- public static function create(): WorkerInterface
- {
- $env = Environment::fromGlobals();
-
- return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress()));
- }
-}