diff options
-rw-r--r-- | pool_test.go | 37 | ||||
-rw-r--r-- | source/Worker.php | 9 | ||||
-rw-r--r-- | tests/stop.php | 25 |
3 files changed, 66 insertions, 5 deletions
diff --git a/pool_test.go b/pool_test.go index 76dc233e..121191ee 100644 --- a/pool_test.go +++ b/pool_test.go @@ -188,8 +188,6 @@ func Test_Pool_AllocateTimeout(t *testing.T) { p.Destroy() } -//todo: termiante - func Test_Pool_Replace_Worker(t *testing.T) { p, err := NewPool( func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") }, @@ -225,6 +223,41 @@ func Test_Pool_Replace_Worker(t *testing.T) { } } +// identical to replace but controlled on worker side +func Test_Pool_Stop_Worker(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + var lastPID string + lastPID = strconv.Itoa(*p.Workers()[0].Pid) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Head) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} + func Benchmark_Pool_Allocate(b *testing.B) { p, _ := NewPool( func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") }, diff --git a/source/Worker.php b/source/Worker.php index 8fdf3070..474101b4 100644 --- a/source/Worker.php +++ b/source/Worker.php @@ -24,7 +24,7 @@ use Spiral\RoadRunner\Exceptions\RoadRunnerException; class Worker { // Send as response context to request worker termination - const TERMINATE = '{"stop": true}'; + const STOP = '{"stop": true}'; /** @var Relay */ private $relay; @@ -107,13 +107,16 @@ class Worker /** * Terminate the process. Server must automatically pass task to the next available process. - * Worker will receive TerminateCommand context after calling this method. + * Worker will receive StopCommand context after calling this method. + * + * Attention, you MUST use continue; after invoking this method to let rr to properly + * stop worker. * * @throws GoridgeException */ public function stop() { - $this->send(null, self::TERMINATE); + $this->send(null, self::STOP); } /** diff --git a/tests/stop.php b/tests/stop.php new file mode 100644 index 00000000..caa485d6 --- /dev/null +++ b/tests/stop.php @@ -0,0 +1,25 @@ +<?php +/** + * @var Goridge\RelayInterface $relay + */ + +use Spiral\Goridge; +use Spiral\RoadRunner; + +$rr = new RoadRunner\Worker($relay); + +$used = false; +while ($in = $rr->receive($ctx)) { + try { + if ($used) { + // kill on second attempt + $rr->stop(); + continue; + } + + $used = true; + $rr->send((string)getmypid()); + } catch (\Throwable $e) { + $rr->error((string)$e); + } +}
\ No newline at end of file |