summaryrefslogtreecommitdiff
path: root/tests/src/Worker.php
blob: 53cf6cefd49387d20682023403f316aa5f2999af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
<?php

/**
 * High-performance PHP process supervisor and load balancer written in Go.
 *
 * @author Wolfy-J
 */

declare(strict_types=1);

namespace Spiral\RoadRunner;

use Spiral\Goridge\Exception\GoridgeException;
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface as Relay;
use Spiral\RoadRunner\Exception\EnvironmentException;
use Spiral\RoadRunner\Exception\RoadRunnerException;

/**
 * Accepts connection from RoadRunner server over given Goridge relay.
 *
 * $worker = Worker::create();
 * while ($p = $worker->waitPayload()) {
 *      $worker->send(new Payload("DONE", json_encode($context)));
 * }
 */
class Worker implements WorkerInterface
{
    // Request graceful worker termination.
    private const STOP_REQUEST = '{"stop":true}';

    private Relay $relay;

    /**
     * @param Relay $relay
     */
    public function __construct(Relay $relay)
    {
        $this->relay = $relay;
    }

    /**
     * Wait for incoming payload from the server. Must return null when worker stopped.
     *
     * @return Payload|null
     * @throws GoridgeException
     * @throws RoadRunnerException
     */
    public function waitPayload(): ?Payload
    {
        $frame = $this->relay->waitFrame();

        if ($frame->hasFlag(Frame::CONTROL)) {
            $continue = $this->handleControl($frame->payload);

            if ($continue) {
                return $this->waitPayload();
            } else {
                return null;
            }
        }

        return new Payload(
            substr($frame->payload, $frame->options[0]),
            substr($frame->payload, 0, $frame->options[0])
        );
    }

    /**
     * Respond to the server with the processing result.
     *
     * @param Payload $payload
     * @throws GoridgeException
     */
    public function respond(Payload $payload): void
    {
        $this->send($payload->body, $payload->header);
    }

    /**
     * Respond to the server with an error. Error must be treated as TaskError and might not cause
     * worker destruction.
     *
     * Example:
     *
     * $worker->error("invalid payload");
     *
     * @param string $message
     */
    public function error(string $message): void
    {
        $this->relay->send(new Frame($message, [], Frame::ERROR));
    }

    /**
     * Terminate the process. Server must automatically pass task to the next available process.
     * Worker will receive StopCommand context after calling this method.
     *
     * Attention, you MUST use continue; after invoking this method to let rr to properly
     * stop worker.
     *
     * @throws GoridgeException
     */
    public function stop(): void
    {
        $this->send("", self::STOP_REQUEST);
    }

    /**
     * @param string      $body
     * @param string|null $context
     * @throws GoridgeException
     */
    public function send(string $body, string $context = null): void
    {
        $this->relay->send(new Frame(
            (string) $context . $body,
            [strlen((string) $context)]
        ));
    }

    /**
     * Return true if continue.
     *
     * @param string $header
     * @return bool
     *
     * @throws RoadRunnerException
     */
    private function handleControl(string $header): bool
    {
        $command = json_decode($header, true);
        if ($command === false) {
            throw new RoadRunnerException('Invalid task header, JSON payload is expected');
        }

        switch (true) {
            case !empty($command['pid']):
                $this->relay->send(new Frame(sprintf('{"pid":%s}', getmypid()), [], Frame::CONTROL));
                return true;

            case !empty($command['stop']):
                return false;

            default:
                throw new RoadRunnerException('Invalid task header, undefined control package');
        }
    }

    /**
     * Create Worker using global environment configuration.
     *
     * @return WorkerInterface
     * @throws EnvironmentException
     */
    public static function create(): WorkerInterface
    {
        $env = Environment::fromGlobals();

        return new static(\Spiral\Goridge\Relay::create($env->getRelayAddress()));
    }
}