summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 13:08:49 +0300
committerWolfy-J <[email protected]>2019-05-03 13:08:49 +0300
commit1e38f34d0d4bca699bd2025dddeb6c66587b3246 (patch)
tree5fff0a61ca85e35d08623ccb6e8148491d39aa3f
parent457ec8eac0f5267e61871de87c3f4daa9f595be0 (diff)
testing watchers
-rw-r--r--pool.go2
-rw-r--r--static_pool.go2
-rw-r--r--static_pool_test.go5
-rw-r--r--tests/pid.php26
-rw-r--r--tests/slow-pid.php18
-rw-r--r--watcher_test.go76
6 files changed, 112 insertions, 17 deletions
diff --git a/pool.go b/pool.go
index 2e59e154..13e99fa8 100644
--- a/pool.go
+++ b/pool.go
@@ -31,7 +31,7 @@ type Pool interface {
// Workers returns worker list associated with the pool.
Workers() (workers []*Worker)
- // Remove forces pool to destroy specific worker.
+ // Remove forces pool to remove specific worker.
Remove(w *Worker, err error)
// Destroy all underlying workers (but let them to complete the task).
diff --git a/static_pool.go b/static_pool.go
index eae9e8a2..d81ef7e2 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -114,7 +114,7 @@ func (p *StaticPool) Workers() (workers []*Worker) {
return workers
}
-// Remove forces pool to destroy specific worker.
+// Remove forces pool to remove specific worker.
func (p *StaticPool) Remove(w *Worker, err error) {
if w.State().Value() != StateReady && w.State().Value() != StateWorking {
// unable to remove inactive worker
diff --git a/static_pool_test.go b/static_pool_test.go
index b1a27990..a7e71fdb 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -381,10 +381,11 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
DestroyTimeout: time.Second,
},
)
- p.Destroy()
- assert.NotNil(t, p)
assert.NoError(t, err)
+ assert.NotNil(t, p)
+
+ p.Destroy()
}
func Benchmark_Pool_Allocate(b *testing.B) {
diff --git a/tests/pid.php b/tests/pid.php
index a8cfa229..bc1928a6 100644
--- a/tests/pid.php
+++ b/tests/pid.php
@@ -1,17 +1,17 @@
<?php
-/**
- * @var Goridge\RelayInterface $relay
- */
+ /**
+ * @var Goridge\RelayInterface $relay
+ */
-use Spiral\Goridge;
-use Spiral\RoadRunner;
+ use Spiral\Goridge;
+ use Spiral\RoadRunner;
-$rr = new RoadRunner\Worker($relay);
+ $rr = new RoadRunner\Worker($relay);
-while ($in = $rr->receive($ctx)) {
- try {
- $rr->send((string)getmypid());
- } catch (\Throwable $e) {
- $rr->error((string)$e);
- }
-} \ No newline at end of file
+ while ($in = $rr->receive($ctx)) {
+ try {
+ $rr->send((string)getmypid());
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+ } \ No newline at end of file
diff --git a/tests/slow-pid.php b/tests/slow-pid.php
new file mode 100644
index 00000000..daaf2583
--- /dev/null
+++ b/tests/slow-pid.php
@@ -0,0 +1,18 @@
+<?php
+ /**
+ * @var Goridge\RelayInterface $relay
+ */
+
+ use Spiral\Goridge;
+ use Spiral\RoadRunner;
+
+ $rr = new RoadRunner\Worker($relay);
+
+ while ($in = $rr->receive($ctx)) {
+ try {
+ sleep(1);
+ $rr->send((string)getmypid());
+ } catch (\Throwable $e) {
+ $rr->error((string)$e);
+ }
+ } \ No newline at end of file
diff --git a/watcher_test.go b/watcher_test.go
index 8a5821bd..4a46eaf1 100644
--- a/watcher_test.go
+++ b/watcher_test.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "fmt"
"github.com/stretchr/testify/assert"
"runtime"
"testing"
@@ -29,6 +30,10 @@ func (w *eWatcher) Detach() {
}
}
+func (w *eWatcher) remove(wr *Worker, err error) {
+ w.p.Remove(wr, err)
+}
+
func Test_WatcherWatch(t *testing.T) {
rr := NewServer(
&ServerConfig{
@@ -132,3 +137,74 @@ func Test_WatcherAttachDetachSequence(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+
+func Test_RemoveWorkerOnAllocation(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid := res.String()
+
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ res, err = rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.NotEqual(t, lastPid, res.String())
+
+ assert.NotEqual(t, StateReady, wr.state.Value())
+}
+
+func Test_RemoveWorkerAfterTask(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php slow-pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+ lastPid := ""
+
+ wait := make(chan interface{})
+ go func() {
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid = res.String()
+
+ close(wait)
+ }()
+
+ // wait for worker execution to be in progress
+ time.Sleep(time.Millisecond * 250)
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ <-wait
+
+ // must be replaced
+ assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
+}