diff options
author | Wolfy-J <[email protected]> | 2018-01-28 15:25:57 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-01-28 15:25:57 +0300 |
commit | cb7629dbe105e6ee2bf9b2b1f3619dff596d0aa1 (patch) | |
tree | 26a5d6c2077b1cd77878fa009ff4fca448827e5b | |
parent | a28ac27bd762ad65ff4cbbeaa8e2b03c9f0d1c32 (diff) |
head parsing for raw data
-rw-r--r-- | payload_test.go | 3 | ||||
-rw-r--r-- | pool_test.go | 42 | ||||
-rw-r--r-- | protocol.go | 3 | ||||
-rw-r--r-- | source/Worker.php | 40 | ||||
-rw-r--r-- | tests/head.php | 17 | ||||
-rw-r--r-- | todo.go | 4 | ||||
-rw-r--r-- | worker.go | 10 |
7 files changed, 86 insertions, 33 deletions
diff --git a/payload_test.go b/payload_test.go deleted file mode 100644 index 1721392a..00000000 --- a/payload_test.go +++ /dev/null @@ -1,3 +0,0 @@ -package roadrunner - -//todo: need payload test
\ No newline at end of file diff --git a/pool_test.go b/pool_test.go index 0ce7be26..76c93b60 100644 --- a/pool_test.go +++ b/pool_test.go @@ -75,6 +75,48 @@ func Test_Pool_Echo(t *testing.T) { assert.Equal(t, "hello", res.String()) } +func Test_Pool_Echo_NilHead(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello"), Head: nil}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.Equal(t, "hello", res.String()) +} + +func Test_Pool_Echo_Head(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") }, + NewPipeFactory(), + cfg, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + res, err := p.Exec(&Payload{Body: []byte("hello"), Head: []byte("world")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.Nil(t, res.Body) + assert.NotNil(t, res.Head) + + assert.Equal(t, "world", string(res.Head)) +} + func Test_Pool_AllocateTimeout(t *testing.T) { p, err := NewPool( func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") }, diff --git a/protocol.go b/protocol.go index b78f2807..0a0ec37f 100644 --- a/protocol.go +++ b/protocol.go @@ -21,7 +21,7 @@ func sendHead(rl goridge.Relay, v interface{}) error { } if data, ok := v.([]byte); ok { - return rl.Send(data, goridge.PayloadControl) + return rl.Send(data, goridge.PayloadControl|goridge.PayloadRaw) } data, err := json.Marshal(v) @@ -43,7 +43,6 @@ func fetchPID(rl goridge.Relay) (pid int, err error) { } link := &pidCommand{} - //log.Println(string(body)) if err := json.Unmarshal(body, link); err != nil { return 0, err } diff --git a/source/Worker.php b/source/Worker.php index 57e8fbf5..f21b27c0 100644 --- a/source/Worker.php +++ b/source/Worker.php @@ -23,11 +23,8 @@ use Spiral\RoadRunner\Exceptions\RoadRunnerException; */ class Worker { - // Must be set as context value in order to perform controlled demolition of worker - const TERMINATE = "TERMINATE"; - - // Must be set as context value in order to represent content as an error - const ERROR = "ERROR"; + // Send as response context to request worker termination + const TERMINATE = '{"stop": true}'; /** @var Relay */ private $relay; @@ -44,19 +41,19 @@ class Worker * Receive packet of information to process, returns null when process must be stopped. Might * return Error to wrap error message from server. * - * @param array $context Contains parsed context array send by the server. + * @param mixed $header * * @return \Error|null|string * @throws GoridgeException */ - public function receive(&$context) + public function receive(&$header) { $body = $this->relay->receiveSync($flags); if ($flags & Relay::PAYLOAD_CONTROL) { - if ($this->handleControl($body, $context)) { + if ($this->handleControl($body, $header, $flags)) { // wait for the next command - return $this->receive($context); + return $this->receive($header); } // Expect process termination @@ -77,17 +74,16 @@ class Worker * $worker->respond((string)$response->getBody(), json_encode($response->getHeaders())); * * @param string $payload - * @param string $context + * @param string $header */ - public function send(string $payload, string $context = null) + public function send(string $payload, string $header = null) { - if (is_null($context)) { - $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); + if (is_null($header)) { + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_NONE); } else { - $this->relay->send($context, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); + $this->relay->send($header, Relay::PAYLOAD_CONTROL | Relay::PAYLOAD_RAW); } - //todo: null payload? $this->relay->send($payload, Relay::PAYLOAD_RAW); } @@ -124,16 +120,18 @@ class Worker * Handles incoming control command payload and executes it if required. * * @param string $body - * @param array $context Exported context (if any). + * @param mixed $header Exported context (if any). + * @param int $flags * * @returns bool True when continue processing. * * @throws RoadRunnerException */ - private function handleControl(string $body = null, &$context = null): bool + private function handleControl(string $body = null, &$header = null, int $flags): bool { - if (is_null($body)) { - // empty prefix + $header = $body; + if (is_null($body) || $flags & Relay::PAYLOAD_RAW) { + // empty or raw prefix return true; } @@ -152,8 +150,8 @@ class Worker return false; } - // not a command but execution context - $context = $p; + // parsed header + $header = $p; return true; } diff --git a/tests/head.php b/tests/head.php new file mode 100644 index 00000000..4f4e4061 --- /dev/null +++ b/tests/head.php @@ -0,0 +1,17 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + try { + $rr->send("", (string)$ctx); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file @@ -1,7 +1,7 @@ package roadrunner - // test terminate request // test head // test json head? -// job error
\ No newline at end of file +// job error +// job timeout?
\ No newline at end of file @@ -21,17 +21,17 @@ type Worker struct { // state holds information about current worker state, // number of worker executions, last status change time. - // publicly this object is read-only and protected using Mutex + // publicly this object is receive-only and protected using Mutex // and atomic counter. state *state // underlying command with associated process, command must be // provided to worker from outside in non-started form. Cmd - // stdErr pipe will be handled by worker to aggregate error message. + // stdErr direction will be handled by worker to aggregate error message. cmd *exec.Cmd // err aggregates stderr output from underlying process. Value can be - // read only once command is completed and all pipes are closed. + // receive only once command is completed and all pipes are closed. err *bytes.Buffer // channel is being closed once command is complete. @@ -66,7 +66,7 @@ func newWorker(cmd *exec.Cmd) (*Worker, error) { return w, nil } -// State return read-only worker state object, state can be used to safely access +// State return receive-only worker state object, state can be used to safely access // worker status, time when status changed and number of worker executions. func (w *Worker) State() State { return w.state @@ -126,7 +126,7 @@ func (w *Worker) Start() error { func (w *Worker) Wait() error { <-w.waitDone - // ensure that all read/write operations are complete + // ensure that all receive/send operations are complete w.mu.Lock() defer w.mu.Unlock() |