diff options
author | Wolfy-J <[email protected]> | 2018-01-06 20:28:14 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-01-06 20:28:14 +0300 |
commit | fc7b4269e32cb2a92c706c93a2ef496a333729fd (patch) | |
tree | 2611cd9a3af5376e55236e6cf66ad700c683db68 | |
parent | fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff) |
more tests
-rw-r--r-- | .gitignore | 4 | ||||
-rw-r--r-- | README.md | 22 | ||||
-rw-r--r-- | balancer.go | 8 | ||||
-rw-r--r-- | commands.go | 37 | ||||
-rw-r--r-- | config.go | 14 | ||||
-rw-r--r-- | error_test.go | 2 | ||||
-rw-r--r-- | pipe_factory_test.go | 50 | ||||
-rw-r--r-- | pool.go | 14 | ||||
-rw-r--r-- | socket_factory.go | 39 | ||||
-rw-r--r-- | socket_factory_test.go | 88 | ||||
-rw-r--r-- | source/Worker.php | 15 | ||||
-rw-r--r-- | state_test.go | 13 | ||||
-rw-r--r-- | tests/broken-client.php | 17 | ||||
-rw-r--r-- | tests/broken.php | 14 | ||||
-rw-r--r-- | tests/client.php | 36 | ||||
-rw-r--r-- | tests/echo.php (renamed from tests/echo-client.php) | 11 | ||||
-rw-r--r-- | tests/error-client.php | 16 | ||||
-rw-r--r-- | tests/error.php | 13 | ||||
-rw-r--r-- | worker.go | 154 | ||||
-rw-r--r-- | worker_test.go | 115 |
20 files changed, 411 insertions, 271 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..dd06a9bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.idea/* +composer.lock +vendor/ +bin/
\ No newline at end of file @@ -1,24 +1,20 @@ RoadRunner ========== -[![Latest Stable Version](https://poser.pugx.org/spiral/roadrunner/v/stable)](https://packagist.org/packages/spiral/roadrunner) -[![GoDoc](https://godoc.org/github.com/spiral/roadrunner?status.svg)](https://godoc.org/github.com/spiral/roadrunner) -[![Build Status](https://travis-ci.org/spiral/roadrunner.svg?branch=master)](https://travis-ci.org/spiral/roadrunner) -[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/spiral/roadrunner/badges/quality-score.png)](https://scrutinizer-ci.com/g/spiral/roadrunner/?branch=master) -[![Go Report Card](https://goreportcard.com/badge/github.com/spiral/roadrunner)](https://goreportcard.com/report/github.com/spiral/roadrunner) - -PHP application server library for Golang. +Embeddable PHP application server library for Golang. Features: -------- -- load balancer, process manager and task pipeline -- hot-swap of workers +- load balancer, process manager and task pipeline +- hot-wrap of worker pool - build for multiple frontends (queue, rest, psr-7, async php, etc) -- works over TPC, unix sockets, standard pipes -- safe worker termination -- protocol, worker and job level error management +- works over TPC, unix sockets and standard pipes +- controlled worker termination +- timeout management +- payload context +- protocol, job and worker level error management - very fast (~200k calls per second on Ryzen 1700X over 17 threads) - works on Windows License: -------- -The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information. +The MIT License (MIT). Please see [`LICENSE`](./LICENSE) for more information.
\ No newline at end of file diff --git a/balancer.go b/balancer.go index 29fcd8d5..52479632 100644 --- a/balancer.go +++ b/balancer.go @@ -35,14 +35,14 @@ func (b *Balancer) Spawn(cmd func() *exec.Cmd, factory Factory, cfg Config) erro return nil } -// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is -// being destroyed. -func (b *Balancer) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { +// Exec one task with given payload and context, returns result and context +// or error. Must not be used once pool is being destroyed. +func (b *Balancer) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { b.mu.Lock() pool := b.pool b.mu.Unlock() - return pool.Execute(payload, ctx) + return pool.Exec(payload, ctx) } // Workers return list of active workers. diff --git a/commands.go b/commands.go index 5368097f..ae9d0a12 100644 --- a/commands.go +++ b/commands.go @@ -2,26 +2,43 @@ package roadrunner import ( "encoding/json" + "fmt" "github.com/spiral/goridge" + "os" ) -// TerminateCommand must stop underlying process. -type TerminateCommand struct { - Terminate bool `json:"terminate"` +type stopCommand struct { + Stop bool `json:"stop"` } -// PidCommand send greeting message between processes in json format. -type PidCommand struct { - Pid int `json:"pid"` - Parent int `json:"parent,omitempty"` +type pidCommand struct { + Pid int `json:"pid"` } -// sends control message via relay using JSON encoding -func sendCommand(rl goridge.Relay, command interface{}) error { - bin, err := json.Marshal(command) +func sendCommand(rl goridge.Relay, v interface{}) error { + bin, err := json.Marshal(v) if err != nil { return err } return rl.Send(bin, goridge.PayloadControl) } + +func fetchPid(rl goridge.Relay) (pid int, err error) { + if err := sendCommand(rl, pidCommand{Pid: os.Getpid()}); err != nil { + return 0, err + } + + body, p, err := rl.Receive() + if !p.HasFlag(goridge.PayloadControl) { + return 0, fmt.Errorf("unexpected response, `control` header is missing") + } + + link := &pidCommand{} + //log.Println(string(body)) + if err := json.Unmarshal(body, link); err != nil { + return 0, err + } + + return link.Pid, nil +} @@ -4,16 +4,20 @@ import "time" // Config defines basic behaviour of worker creation and handling process. type Config struct { - // MaxWorkers defines how many sub-processes can be run at once. This value might be doubled by Balancer while hot-swap. + // MaxWorkers defines how many sub-processes can be run at once. This value + // might be doubled by Balancer while hot-swap. MaxWorkers uint64 - // MaxExecutions defines how many executions is allowed for the worker until it's destruction. Set 1 to create new process - // for each new task, 0 to let worker handle as many tasks as it can. + // MaxExecutions defines how many executions is allowed for the worker until + // it's destruction. Set 1 to create new process for each new task, 0 to let + // worker handle as many tasks as it can. MaxExecutions uint64 - // AllocateTimeout defines for how long pool will be waiting for a worker to be freed to handle the task. + // AllocateTimeout defines for how long pool will be waiting for a worker to + // be freed to handle the task. AllocateTimeout time.Duration - // DestroyOnError when set to true workers will be destructed after any JobError. + // DestroyOnError when set to true workers will be destructed after any + // JobError. DestroyOnError bool } diff --git a/error_test.go b/error_test.go index e8fa62b4..7680fd58 100644 --- a/error_test.go +++ b/error_test.go @@ -11,6 +11,6 @@ func TestWorkerError_Error(t *testing.T) { } func TestJobError_Error(t *testing.T) { - e := JobError("error") + e := JobError([]byte("error")) assert.Equal(t, "error", e.Error()) } diff --git a/pipe_factory_test.go b/pipe_factory_test.go new file mode 100644 index 00000000..947760d4 --- /dev/null +++ b/pipe_factory_test.go @@ -0,0 +1,50 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "os/exec" + "testing" +) + +func TestPipeNotStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + assert.Nil(t, cmd.Start()) + + w, err := new(PipeFactory).NewWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} + +func TestPipeErrored(t *testing.T) { + cmd := exec.Command("php", "tests/invalid.php") + w, err := new(PipeFactory).NewWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "unable to connect to worker: unexpected response, `control` header is missing", err.Error()) +} + +func TestPipeStarted(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, err := new(PipeFactory).NewWorker(cmd) + defer w.Stop() + + assert.NotNil(t, w) + assert.Nil(t, err) + assert.NotNil(t, *w.Pid) + assert.Equal(t, cmd.Process.Pid, *w.Pid) + assert.Equal(t, StateReady, w.st.Value()) +} + +func TestPipeEcho(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() + + r, ctx, err := w.Exec([]byte("hello"), nil) + assert.Nil(t, err) + assert.Nil(t, ctx) + assert.Equal(t, "hello", string(r)) +} @@ -21,7 +21,7 @@ type Pool struct { numWorkers uint64 // current number of tasks workers tasks sync.WaitGroup // counts all tasks executions mua sync.Mutex // protects worker allocation - muw sync.RWMutex // protects state of worker list + muw sync.RWMutex // protects st of worker list workers []*Worker // all registered workers free chan *Worker // freed workers } @@ -46,9 +46,9 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) { return p, nil } -// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is +// Exec one task with given payload and context, returns result and context or error. Must not be used once pool is // being destroyed. -func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { +func (p *Pool) Exec(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { p.tasks.Add(1) defer p.tasks.Done() @@ -57,7 +57,7 @@ func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byt return nil, nil, err } - if resp, rCtx, err = w.Execute(payload, ctx); err != nil { + if resp, rCtx, err = w.Exec(payload, ctx); err != nil { if !p.cfg.DestroyOnError { if err, jobError := err.(JobError); jobError { p.free <- w @@ -84,7 +84,7 @@ func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byt return p.Execute(payload, ctx) } - if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions { + if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.numExecs) > p.cfg.MaxExecutions { p.destroyWorker(w) } else { p.free <- w @@ -143,7 +143,7 @@ func (p *Pool) allocateWorker() (*Worker, error) { timeout := time.NewTimer(p.cfg.AllocateTimeout) select { case <-timeout.C: - return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout) + return nil, fmt.Errorf("unable to allocate worker, timeout (%st)", p.cfg.AllocateTimeout) case w := <-p.free: timeout.Stop() return w, nil @@ -170,7 +170,7 @@ func (p *Pool) destroyWorker(w *Worker) { }() } -// creates new worker (must be called in a locked state). +// creates new worker (must be called in a locked st). func (p *Pool) createWorker() (*Worker, error) { w, err := p.factory.NewWorker(p.cmd()) if err != nil { diff --git a/socket_factory.go b/socket_factory.go index 5d1b488b..28208327 100644 --- a/socket_factory.go +++ b/socket_factory.go @@ -1,11 +1,9 @@ package roadrunner import ( - "encoding/json" "fmt" "github.com/spiral/goridge" "net" - "os" "os/exec" "sync" "time" @@ -34,7 +32,7 @@ func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory { // NewWorker creates worker and connects it to appropriate relay or returns error func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { - w, err = NewWorker(cmd) + w, err = newWorker(cmd) if err != nil { return nil, err } @@ -43,17 +41,13 @@ func (f *SocketFactory) NewWorker(cmd *exec.Cmd) (w *Worker, err error) { return nil, err } - w.Pid = &w.cmd.Process.Pid - if w.Pid == nil { - return nil, fmt.Errorf("can't to start worker %s", w) - } - rl, err := f.waitRelay(*w.Pid, f.tout) if err != nil { - return nil, fmt.Errorf("can't connect to worker %s: %s", w, err) + return nil, fmt.Errorf("unable to connect to worker: %s", err) } w.attach(rl) + w.st = newState(StateReady) return w, nil } @@ -72,7 +66,7 @@ func (f *SocketFactory) listen() { } rl := goridge.NewSocketRelay(conn) - if pid, err := fetchPID(rl); err == nil { + if pid, err := fetchPid(rl); err == nil { f.relayChan(pid) <- rl } } @@ -88,7 +82,7 @@ func (f *SocketFactory) waitRelay(pid int, tout time.Duration) (*goridge.SocketR return rl, nil case <-timer.C: - return nil, fmt.Errorf("relay timer for [%v]", pid) + return nil, fmt.Errorf("relay timeout") } } @@ -113,26 +107,3 @@ func (f *SocketFactory) cleanChan(pid int) { delete(f.wait, pid) } - -// send control command to relay and return associated Pid (or error) -func fetchPID(rl goridge.Relay) (pid int, err error) { - if err := sendCommand(rl, PidCommand{Pid: os.Getpid()}); err != nil { - return 0, err - } - - body, p, err := rl.Receive() - if !p.HasFlag(goridge.PayloadControl) { - return 0, fmt.Errorf("unexpected response, `control` header is missing") - } - - link := &PidCommand{} - if err := json.Unmarshal(body, link); err != nil { - return 0, err - } - - if link.Parent != os.Getpid() { - return 0, fmt.Errorf("integrity error, parent process does not match") - } - - return link.Pid, nil -} diff --git a/socket_factory_test.go b/socket_factory_test.go new file mode 100644 index 00000000..4cefa9b8 --- /dev/null +++ b/socket_factory_test.go @@ -0,0 +1,88 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "net" + "os/exec" + "runtime" + "testing" + "time" +) + +func TestTcpNotStarted(t *testing.T) { + ls, err := net.Listen("tcp", "localhost:9007") + defer ls.Close() + + assert.NotNil(t, ls) + assert.Nil(t, err) + + cmd := exec.Command("php", "tests/client.php", "echo", "") + assert.Nil(t, cmd.Start()) + + w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "can't attach to running process", err.Error()) +} + +func TestTcpErrored(t *testing.T) { + ls, _ := net.Listen("tcp", "localhost:9007") + defer ls.Close() + + cmd := exec.Command("php", "tests/invalid.php") + w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd) + assert.Nil(t, w) + assert.NotNil(t, err) + + assert.Equal(t, "unable to connect to worker: relay timeout", err.Error()) +} + +func TestTcpStarted(t *testing.T) { + ls, _ := net.Listen("tcp", "localhost:9007") + defer ls.Close() + + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd) + defer w.Stop() + + assert.NotNil(t, w) + assert.Nil(t, err) + assert.NotNil(t, *w.Pid) + assert.Equal(t, cmd.Process.Pid, *w.Pid) + assert.Equal(t, StateReady, w.st.Value()) +} + +func TestTcpEcho(t *testing.T) { + ls, _ := net.Listen("tcp", "localhost:9007") + defer ls.Close() + + cmd := exec.Command("php", "tests/client.php", "echo", "tcp") + w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd) + + defer w.Stop() + + r, ctx, err := w.Exec([]byte("hello"), nil) + assert.Nil(t, err) + assert.Nil(t, ctx) + assert.Equal(t, "hello", string(r)) +} + +func TestUnixEcho(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("not supported on windows") + } + + ls, _ := net.Listen("unix", "sock.unix") + defer ls.Close() + + cmd := exec.Command("php", "tests/client.php", "echo", "unix") + w, err := NewSocketFactory(ls, time.Second).NewWorker(cmd) + + defer w.Stop() + + r, ctx, err := w.Exec([]byte("hello"), nil) + assert.Nil(t, err) + assert.Nil(t, ctx) + assert.Equal(t, "hello", string(r)) +} diff --git a/source/Worker.php b/source/Worker.php index d31b48bd..cd496403 100644 --- a/source/Worker.php +++ b/source/Worker.php @@ -136,26 +136,23 @@ class Worker return true; } - $parsed = json_decode($body, true); - if ($parsed === false) { + $p = json_decode($body, true); + if ($p === false) { throw new RoadRunnerException("invalid task context, JSON payload is expected"); } // PID negotiation (socket connections only) - if (!empty($parsed['pid'])) { - $this->relay->send(json_encode([ - 'pid' => getmypid(), - 'parent' => $parsed['pid'], - ]), Relay::PAYLOAD_CONTROL); + if (!empty($p['pid'])) { + $this->relay->send(sprintf('{"pid":%s}', getmypid()), Relay::PAYLOAD_CONTROL); } // termination request - if (!empty($parsed['terminate'])) { + if (!empty($p['stop'])) { return false; } // not a command but execution context - $context = $parsed; + $context = $p; return true; } diff --git a/state_test.go b/state_test.go new file mode 100644 index 00000000..027486f8 --- /dev/null +++ b/state_test.go @@ -0,0 +1,13 @@ +package roadrunner + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestNewState(t *testing.T) { + st := newState(StateAttached) + + assert.Equal(t, "attached", st.String()) + assert.NotEqual(t, 0, st.Updated().Unix()) +} diff --git a/tests/broken-client.php b/tests/broken-client.php deleted file mode 100644 index ed5bde20..00000000 --- a/tests/broken-client.php +++ /dev/null @@ -1,17 +0,0 @@ -<?php - -use Spiral\Goridge; -use Spiral\RoadRunner; - -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - -while ($in = $rr->receive($ctx)) { - echo undefined_function(); - $rr->send((string)$in); -}
\ No newline at end of file diff --git a/tests/broken.php b/tests/broken.php new file mode 100644 index 00000000..b1a3839e --- /dev/null +++ b/tests/broken.php @@ -0,0 +1,14 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + echo undefined_function(); + $rr->send((string)$in); +}
\ No newline at end of file diff --git a/tests/client.php b/tests/client.php new file mode 100644 index 00000000..45505a29 --- /dev/null +++ b/tests/client.php @@ -0,0 +1,36 @@ +<?php + +use Spiral\Goridge; + +ini_set('display_errors', 'stderr'); +require dirname(__DIR__) . "/vendor/autoload.php"; + +if (count($argv) < 3) { + die("need 2 arguments"); +} + +list($test, $goridge) = [$argv[1], $argv[2]]; + +switch ($goridge) { + case "pipes": + $relay = new Goridge\StreamRelay(STDIN, STDOUT); + break; + + case "tcp": + $relay = new Goridge\SocketRelay("localhost", 9007); + break; + + case "unix": + $relay = new Goridge\SocketRelay( + "sock.unix", + null, + Goridge\SocketRelay::SOCK_UNIX + ); + + break; + + default: + die("invalid connect"); +} + +require_once sprintf("%s/%s.php", __DIR__, $test);
\ No newline at end of file diff --git a/tests/echo-client.php b/tests/echo.php index 22761862..ba58ff30 100644 --- a/tests/echo-client.php +++ b/tests/echo.php @@ -1,15 +1,12 @@ <?php +/** + * @var Goridge\RelayInterface $relay + */ use Spiral\Goridge; use Spiral\RoadRunner; -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); +$rr = new RoadRunner\Worker($relay); while ($in = $rr->receive($ctx)) { try { diff --git a/tests/error-client.php b/tests/error-client.php deleted file mode 100644 index 113d1197..00000000 --- a/tests/error-client.php +++ /dev/null @@ -1,16 +0,0 @@ -<?php - -use Spiral\Goridge; -use Spiral\RoadRunner; - -/** - * echo client over pipes. - */ -ini_set('display_errors', 'stderr'); -require "vendor/autoload.php"; - -$rr = new RoadRunner\Worker(new Goridge\StreamRelay(STDIN, STDOUT)); - -while ($in = $rr->receive($ctx)) { - $rr->error((string)$in); -}
\ No newline at end of file diff --git a/tests/error.php b/tests/error.php new file mode 100644 index 00000000..ebd3418b --- /dev/null +++ b/tests/error.php @@ -0,0 +1,13 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +while ($in = $rr->receive($ctx)) { + $rr->error((string)$in); +}
\ No newline at end of file @@ -16,30 +16,25 @@ import ( // Worker - supervised process with api over goridge.Relay. type Worker struct { - // State current worker state. - State State - - // Last time worker State has changed - Last time.Time - - // NumExecutions how many times worker have been invoked. - NumExecutions uint64 - - // Pid contains process ID and empty until worker is started. + // Pid of the process, can be null if not started Pid *int + st State // st current worker st. + numExecs uint64 // numExecs show how many times worker have been invoked. + cmd *exec.Cmd // underlying command process err *bytes.Buffer // aggregates stderr - rl goridge.Relay // communication bus with underlying process - mu sync.RWMutex // ensures than only one execution can be run at once + + mu sync.RWMutex // ensures than only one execution can be run at once + rl goridge.Relay // communication bus with underlying process } -// NewWorker creates new worker -func NewWorker(cmd *exec.Cmd) (*Worker, error) { +// newWorker creates new worker +func newWorker(cmd *exec.Cmd) (*Worker, error) { w := &Worker{ - cmd: cmd, - err: bytes.NewBuffer(nil), - State: StateInactive, + cmd: cmd, + err: bytes.NewBuffer(nil), + st: newState(StateDisabled), } if w.cmd.Process != nil { @@ -49,22 +44,32 @@ func NewWorker(cmd *exec.Cmd) (*Worker, error) { return w, nil } +// State provides access to worker state +func (w *Worker) State() State { + return w.st +} + +// NumExecs show how many times worker have been invoked. +func (w *Worker) NumExecs() uint64 { + return w.numExecs +} + // String returns worker description. func (w *Worker) String() string { - state := w.State.String() + state := w.st.String() if w.Pid != nil { state = state + ", pid:" + strconv.Itoa(*w.Pid) } - return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions) + return fmt.Sprintf("(`%st` [%s], numExecs: %v)", strings.Join(w.cmd.Args, " "), state, w.numExecs) } // Start underlying process or return error func (w *Worker) Start() error { stderr, err := w.cmd.StderrPipe() if err != nil { - w.setState(StateError) + w.st = newState(StateError) return err } @@ -72,97 +77,106 @@ func (w *Worker) Start() error { go io.Copy(w.err, stderr) if err := w.cmd.Start(); err != nil { - w.setState(StateError) + w.st = newState(StateError) return w.mockError(err) } - w.setState(StateReady) + w.Pid = &w.cmd.Process.Pid return nil } -// Execute command and return result and result context. -func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { +// Stop underlying process (No timeout limit) or return error. +func (w *Worker) Stop() { w.mu.Lock() defer w.mu.Unlock() - if w.State != StateReady { - return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State) + w.st = newState(StateDisabled) + + go func() { + sendCommand(w.rl, &stopCommand{Stop: true}) + }() + + w.cmd.Wait() + w.rl.Close() + + w.st = newState(StateStopped) +} + +// todo: timeout +// return syscall.Kill(-c.status.PID, syscall.SIGTERM) + +// Exec command and return result and result context. +func (w *Worker) Exec(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + if w.st.Value() != StateReady { + return nil, nil, fmt.Errorf("worker is not ready (%s)", w.st.Value()) } - w.setState(StateReady) - atomic.AddUint64(&w.NumExecutions, 1) - - if ctx != nil { - if data, err := json.Marshal(ctx); err == nil { - w.rl.Send(data, goridge.PayloadControl) - } else { - return nil, nil, fmt.Errorf("invalid context: %s", err) - } - } else { - w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty) + w.mu.Lock() + defer w.mu.Unlock() + defer atomic.AddUint64(&w.numExecs, 1) + + w.st = newState(StateWorking) + + if err := w.sendPayload(ctx, goridge.PayloadControl); err != nil { + return nil, nil, fmt.Errorf("invalid context: %st", err) } w.rl.Send(body, 0) + // response header rCtx, p, err := w.rl.Receive() - if !p.HasFlag(goridge.PayloadControl) { - return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)")) - } - if p.HasFlag(goridge.PayloadError) { - w.setState(StateReady) + w.st = newState(StateReady) return nil, nil, JobError(rCtx) } + if !p.HasFlag(goridge.PayloadControl) { + w.st = newState(StateError) + return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)")) + } + + // body if resp, p, err = w.rl.Receive(); err != nil { - w.setState(StateError) - return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err)) + w.st = newState(StateError) + return nil, nil, w.mockError(fmt.Errorf("worker error: %st", err)) } - w.setState(StateReady) + w.st = newState(StateReady) return resp, rCtx, nil } -// Stop underlying process or return error. -func (w *Worker) Stop() { - w.mu.Lock() - defer w.mu.Unlock() - - w.setState(StateInactive) - - go func() { - sendCommand(w.rl, &TerminateCommand{Terminate: true}) - }() - - w.cmd.Wait() - w.rl.Close() - - w.setState(StateStopped) -} - // attach payload/control relay to the worker. func (w *Worker) attach(rl goridge.Relay) { w.mu.Lock() defer w.mu.Unlock() w.rl = rl - w.setState(StateBooting) -} - -// sets worker State and it's context (non blocking!). -func (w *Worker) setState(state State) { - // safer? - w.State = state - w.Last = time.Now() + w.st = newState(StateAttached) } // mockError attaches worker specific error (from stderr) to parent error func (w *Worker) mockError(err error) WorkerError { + //w.cmd.Process.Signal(os.Kill) + + time.Sleep(time.Millisecond * 100) if w.err.Len() != 0 { return WorkerError(w.err.String()) } return WorkerError(err.Error()) } + +func (w *Worker) sendPayload(v interface{}, flags byte) error { + if v == nil { + w.rl.Send(nil, goridge.PayloadControl) + } + data, err := json.Marshal(v) + + if err != nil { + return fmt.Errorf("invalid payload: %s", err) + } + + return w.rl.Send(data, goridge.PayloadControl) +} diff --git a/worker_test.go b/worker_test.go index d4d24364..d69ab4d6 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1,81 +1,46 @@ package roadrunner import ( - "github.com/spiral/goridge" "github.com/stretchr/testify/assert" - "io" "os/exec" "testing" "time" ) -func getPipes(cmd *exec.Cmd) (io.ReadCloser, io.WriteCloser) { - in, err := cmd.StdoutPipe() - if err != nil { - panic(err) - } +func TestGetState(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() - out, err := cmd.StdinPipe() - if err != nil { - panic(err) - } - - return in, out -} - -func TestOnStarted(t *testing.T) { - pr := exec.Command("php", "tests/echo-client.php") - pr.Start() - - _, err := NewWorker(pr) - assert.NotNil(t, err) - assert.Equal(t, "can't attach to running process", err.Error()) -} - -func TestNewWorkerState(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) - assert.Nil(t, err) - assert.Equal(t, StateInactive, w.State) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Equal(t, StateBooting, w.State) - - assert.Nil(t, w.Start()) - assert.Equal(t, StateReady, w.State) + assert.Equal(t, StateReady, w.State().Value()) } func TestStop(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) - assert.Nil(t, err) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() w.Stop() - assert.Equal(t, StateStopped, w.State) + assert.Equal(t, StateStopped, w.State().Value()) } func TestEcho(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) - assert.Nil(t, err) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() - r, ctx, err := w.Execute([]byte("hello"), nil) + r, ctx, err := w.Exec([]byte("hello"), nil) assert.Nil(t, err) assert.Nil(t, ctx) assert.Equal(t, "hello", string(r)) } func TestError(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/error-client.php")) - assert.Nil(t, err) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) + cmd := exec.Command("php", "tests/client.php", "error", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() - r, ctx, err := w.Execute([]byte("hello"), nil) + r, ctx, err := w.Exec([]byte("hello"), nil) assert.Nil(t, r) assert.NotNil(t, err) assert.Nil(t, ctx) @@ -85,13 +50,11 @@ func TestError(t *testing.T) { } func TestBroken(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/broken-client.php")) - assert.Nil(t, err) + cmd := exec.Command("php", "tests/client.php", "broken", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) - - r, ctx, err := w.Execute([]byte("hello"), nil) + r, ctx, err := w.Exec([]byte("hello"), nil) assert.Nil(t, r) assert.NotNil(t, err) assert.Nil(t, ctx) @@ -100,31 +63,27 @@ func TestBroken(t *testing.T) { assert.Contains(t, err.Error(), "undefined_function()") } -func TestNumExecutions(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) - assert.Nil(t, err) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) +func TestCalled(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() - w.Execute([]byte("hello"), nil) - assert.Equal(t, uint64(1), w.NumExecutions) + w.Exec([]byte("hello"), nil) + assert.Equal(t, uint64(1), w.NumExecs()) - w.Execute([]byte("hello"), nil) - assert.Equal(t, uint64(2), w.NumExecutions) + w.Exec([]byte("hello"), nil) + assert.Equal(t, uint64(2), w.NumExecs()) - w.Execute([]byte("hello"), nil) - assert.Equal(t, uint64(3), w.NumExecutions) + w.Exec([]byte("hello"), nil) + assert.Equal(t, uint64(3), w.NumExecs()) } -func TestLastExecution(t *testing.T) { - w, err := NewWorker(exec.Command("php", "tests/echo-client.php")) - assert.Nil(t, err) - - w.attach(goridge.NewPipeRelay(getPipes(w.cmd))) - assert.Nil(t, w.Start()) +func TestStateChanged(t *testing.T) { + cmd := exec.Command("php", "tests/client.php", "echo", "pipes") + w, _ := new(PipeFactory).NewWorker(cmd) + defer w.Stop() tm := time.Now() - w.Execute([]byte("hello"), nil) - assert.True(t, w.Last.After(tm)) + w.Exec([]byte("hello"), nil) + assert.True(t, w.State().Updated().After(tm)) } |