summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-01-06 20:28:14 +0300
committerWolfy-J <[email protected]>2018-01-06 20:28:14 +0300
commitfc7b4269e32cb2a92c706c93a2ef496a333729fd (patch)
tree2611cd9a3af5376e55236e6cf66ad700c683db68
parentfa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
more tests
-rw-r--r--.gitignore4
-rw-r--r--README.md22
-rw-r--r--balancer.go8
-rw-r--r--commands.go37
-rw-r--r--config.go14
-rw-r--r--error_test.go2
-rw-r--r--pipe_factory_test.go50
-rw-r--r--pool.go14
-rw-r--r--socket_factory.go39
-rw-r--r--socket_factory_test.go88
-rw-r--r--source/Worker.php15
-rw-r--r--state_test.go13
-rw-r--r--tests/broken-client.php17
-rw-r--r--tests/broken.php14
-rw-r--r--tests/client.php36
-rw-r--r--tests/echo.php (renamed from tests/echo-client.php)11
-rw-r--r--tests/error-client.php16
-rw-r--r--tests/error.php13
-rw-r--r--worker.go154
-rw-r--r--worker_test.go115
20 files changed, 411 insertions, 271 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 00000000..dd06a9bf
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+.idea/*
+composer.lock
+vendor/
+bin/ \ No newline at end of file
diff --git a/README.md b/README.md
index b4ab2759..3c7fc485 100644
--- a/README.md
+++ b/README.md
@@ -1,24 +1,20 @@
RoadRunner
==========
-[![Latest Stable Version](https://poser.pugx.org/spiral/roadrunner/v/stable)](https://packagist.org/packages/spiral/roadrunner)
-[![GoDoc](https://godoc.org/github.com/spiral/roadrunner?status.svg)](https://godoc.org/github.com/spiral/roadrunner)
-[![Build Status](https://travis-ci.org/spiral/roadrunner.svg?branch=master)](https://travis-ci.org/spiral/roadrunner)
-[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png)](https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master)
-[![Go Report Card](https://goreportcard.com/badge/github.com/spiral/roadrunner)](https://goreportcard.com/report/github.com/spiral/roadrunner)
-
-PHP application server library for Golang.
+Embeddable PHP application server library for Golang.
Features:
--------
-- load balancer, process manager and task pipeline
-- hot-swap of workers
+- load balancer, process manager and task pipeline
+- hot-wrap of worker pool
- build for multiple frontends (queue, rest, psr-7, async php, etc)
-- works over TPC, unix sockets, standard pipes
-- safe worker termination
-- protocol, worker and job level error management
+- works over TPC, unix sockets and standard pipes
+- controlled worker termination
+- timeout management
+- payload context
+- protocol, job and worker level error management
- very fast (~200k calls per second on Ryzen 1700X over 17 threads)
- works on Windows
License:
--------
-The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
+The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. \ No newline at end of file
diff --git a/balancer.go b/balancer.go
index 29fcd8d5..52479632 100644
--- a/balancer.go
+++ b/balancer.go
@@ -35,14 +35,14 @@ func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) erro
return nil
}
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
-// being destroyed.
-func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+// Exec one task with given payload and context, returns result and context
+// or error. Must not be used once pool is being destroyed.
+func (b *Balancer) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
b.mu.Lock()
pool := b.pool
b.mu.Unlock()
- return pool.Execute(payload, ctx)
+ return pool.Exec(payload, ctx)
}
// Workers return list of active workers.
diff --git a/commands.go b/commands.go
index 5368097f..ae9d0a12 100644
--- a/commands.go
+++ b/commands.go
@@ -2,26 +2,43 @@ package roadrunner
import (
"encoding/json"
+ "fmt"
"github.com/spiral/goridge"
+ "os"
)
-// TerminateCommand must stop underlying process.
-type TerminateCommand struct {
- Terminate bool `json:"terminate"`
+type stopCommand struct {
+ Stop bool `json:"stop"`
}
-// PidCommand send greeting message between processes in json format.
-type PidCommand struct {
- Pid int `json:"pid"`
- Parent int `json:"parent,omitempty"`
+type pidCommand struct {
+ Pid int `json:"pid"`
}
-// sends control message via relay using JSON encoding
-func sendCommand(rl goridge.Relay, command interface{}) error {
- bin, err := json.Marshal(command)
+func sendCommand(rl goridge.Relay, v interface{}) error {
+ bin, err := json.Marshal(v)
if err != nil {
return err
}
return rl.Send(bin, goridge.PayloadControl)
}
+
+func fetchPid(rl goridge.Relay) (pid int, err error) {
+ if err := sendCommand(rl, pidCommand{Pid: os.Getpid()}); err != nil {
+ return 0, err
+ }
+
+ body, p, err := rl.Receive()
+ if !p.HasFlag(goridge.PayloadControl) {
+ return 0, fmt.Errorf("unexpected response, `control` header is missing")
+ }
+
+ link := &pidCommand{}
+ //log.Println(string(body))
+ if err := json.Unmarshal(body, link); err != nil {
+ return 0, err
+ }
+
+ return link.Pid, nil
+}
diff --git a/config.go b/config.go
index e5d78d49..c8721962 100644
--- a/config.go
+++ b/config.go
@@ -4,16 +4,20 @@ import "time"
// Config defines basic behaviour of worker creation and handling process.
type Config struct {
- // MaxWorkers defines how many sub-processes can be run at once. This value might be doubled by Balancer while hot-swap.
+ // MaxWorkers defines how many sub-processes can be run at once. This value
+ // might be doubled by Balancer while hot-swap.
MaxWorkers uint64
- // MaxExecutions defines how many executions is allowed for the worker until it's destruction. Set 1 to create new process
- // for each new task, 0 to let worker handle as many tasks as it can.
+ // MaxExecutions defines how many executions is allowed for the worker until
+ // it's destruction. Set 1 to create new process for each new task, 0 to let
+ // worker handle as many tasks as it can.
MaxExecutions uint64
- // AllocateTimeout defines for how long pool will be waiting for a worker to be freed to handle the task.
+ // AllocateTimeout defines for how long pool will be waiting for a worker to
+ // be freed to handle the task.
AllocateTimeout time.Duration
- // DestroyOnError when set to true workers will be destructed after any JobError.
+ // DestroyOnError when set to true workers will be destructed after any
+ // JobError.
DestroyOnError bool
}
diff --git a/error_test.go b/error_test.go
index e8fa62b4..7680fd58 100644
--- a/error_test.go
+++ b/error_test.go
@@ -11,6 +11,6 @@ func TestWorkerError_Error(t *testing.T) {
}
func TestJobError_Error(t *testing.T) {
- e := JobError("error")
+ e := JobError([]byte("error"))
assert.Equal(t, "error", e.Error())
}
diff --git a/pipe_factory_test.go b/pipe_factory_test.go
new file mode 100644
index 00000000..947760d4
--- /dev/null
+++ b/pipe_factory_test.go
@@ -0,0 +1,50 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "os/exec"
+ "testing"
+)
+
+func TestPipeNotStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ assert.Nil(t, cmd.Start())
+
+ w, err := new(PipeFactory).NewWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}
+
+func TestPipeErrored(t *testing.T) {
+ cmd := exec.Command("php", "tests/invalid.php")
+ w, err := new(PipeFactory).NewWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "unable to connect to worker: unexpected response, `control` header is missing", err.Error())
+}
+
+func TestPipeStarted(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, err := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
+
+ assert.NotNil(t, w)
+ assert.Nil(t, err)
+ assert.NotNil(t, *w.Pid)
+ assert.Equal(t, cmd.Process.Pid, *w.Pid)
+ assert.Equal(t, StateReady, w.st.Value())
+}
+
+func TestPipeEcho(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
+
+ r, ctx, err := w.Exec([]byte("hello"), nil)
+ assert.Nil(t, err)
+ assert.Nil(t, ctx)
+ assert.Equal(t, "hello", string(r))
+}
diff --git a/pool.go b/pool.go
index 50f14e4e..36665cac 100644
--- a/pool.go
+++ b/pool.go
@@ -21,7 +21,7 @@ type Pool struct {
numWorkers uint64 // current number of tasks workers
tasks sync.WaitGroup // counts all tasks executions
mua sync.Mutex // protects worker allocation
- muw sync.RWMutex // protects state of worker list
+ muw sync.RWMutex // protects st of worker list
workers []*Worker // all registered workers
free chan *Worker // freed workers
}
@@ -46,9 +46,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
return p, nil
}
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
+// Exec one task with given payload and context, returns result and context or error. Must not be used once pool is
// being destroyed.
-func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+func (p *Pool) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
p.tasks.Add(1)
defer p.tasks.Done()
@@ -57,7 +57,7 @@ func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byt
return nil, nil, err
}
- if resp, rCtx, err = w.Execute(payload, ctx); err != nil {
+ if resp, rCtx, err = w.Exec(payload, ctx); err != nil {
if !p.cfg.DestroyOnError {
if err, jobError := err.(JobError); jobError {
p.free <- w
@@ -84,7 +84,7 @@ func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byt
return p.Execute(payload, ctx)
}
- if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions {
+ if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.numExecs) > p.cfg.MaxExecutions {
p.destroyWorker(w)
} else {
p.free <- w
@@ -143,7 +143,7 @@ func (p *Pool) allocateWorker() (*Worker, error) {
timeout := time.NewTimer(p.cfg.AllocateTimeout)
select {
case <-timeout.C:
- return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout)
+ return nil, fmt.Errorf("unable to allocate worker, timeout (%st)", p.cfg.AllocateTimeout)
case w := <-p.free:
timeout.Stop()
return w, nil
@@ -170,7 +170,7 @@ func (p *Pool) destroyWorker(w *Worker) {
}()
}
-// creates new worker (must be called in a locked state).
+// creates new worker (must be called in a locked st).
func (p *Pool) createWorker() (*Worker, error) {
w, err := p.factory.NewWorker(p.cmd())
if err != nil {
diff --git a/socket_factory.go b/socket_factory.go
index 5d1b488b..28208327 100644
--- a/socket_factory.go
+++ b/socket_factory.go
@@ -1,11 +1,9 @@
package roadrunner
import (
- "encoding/json"
"fmt"
"github.com/spiral/goridge"
"net"
- "os"
"os/exec"
"sync"
"time"
@@ -34,7 +32,7 @@ func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory {
// NewWorker creates worker and connects it to appropriate relay or returns error
func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
- w, err = NewWorker(cmd)
+ w, err = newWorker(cmd)
if err != nil {
return nil, err
}
@@ -43,17 +41,13 @@ func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) {
return nil, err
}
- w.Pid = &w.cmd.Process.Pid
- if w.Pid == nil {
- return nil, fmt.Errorf("can't to start worker %s", w)
- }
-
rl, err := f.waitRelay(*w.Pid, f.tout)
if err != nil {
- return nil, fmt.Errorf("can't connect to worker %s: %s", w, err)
+ return nil, fmt.Errorf("unable to connect to worker: %s", err)
}
w.attach(rl)
+ w.st = newState(StateReady)
return w, nil
}
@@ -72,7 +66,7 @@ func (f *SocketFactory) listen() {
}
rl := goridge.NewSocketRelay(conn)
- if pid, err := fetchPID(rl); err == nil {
+ if pid, err := fetchPid(rl); err == nil {
f.relayChan(pid) <- rl
}
}
@@ -88,7 +82,7 @@ func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketR
return rl, nil
case <-timer.C:
- return nil, fmt.Errorf("relay timer for [%v]", pid)
+ return nil, fmt.Errorf("relay timeout")
}
}
@@ -113,26 +107,3 @@ func (f *SocketFactory) cleanChan(pid int) {
delete(f.wait, pid)
}
-
-// send control command to relay and return associated Pid (or error)
-func fetchPID(rl goridge.Relay) (pid int, err error) {
- if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil {
- return 0, err
- }
-
- body, p, err := rl.Receive()
- if !p.HasFlag(goridge.PayloadControl) {
- return 0, fmt.Errorf("unexpected response, `control` header is missing")
- }
-
- link := &PidCommand{}
- if err := json.Unmarshal(body, link); err != nil {
- return 0, err
- }
-
- if link.Parent != os.Getpid() {
- return 0, fmt.Errorf("integrity error, parent process does not match")
- }
-
- return link.Pid, nil
-}
diff --git a/socket_factory_test.go b/socket_factory_test.go
new file mode 100644
index 00000000..4cefa9b8
--- /dev/null
+++ b/socket_factory_test.go
@@ -0,0 +1,88 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "net"
+ "os/exec"
+ "runtime"
+ "testing"
+ "time"
+)
+
+func TestTcpNotStarted(t *testing.T) {
+ ls, err := net.Listen("tcp", "localhost:9007")
+ defer ls.Close()
+
+ assert.NotNil(t, ls)
+ assert.Nil(t, err)
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "")
+ assert.Nil(t, cmd.Start())
+
+ w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "can't attach to running process", err.Error())
+}
+
+func TestTcpErrored(t *testing.T) {
+ ls, _ := net.Listen("tcp", "localhost:9007")
+ defer ls.Close()
+
+ cmd := exec.Command("php", "tests/invalid.php")
+ w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd)
+ assert.Nil(t, w)
+ assert.NotNil(t, err)
+
+ assert.Equal(t, "unable to connect to worker: relay timeout", err.Error())
+}
+
+func TestTcpStarted(t *testing.T) {
+ ls, _ := net.Listen("tcp", "localhost:9007")
+ defer ls.Close()
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd)
+ defer w.Stop()
+
+ assert.NotNil(t, w)
+ assert.Nil(t, err)
+ assert.NotNil(t, *w.Pid)
+ assert.Equal(t, cmd.Process.Pid, *w.Pid)
+ assert.Equal(t, StateReady, w.st.Value())
+}
+
+func TestTcpEcho(t *testing.T) {
+ ls, _ := net.Listen("tcp", "localhost:9007")
+ defer ls.Close()
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "tcp")
+ w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd)
+
+ defer w.Stop()
+
+ r, ctx, err := w.Exec([]byte("hello"), nil)
+ assert.Nil(t, err)
+ assert.Nil(t, ctx)
+ assert.Equal(t, "hello", string(r))
+}
+
+func TestUnixEcho(t *testing.T) {
+ if runtime.GOOS == "windows" {
+ t.Skip("not supported on windows")
+ }
+
+ ls, _ := net.Listen("unix", "sock.unix")
+ defer ls.Close()
+
+ cmd := exec.Command("php", "tests/client.php", "echo", "unix")
+ w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd)
+
+ defer w.Stop()
+
+ r, ctx, err := w.Exec([]byte("hello"), nil)
+ assert.Nil(t, err)
+ assert.Nil(t, ctx)
+ assert.Equal(t, "hello", string(r))
+}
diff --git a/source/Worker.php b/source/Worker.php
index d31b48bd..cd496403 100644
--- a/source/Worker.php
+++ b/source/Worker.php
@@ -136,26 +136,23 @@ class Worker
return true;
}
- $parsed = json_decode($body, true);
- if ($parsed === false) {
+ $p = json_decode($body, true);
+ if ($p === false) {
throw new RoadRunnerException("invalid task context, JSON payload is expected");
}
// PID negotiation (socket connections only)
- if (!empty($parsed['pid'])) {
- $this->relay->send(json_encode([
- 'pid' => getmypid(),
- 'parent' => $parsed['pid'],
- ]), Relay::PAYLOAD_CONTROL);
+ if (!empty($p['pid'])) {
+ $this->relay->send(sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL);
}
// termination request
- if (!empty($parsed['terminate'])) {
+ if (!empty($p['stop'])) {
return false;
}
// not a command but execution context
- $context = $parsed;
+ $context = $p;
return true;
}
diff --git a/state_test.go b/state_test.go
new file mode 100644
index 00000000..027486f8
--- /dev/null
+++ b/state_test.go
@@ -0,0 +1,13 @@
+package roadrunner
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestNewState(t *testing.T) {
+ st := newState(StateAttached)
+
+ assert.Equal(t, "attached", st.String())
+ assert.NotEqual(t, 0, st.Updated().Unix())
+}
diff --git a/tests/broken-client.php b/tests/broken-client.php
deleted file mode 100644
index ed5bde20..00000000
--- a/tests/broken-client.php
+++ /dev/null
@@ -1,17 +0,0 @@
-<?php
-
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-
-while ($in = $rr->receive($ctx)) {
- echo undefined_function();
- $rr->send((string)$in);
-} \ No newline at end of file
diff --git a/tests/broken.php b/tests/broken.php
new file mode 100644
index 00000000..b1a3839e
--- /dev/null
+++ b/tests/broken.php
@@ -0,0 +1,14 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ echo undefined_function();
+ $rr->send((string)$in);
+} \ No newline at end of file
diff --git a/tests/client.php b/tests/client.php
new file mode 100644
index 00000000..45505a29
--- /dev/null
+++ b/tests/client.php
@@ -0,0 +1,36 @@
+<?php
+
+use Spiral\Goridge;
+
+ini_set('display_errors', 'stderr');
+require dirname(__DIR__) . "/vendor/autoload.php";
+
+if (count($argv) < 3) {
+ die("need 2 arguments");
+}
+
+list($test, $goridge) = [$argv[1], $argv[2]];
+
+switch ($goridge) {
+ case "pipes":
+ $relay = new Goridge\StreamRelay(STDIN, STDOUT);
+ break;
+
+ case "tcp":
+ $relay = new Goridge\SocketRelay("localhost", 9007);
+ break;
+
+ case "unix":
+ $relay = new Goridge\SocketRelay(
+ "sock.unix",
+ null,
+ Goridge\SocketRelay::SOCK_UNIX
+ );
+
+ break;
+
+ default:
+ die("invalid connect");
+}
+
+require_once sprintf("%s/%s.php", __DIR__, $test); \ No newline at end of file
diff --git a/tests/echo-client.php b/tests/echo.php
index 22761862..ba58ff30 100644
--- a/tests/echo-client.php
+++ b/tests/echo.php
@@ -1,15 +1,12 @@
<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
use Spiral\Goridge;
use Spiral\RoadRunner;
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
+$rr = new RoadRunner\Worker($relay);
while ($in = $rr->receive($ctx)) {
try {
diff --git a/tests/error-client.php b/tests/error-client.php
deleted file mode 100644
index 113d1197..00000000
--- a/tests/error-client.php
+++ /dev/null
@@ -1,16 +0,0 @@
-<?php
-
-use Spiral\Goridge;
-use Spiral\RoadRunner;
-
-/**
- * echo client over pipes.
- */
-ini_set('display_errors', 'stderr');
-require "vendor/autoload.php";
-
-$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT));
-
-while ($in = $rr->receive($ctx)) {
- $rr->error((string)$in);
-} \ No newline at end of file
diff --git a/tests/error.php b/tests/error.php
new file mode 100644
index 00000000..ebd3418b
--- /dev/null
+++ b/tests/error.php
@@ -0,0 +1,13 @@
+<?php
+/**
+ * @var Goridge\RelayInterface $relay
+ */
+
+use Spiral\Goridge;
+use Spiral\RoadRunner;
+
+$rr = new RoadRunner\Worker($relay);
+
+while ($in = $rr->receive($ctx)) {
+ $rr->error((string)$in);
+} \ No newline at end of file
diff --git a/worker.go b/worker.go
index 8960b3fa..ebf9e29c 100644
--- a/worker.go
+++ b/worker.go
@@ -16,30 +16,25 @@ import (
// Worker - supervised process with api over goridge.Relay.
type Worker struct {
- // State current worker state.
- State State
-
- // Last time worker State has changed
- Last time.Time
-
- // NumExecutions how many times worker have been invoked.
- NumExecutions uint64
-
- // Pid contains process ID and empty until worker is started.
+ // Pid of the process, can be null if not started
Pid *int
+ st State // st current worker st.
+ numExecs uint64 // numExecs show how many times worker have been invoked.
+
cmd *exec.Cmd // underlying command process
err *bytes.Buffer // aggregates stderr
- rl goridge.Relay // communication bus with underlying process
- mu sync.RWMutex // ensures than only one execution can be run at once
+
+ mu sync.RWMutex // ensures than only one execution can be run at once
+ rl goridge.Relay // communication bus with underlying process
}
-// NewWorker creates new worker
-func NewWorker(cmd *exec.Cmd) (*Worker, error) {
+// newWorker creates new worker
+func newWorker(cmd *exec.Cmd) (*Worker, error) {
w := &Worker{
- cmd: cmd,
- err: bytes.NewBuffer(nil),
- State: StateInactive,
+ cmd: cmd,
+ err: bytes.NewBuffer(nil),
+ st: newState(StateDisabled),
}
if w.cmd.Process != nil {
@@ -49,22 +44,32 @@ func NewWorker(cmd *exec.Cmd) (*Worker, error) {
return w, nil
}
+// State provides access to worker state
+func (w *Worker) State() State {
+ return w.st
+}
+
+// NumExecs show how many times worker have been invoked.
+func (w *Worker) NumExecs() uint64 {
+ return w.numExecs
+}
+
// String returns worker description.
func (w *Worker) String() string {
- state := w.State.String()
+ state := w.st.String()
if w.Pid != nil {
state = state + ", pid:" + strconv.Itoa(*w.Pid)
}
- return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
+ return fmt.Sprintf("(`%st` [%s], numExecs: %v)", strings.Join(w.cmd.Args, " "), state, w.numExecs)
}
// Start underlying process or return error
func (w *Worker) Start() error {
stderr, err := w.cmd.StderrPipe()
if err != nil {
- w.setState(StateError)
+ w.st = newState(StateError)
return err
}
@@ -72,97 +77,106 @@ func (w *Worker) Start() error {
go io.Copy(w.err, stderr)
if err := w.cmd.Start(); err != nil {
- w.setState(StateError)
+ w.st = newState(StateError)
return w.mockError(err)
}
- w.setState(StateReady)
+ w.Pid = &w.cmd.Process.Pid
return nil
}
-// Execute command and return result and result context.
-func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+// Stop underlying process (No timeout limit) or return error.
+func (w *Worker) Stop() {
w.mu.Lock()
defer w.mu.Unlock()
- if w.State != StateReady {
- return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
+ w.st = newState(StateDisabled)
+
+ go func() {
+ sendCommand(w.rl, &stopCommand{Stop: true})
+ }()
+
+ w.cmd.Wait()
+ w.rl.Close()
+
+ w.st = newState(StateStopped)
+}
+
+// todo: timeout
+// return syscall.Kill(-c.status.PID, syscall.SIGTERM)
+
+// Exec command and return result and result context.
+func (w *Worker) Exec(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+ if w.st.Value() != StateReady {
+ return nil, nil, fmt.Errorf("worker is not ready (%s)", w.st.Value())
}
- w.setState(StateReady)
- atomic.AddUint64(&w.NumExecutions, 1)
-
- if ctx != nil {
- if data, err := json.Marshal(ctx); err == nil {
- w.rl.Send(data, goridge.PayloadControl)
- } else {
- return nil, nil, fmt.Errorf("invalid context: %s", err)
- }
- } else {
- w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ defer atomic.AddUint64(&w.numExecs, 1)
+
+ w.st = newState(StateWorking)
+
+ if err := w.sendPayload(ctx, goridge.PayloadControl); err != nil {
+ return nil, nil, fmt.Errorf("invalid context: %st", err)
}
w.rl.Send(body, 0)
+ // response header
rCtx, p, err := w.rl.Receive()
- if !p.HasFlag(goridge.PayloadControl) {
- return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
- }
-
if p.HasFlag(goridge.PayloadError) {
- w.setState(StateReady)
+ w.st = newState(StateReady)
return nil, nil, JobError(rCtx)
}
+ if !p.HasFlag(goridge.PayloadControl) {
+ w.st = newState(StateError)
+ return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
+ }
+
+ // body
if resp, p, err = w.rl.Receive(); err != nil {
- w.setState(StateError)
- return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
+ w.st = newState(StateError)
+ return nil, nil, w.mockError(fmt.Errorf("worker error: %st", err))
}
- w.setState(StateReady)
+ w.st = newState(StateReady)
return resp, rCtx, nil
}
-// Stop underlying process or return error.
-func (w *Worker) Stop() {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- w.setState(StateInactive)
-
- go func() {
- sendCommand(w.rl, &TerminateCommand{Terminate: true})
- }()
-
- w.cmd.Wait()
- w.rl.Close()
-
- w.setState(StateStopped)
-}
-
// attach payload/control relay to the worker.
func (w *Worker) attach(rl goridge.Relay) {
w.mu.Lock()
defer w.mu.Unlock()
w.rl = rl
- w.setState(StateBooting)
-}
-
-// sets worker State and it's context (non blocking!).
-func (w *Worker) setState(state State) {
- // safer?
- w.State = state
- w.Last = time.Now()
+ w.st = newState(StateAttached)
}
// mockError attaches worker specific error (from stderr) to parent error
func (w *Worker) mockError(err error) WorkerError {
+ //w.cmd.Process.Signal(os.Kill)
+
+ time.Sleep(time.Millisecond * 100)
if w.err.Len() != 0 {
return WorkerError(w.err.String())
}
return WorkerError(err.Error())
}
+
+func (w *Worker) sendPayload(v interface{}, flags byte) error {
+ if v == nil {
+ w.rl.Send(nil, goridge.PayloadControl)
+ }
+ data, err := json.Marshal(v)
+
+ if err != nil {
+ return fmt.Errorf("invalid payload: %s", err)
+ }
+
+ return w.rl.Send(data, goridge.PayloadControl)
+}
diff --git a/worker_test.go b/worker_test.go
index d4d24364..d69ab4d6 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -1,81 +1,46 @@
package roadrunner
import (
- "github.com/spiral/goridge"
"github.com/stretchr/testify/assert"
- "io"
"os/exec"
"testing"
"time"
)
-func getPipes(cmd *exec.Cmd) (io.ReadCloser, io.WriteCloser) {
- in, err := cmd.StdoutPipe()
- if err != nil {
- panic(err)
- }
+func TestGetState(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
- out, err := cmd.StdinPipe()
- if err != nil {
- panic(err)
- }
-
- return in, out
-}
-
-func TestOnStarted(t *testing.T) {
- pr := exec.Command("php", "tests/echo-client.php")
- pr.Start()
-
- _, err := NewWorker(pr)
- assert.NotNil(t, err)
- assert.Equal(t, "can't attach to running process", err.Error())
-}
-
-func TestNewWorkerState(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
- assert.Equal(t, StateInactive, w.State)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Equal(t, StateBooting, w.State)
-
- assert.Nil(t, w.Start())
- assert.Equal(t, StateReady, w.State)
+ assert.Equal(t, StateReady, w.State().Value())
}
func TestStop(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
w.Stop()
- assert.Equal(t, StateStopped, w.State)
+ assert.Equal(t, StateStopped, w.State().Value())
}
func TestEcho(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
- r, ctx, err := w.Execute([]byte("hello"), nil)
+ r, ctx, err := w.Exec([]byte("hello"), nil)
assert.Nil(t, err)
assert.Nil(t, ctx)
assert.Equal(t, "hello", string(r))
}
func TestError(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/error-client.php"))
- assert.Nil(t, err)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+ cmd := exec.Command("php", "tests/client.php", "error", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
- r, ctx, err := w.Execute([]byte("hello"), nil)
+ r, ctx, err := w.Exec([]byte("hello"), nil)
assert.Nil(t, r)
assert.NotNil(t, err)
assert.Nil(t, ctx)
@@ -85,13 +50,11 @@ func TestError(t *testing.T) {
}
func TestBroken(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/broken-client.php"))
- assert.Nil(t, err)
+ cmd := exec.Command("php", "tests/client.php", "broken", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
-
- r, ctx, err := w.Execute([]byte("hello"), nil)
+ r, ctx, err := w.Exec([]byte("hello"), nil)
assert.Nil(t, r)
assert.NotNil(t, err)
assert.Nil(t, ctx)
@@ -100,31 +63,27 @@ func TestBroken(t *testing.T) {
assert.Contains(t, err.Error(), "undefined_function()")
}
-func TestNumExecutions(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+func TestCalled(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(1), w.NumExecutions)
+ w.Exec([]byte("hello"), nil)
+ assert.Equal(t, uint64(1), w.NumExecs())
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(2), w.NumExecutions)
+ w.Exec([]byte("hello"), nil)
+ assert.Equal(t, uint64(2), w.NumExecs())
- w.Execute([]byte("hello"), nil)
- assert.Equal(t, uint64(3), w.NumExecutions)
+ w.Exec([]byte("hello"), nil)
+ assert.Equal(t, uint64(3), w.NumExecs())
}
-func TestLastExecution(t *testing.T) {
- w, err := NewWorker(exec.Command("php", "tests/echo-client.php"))
- assert.Nil(t, err)
-
- w.attach(goridge.NewPipeRelay(getPipes(w.cmd)))
- assert.Nil(t, w.Start())
+func TestStateChanged(t *testing.T) {
+ cmd := exec.Command("php", "tests/client.php", "echo", "pipes")
+ w, _ := new(PipeFactory).NewWorker(cmd)
+ defer w.Stop()
tm := time.Now()
- w.Execute([]byte("hello"), nil)
- assert.True(t, w.Last.After(tm))
+ w.Exec([]byte("hello"), nil)
+ assert.True(t, w.State().Updated().After(tm))
}