summaryrefslogtreecommitdiff
path: root/watcher_test.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-03 13:08:49 +0300
committerWolfy-J <[email protected]>2019-05-03 13:08:49 +0300
commit1e38f34d0d4bca699bd2025dddeb6c66587b3246 (patch)
tree5fff0a61ca85e35d08623ccb6e8148491d39aa3f /watcher_test.go
parent457ec8eac0f5267e61871de87c3f4daa9f595be0 (diff)
testing watchers
Diffstat (limited to 'watcher_test.go')
-rw-r--r--watcher_test.go76
1 files changed, 76 insertions, 0 deletions
diff --git a/watcher_test.go b/watcher_test.go
index 8a5821bd..4a46eaf1 100644
--- a/watcher_test.go
+++ b/watcher_test.go
@@ -1,6 +1,7 @@
package roadrunner
import (
+ "fmt"
"github.com/stretchr/testify/assert"
"runtime"
"testing"
@@ -29,6 +30,10 @@ func (w *eWatcher) Detach() {
}
}
+func (w *eWatcher) remove(wr *Worker, err error) {
+ w.p.Remove(wr, err)
+}
+
func Test_WatcherWatch(t *testing.T) {
rr := NewServer(
&ServerConfig{
@@ -132,3 +137,74 @@ func Test_WatcherAttachDetachSequence(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
+
+func Test_RemoveWorkerOnAllocation(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid := res.String()
+
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ res, err = rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.NotEqual(t, lastPid, res.String())
+
+ assert.NotEqual(t, StateReady, wr.state.Value())
+}
+
+func Test_RemoveWorkerAfterTask(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php slow-pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+ lastPid := ""
+
+ wait := make(chan interface{})
+ go func() {
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid = res.String()
+
+ close(wait)
+ }()
+
+ // wait for worker execution to be in progress
+ time.Sleep(time.Millisecond * 250)
+ rr.pWatcher.(*eWatcher).remove(wr, nil)
+
+ <-wait
+
+ // must be replaced
+ assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
+}