summaryrefslogtreecommitdiff
path: root/controller_test.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-04 19:24:25 +0300
committerWolfy-J <[email protected]>2019-05-04 19:24:25 +0300
commit2efc533f2aac215d487a80020b0f9bf4ae5209c3 (patch)
treea80a7a74dc4ca8c290b8b1bf1f6d24535b5ae3d7 /controller_test.go
parent726b31008e73ab83d0582305c28a8cf62322e47a (diff)
watchers renamed to controllers
Diffstat (limited to 'controller_test.go')
-rw-r--r--controller_test.go216
1 files changed, 216 insertions, 0 deletions
diff --git a/controller_test.go b/controller_test.go
new file mode 100644
index 00000000..031c2f31
--- /dev/null
+++ b/controller_test.go
@@ -0,0 +1,216 @@
+package roadrunner
+
+import (
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "runtime"
+ "testing"
+ "time"
+)
+
+type eWatcher struct {
+ p Pool
+ onAttach func(p Pool)
+ onDetach func(p Pool)
+}
+
+func (w *eWatcher) Attach(p Pool) Controller {
+ wp := &eWatcher{p: p, onAttach: w.onAttach, onDetach: w.onDetach}
+
+ if wp.onAttach != nil {
+ wp.onAttach(p)
+ }
+
+ return wp
+}
+
+func (w *eWatcher) Detach() {
+ if w.onDetach != nil {
+ w.onDetach(w.p)
+ }
+}
+
+func (w *eWatcher) remove(wr *Worker, err error) {
+ w.p.Remove(wr, err)
+}
+
+func Test_WatcherWatch(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_WatcherReattach(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
+
+ oldWatcher := rr.pController
+
+ assert.NoError(t, rr.Reset())
+
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
+ assert.NotEqual(t, oldWatcher, rr.pController)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_WatcherAttachDetachSequence(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php echo pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: int64(runtime.NumCPU()),
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ var attachedPool Pool
+
+ rr.Watch(&eWatcher{
+ onAttach: func(p Pool) {
+ attachedPool = p
+ },
+ onDetach: func(p Pool) {
+ assert.Equal(t, attachedPool, p)
+ },
+ })
+ assert.NoError(t, rr.Start())
+
+ assert.NotNil(t, rr.pController)
+ assert.Equal(t, rr.pController.(*eWatcher).p, rr.pool)
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.Equal(t, "hello", res.String())
+}
+
+func Test_RemoveWorkerOnAllocation(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid := res.String()
+
+ rr.pController.(*eWatcher).remove(wr, nil)
+
+ res, err = rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.NotEqual(t, lastPid, res.String())
+
+ assert.NotEqual(t, StateReady, wr.state.Value())
+
+ _, ok := rr.pool.(*StaticPool).remove.Load(wr)
+ assert.False(t, ok)
+}
+
+func Test_RemoveWorkerAfterTask(t *testing.T) {
+ rr := NewServer(
+ &ServerConfig{
+ Command: "php tests/client.php slow-pid pipes",
+ Relay: "pipes",
+ Pool: &Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ })
+ defer rr.Stop()
+
+ rr.Watch(&eWatcher{})
+ assert.NoError(t, rr.Start())
+
+ wr := rr.Workers()[0]
+ lastPid := ""
+
+ wait := make(chan interface{})
+ go func() {
+ res, err := rr.Exec(&Payload{Body: []byte("hello")})
+ assert.NoError(t, err)
+ assert.Equal(t, fmt.Sprintf("%v", *wr.Pid), res.String())
+ lastPid = res.String()
+
+ close(wait)
+ }()
+
+ // wait for worker execution to be in progress
+ time.Sleep(time.Millisecond * 250)
+ rr.pController.(*eWatcher).remove(wr, nil)
+
+ <-wait
+
+ // must be replaced
+ assert.NotEqual(t, lastPid, fmt.Sprintf("%v", rr.Workers()[0]))
+
+ // must not be registered within the pool
+ rr.pController.(*eWatcher).remove(wr, nil)
+}