diff options
author | Wolfy-J <[email protected]> | 2018-06-05 23:17:14 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-05 23:17:14 +0300 |
commit | e594c7070aad609c4caeda760671aca00e638561 (patch) | |
tree | b7ecb76ceeba88e03635c238a67f237452c20524 | |
parent | 6adaf713b47c9a3ab3a516e21d2d4ecf7f2075d6 (diff) |
fixing controlled descruction
-rw-r--r-- | config_test.go (renamed from ext/config_test.go) | 2 | ||||
-rw-r--r-- | errors_test.go (renamed from ext/errors_test.go) | 2 | ||||
-rw-r--r-- | pipe_factory_test.go (renamed from ext/pipe_factory_test.go) | 2 | ||||
-rw-r--r-- | protocol_test.go (renamed from ext/protocol_test.go) | 2 | ||||
-rw-r--r-- | socket_factory_test.go (renamed from ext/socket_factory_test.go) | 2 | ||||
-rw-r--r-- | state.go | 5 | ||||
-rw-r--r-- | static_pool_test.go | 137 | ||||
-rw-r--r-- | worker.go | 17 | ||||
-rw-r--r-- | worker_test.go | 3 |
9 files changed, 82 insertions, 90 deletions
diff --git a/ext/config_test.go b/config_test.go index fbdde223..f4c6246d 100644 --- a/ext/config_test.go +++ b/config_test.go @@ -1,4 +1,4 @@ -package ext +package roadrunner import ( "github.com/stretchr/testify/assert" diff --git a/ext/errors_test.go b/errors_test.go index 7c9d7a5b..9b0fa53e 100644 --- a/ext/errors_test.go +++ b/errors_test.go @@ -1,4 +1,4 @@ -package ext +package roadrunner import ( "github.com/stretchr/testify/assert" diff --git a/ext/pipe_factory_test.go b/pipe_factory_test.go index 434c31b0..ae276ab6 100644 --- a/ext/pipe_factory_test.go +++ b/pipe_factory_test.go @@ -1,4 +1,4 @@ -package ext +package roadrunner import ( "github.com/stretchr/testify/assert" diff --git a/ext/protocol_test.go b/protocol_test.go index f6410ef5..ed3fe461 100644 --- a/ext/protocol_test.go +++ b/protocol_test.go @@ -1,4 +1,4 @@ -package ext +package roadrunner import ( "github.com/pkg/errors" diff --git a/ext/socket_factory_test.go b/socket_factory_test.go index 2d6108bc..f6b1350c 100644 --- a/ext/socket_factory_test.go +++ b/socket_factory_test.go @@ -1,4 +1,4 @@ -package ext +package roadrunner import ( "github.com/stretchr/testify/assert" @@ -24,14 +24,13 @@ type State interface { const ( // StateInactive - no associated process StateInactive int64 = iota + // StateReady - ready for job. StateReady + // StateWorking - working on given payload. StateWorking - // StateDestructing process is being destructed. - StateDestructing - // StateStopping - process is being softly stopped. StateStopping diff --git a/static_pool_test.go b/static_pool_test.go index 7e2315d7..5fe7e062 100644 --- a/static_pool_test.go +++ b/static_pool_test.go @@ -187,6 +187,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p.Observe(func(e int, ctx interface{}) { if err, ok := ctx.(error); ok { assert.Contains(t, err.Error(), "exit status 1") + } + + if e == EventWorkerCreate { close(destructed) } }) @@ -200,73 +203,73 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { } } -// -//func Test_StaticPool_AllocateTimeout(t *testing.T) { -// p, err := NewPool( -// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") }, -// NewPipeFactory(), -// Config{ -// NumWorkers: 1, -// AllocateTimeout: time.Millisecond * 50, -// DestroyTimeout: time.Second, -// }, -// ) -// -// assert.NotNil(t, p) -// assert.NoError(t, err) -// -// done := make(chan interface{}) -// go func() { -// _, err := p.Exec(&Payload{Body: []byte("100")}) -// assert.NoError(t, err) -// close(done) -// }() -// -// // to ensure that worker is already busy -// time.Sleep(time.Millisecond * 10) -// -// _, err = p.Exec(&Payload{Body: []byte("10")}) -// assert.Error(t, err) -// assert.Contains(t, err.Error(), "worker timeout") -// -// <-done -// p.Destroy() -//} -// -//func Test_StaticPool_Replace_Worker(t *testing.T) { -// p, err := NewPool( -// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") }, -// NewPipeFactory(), -// Config{ -// NumWorkers: 1, -// MaxExecutions: 1, -// AllocateTimeout: time.Second, -// DestroyTimeout: time.Second, -// }, -// ) -// defer p.Destroy() -// -// assert.NotNil(t, p) -// assert.NoError(t, err) -// -// var lastPID string -// lastPID = strconv.Itoa(*p.Workers()[0].Pid) -// -// res, err := p.Exec(&Payload{Body: []byte("hello")}) -// assert.Equal(t, lastPID, string(res.Body)) -// -// for i := 0; i < 10; i++ { -// res, err := p.Exec(&Payload{Body: []byte("hello")}) -// -// assert.NoError(t, err) -// assert.NotNil(t, res) -// assert.NotNil(t, res.Body) -// assert.Nil(t, res.Context) -// -// assert.NotEqual(t, lastPID, string(res.Body)) -// lastPID = string(res.Body) -// } -//} + +func Test_StaticPool_AllocateTimeout(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + AllocateTimeout: time.Millisecond * 50, + DestroyTimeout: time.Second, + }, + ) + + assert.NotNil(t, p) + assert.NoError(t, err) + + done := make(chan interface{}) + go func() { + _, err := p.Exec(&Payload{Body: []byte("100")}) + assert.NoError(t, err) + close(done) + }() + + // to ensure that worker is already busy + time.Sleep(time.Millisecond * 10) + + _, err = p.Exec(&Payload{Body: []byte("10")}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "worker timeout") + + <-done + p.Destroy() +} + +func Test_StaticPool_Replace_Worker(t *testing.T) { + p, err := NewPool( + func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") }, + NewPipeFactory(), + Config{ + NumWorkers: 1, + MaxExecutions: 1, + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + }, + ) + defer p.Destroy() + + assert.NotNil(t, p) + assert.NoError(t, err) + + var lastPID string + lastPID = strconv.Itoa(*p.Workers()[0].Pid) + + res, err := p.Exec(&Payload{Body: []byte("hello")}) + assert.Equal(t, lastPID, string(res.Body)) + + for i := 0; i < 10; i++ { + res, err := p.Exec(&Payload{Body: []byte("hello")}) + + assert.NoError(t, err) + assert.NotNil(t, res) + assert.NotNil(t, res.Body) + assert.Nil(t, res.Context) + + assert.NotEqual(t, lastPID, string(res.Body)) + lastPID = string(res.Body) + } +} // identical to replace but controlled on worker side func Test_StaticPool_Stop_Worker(t *testing.T) { @@ -114,7 +114,7 @@ func (w *Worker) Wait() error { return nil } - if w.state.Value() != StateDestructing { + if w.state.Value() != StateStopping { w.state.set(StateErrored) } else { w.state.set(StateStopped) @@ -130,17 +130,6 @@ func (w *Worker) Wait() error { // Stop sends soft termination command to the worker and waits for process completion. func (w *Worker) Stop() error { - return w.doStop(StateStopping) -} - -// Destroy is identical to stop command but does mark workers with different state. Destroyed workers won't -// throw error state on completion of process destruction (exit status). -func (w *Worker) Destroy() error { - return w.doStop(StateDestructing) -} - -// actual stopping. -func (w *Worker) doStop(state int64) error { select { case <-w.waitDone: return nil @@ -148,7 +137,7 @@ func (w *Worker) doStop(state int64) error { w.mu.Lock() defer w.mu.Unlock() - w.state.set(state) + w.state.set(StateStopping) err := sendPayload(w.rl, &stopCommand{Stop: true}) <-w.waitDone @@ -163,7 +152,7 @@ func (w *Worker) Kill() error { case <-w.waitDone: return nil default: - w.state.set(StateDestructing) + w.state.set(StateStopping) err := w.cmd.Process.Signal(os.Kill) <-w.waitDone diff --git a/worker_test.go b/worker_test.go index 362db1f6..cb88d797 100644 --- a/worker_test.go +++ b/worker_test.go @@ -13,6 +13,7 @@ func Test_GetState(t *testing.T) { w, err := NewPipeFactory().SpawnWorker(cmd) go func() { assert.NoError(t, w.Wait()) + assert.Equal(t, StateStopped, w.State().Value()) }() assert.NoError(t, err) @@ -20,7 +21,7 @@ func Test_GetState(t *testing.T) { assert.Equal(t, StateReady, w.State().Value()) w.Stop() - assert.Equal(t, StateStopped, w.State().Value()) + assert.Equal(t, StateStopping, w.State().Value()) } func Test_Echo(t *testing.T) { |