summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--payload_test.go3
-rw-r--r--pool_test.go42
-rw-r--r--protocol.go3
-rw-r--r--source/Worker.php40
-rw-r--r--tests/head.php17
-rw-r--r--todo.go4
-rw-r--r--worker.go10
7 files changed, 86 insertions, 33 deletions
diff --git a/payload_test.go b/payload_test.go
deleted file mode 100644
index 1721392a..00000000
--- a/payload_test.go
+++ /dev/null
@@ -1,3 +0,0 @@
-package roadrunner
-
-//todo: need payload test \ No newline at end of file
diff --git a/pool_test.go b/pool_test.go
index 0ce7be26..76c93b60 100644
--- a/pool_test.go
+++ b/pool_test.go
@@ -75,6 +75,48 @@ func Test_Pool_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+func Test_Pool_Echo_NilHead(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello"), Head: nil})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Head)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_Pool_Echo_Head(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
+ NewPipeFactory(),
+ cfg,
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello"), Head: []byte("world")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.Nil(t, res.Body)
+ assert.NotNil(t, res.Head)
+
+ assert.Equal(t, "world", string(res.Head))
+}
+
func Test_Pool_AllocateTimeout(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
diff --git a/protocol.go b/protocol.go
index b78f2807..0a0ec37f 100644
--- a/protocol.go
+++ b/protocol.go
@@ -21,7 +21,7 @@ func sendHead(rl goridge.Relay, v interface{}) error {
}
if data, ok := v.([]byte); ok {
- return rl.Send(data, goridge.PayloadControl)
+ return rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw)
}
data, err := json.Marshal(v)
@@ -43,7 +43,6 @@ func fetchPID(rl goridge.Relay) (pid int, err error) {
}
link := &pidCommand{}
- //log.Println(string(body))
if err := json.Unmarshal(body, link); err != nil {
return 0, err
}
diff --git a/source/Worker.php b/source/Worker.php
index 57e8fbf5..f21b27c0 100644
--- a/source/Worker.php
+++ b/source/Worker.php
@@ -23,11 +23,8 @@ use Spiral\RoadRunner\Exceptions\RoadRunnerException;
*/
class Worker
{
- // Must be set as context value in order to perform controlled demolition of worker
- const TERMINATE = "TERMINATE";
-
- // Must be set as context value in order to represent content as an error
- const ERROR = "ERROR";
+ // Send as response context to request worker termination
+ const TERMINATE = '{"stop": true}';
/** @var Relay */
private $relay;
@@ -44,19 +41,19 @@ class Worker
* Receive packet of information to process, returns null when process must be stopped. Might
* return Error to wrap error message from server.
*
- * @param array $context Contains parsed context array send by the server.
+ * @param mixed $header
*
* @return \Error|null|string
* @throws GoridgeException
*/
- public function receive(&$context)
+ public function receive(&$header)
{
$body = $this->relay->receiveSync($flags);
if ($flags & Relay::PAYLOAD_CONTROL) {
- if ($this->handleControl($body, $context)) {
+ if ($this->handleControl($body, $header, $flags)) {
// wait for the next command
- return $this->receive($context);
+ return $this->receive($header);
}
// Expect process termination
@@ -77,17 +74,16 @@ class Worker
* $worker->respond((string)$response->getBody(), json_encode($response->getHeaders()));
*
* @param string $payload
- * @param string $context
+ * @param string $header
*/
- public function send(string $payload, string $context = null)
+ public function send(string $payload, string $header = null)
{
- if (is_null($context)) {
- $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
+ if (is_null($header)) {
+ $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE);
} else {
- $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
+ $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW);
}
- //todo: null payload?
$this->relay->send($payload, Relay::PAYLOAD_RAW);
}
@@ -124,16 +120,18 @@ class Worker
* Handles incoming control command payload and executes it if required.
*
* @param string $body
- * @param array $context Exported context (if any).
+ * @param mixed $header Exported context (if any).
+ * @param int $flags
*
* @returns bool True when continue processing.
*
* @throws RoadRunnerException
*/
- private function handleControl(string $body = null, &$context = null): bool
+ private function handleControl(string $body = null, &$header = null, int $flags): bool
{
- if (is_null($body)) {
- // empty prefix
+ $header = $body;
+ if (is_null($body) || $flags & Relay::PAYLOAD_RAW) {
+ // empty or raw prefix
return true;
}
@@ -152,8 +150,8 @@ class Worker
return false;
}
- // not a command but execution context
- $context = $p;
+ // parsed header
+ $header = $p;
return true;
}
diff --git a/tests/head.php b/tests/head.php
new file mode 100644
index 00000000..4f4e4061
--- /dev/null
+++ b/tests/head.php
@@ -0,0 +1,17 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send("", (string)$ctx);
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+} \ No newline at end of file
diff --git a/todo.go b/todo.go
index d3af4a44..1a4a1544 100644
--- a/todo.go
+++ b/todo.go
@@ -1,7 +1,7 @@
package roadrunner
-
// test terminate request
// test head
// test json head?
-// job error \ No newline at end of file
+// job error
+// job timeout? \ No newline at end of file
diff --git a/worker.go b/worker.go
index b62cf86e..6520ebfe 100644
--- a/worker.go
+++ b/worker.go
@@ -21,17 +21,17 @@ type Worker struct {
// state holds information about current worker state,
// number of worker executions, last status change time.
- // publicly this object is read-only and protected using Mutex
+ // publicly this object is receive-only and protected using Mutex
// and atomic counter.
state *state
// underlying command with associated process, command must be
// provided to worker from outside in non-started form. Cmd
- // stdErr pipe will be handled by worker to aggregate error message.
+ // stdErr direction will be handled by worker to aggregate error message.
cmd *exec.Cmd
// err aggregates stderr output from underlying process. Value can be
- // read only once command is completed and all pipes are closed.
+ // receive only once command is completed and all pipes are closed.
err *bytes.Buffer
// channel is being closed once command is complete.
@@ -66,7 +66,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) {
return w, nil
}
-// State return read-only worker state object, state can be used to safely access
+// State return receive-only worker state object, state can be used to safely access
// worker status, time when status changed and number of worker executions.
func (w *Worker) State() State {
return w.state
@@ -126,7 +126,7 @@ func (w *Worker) Start() error {
func (w *Worker) Wait() error {
<-w.waitDone
- // ensure that all read/write operations are complete
+ // ensure that all receive/send operations are complete
w.mu.Lock()
defer w.mu.Unlock()