summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--config_test.go (renamed from ext/config_test.go)2
-rw-r--r--errors_test.go (renamed from ext/errors_test.go)2
-rw-r--r--pipe_factory_test.go (renamed from ext/pipe_factory_test.go)2
-rw-r--r--protocol_test.go (renamed from ext/protocol_test.go)2
-rw-r--r--socket_factory_test.go (renamed from ext/socket_factory_test.go)2
-rw-r--r--state.go5
-rw-r--r--static_pool_test.go137
-rw-r--r--worker.go17
-rw-r--r--worker_test.go3
9 files changed, 82 insertions, 90 deletions
diff --git a/ext/config_test.go b/config_test.go
index fbdde223..f4c6246d 100644
--- a/ext/config_test.go
+++ b/config_test.go
@@ -1,4 +1,4 @@
-package ext
+package roadrunner
import (
"github.com/stretchr/testify/assert"
diff --git a/ext/errors_test.go b/errors_test.go
index 7c9d7a5b..9b0fa53e 100644
--- a/ext/errors_test.go
+++ b/errors_test.go
@@ -1,4 +1,4 @@
-package ext
+package roadrunner
import (
"github.com/stretchr/testify/assert"
diff --git a/ext/pipe_factory_test.go b/pipe_factory_test.go
index 434c31b0..ae276ab6 100644
--- a/ext/pipe_factory_test.go
+++ b/pipe_factory_test.go
@@ -1,4 +1,4 @@
-package ext
+package roadrunner
import (
"github.com/stretchr/testify/assert"
diff --git a/ext/protocol_test.go b/protocol_test.go
index f6410ef5..ed3fe461 100644
--- a/ext/protocol_test.go
+++ b/protocol_test.go
@@ -1,4 +1,4 @@
-package ext
+package roadrunner
import (
"github.com/pkg/errors"
diff --git a/ext/socket_factory_test.go b/socket_factory_test.go
index 2d6108bc..f6b1350c 100644
--- a/ext/socket_factory_test.go
+++ b/socket_factory_test.go
@@ -1,4 +1,4 @@
-package ext
+package roadrunner
import (
"github.com/stretchr/testify/assert"
diff --git a/state.go b/state.go
index 6c27c4c1..7bd5437d 100644
--- a/state.go
+++ b/state.go
@@ -24,14 +24,13 @@ type State interface {
const (
// StateInactive - no associated process
StateInactive int64 = iota
+
// StateReady - ready for job.
StateReady
+
// StateWorking - working on given payload.
StateWorking
- // StateDestructing process is being destructed.
- StateDestructing
-
// StateStopping - process is being softly stopped.
StateStopping
diff --git a/static_pool_test.go b/static_pool_test.go
index 7e2315d7..5fe7e062 100644
--- a/static_pool_test.go
+++ b/static_pool_test.go
@@ -187,6 +187,9 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
p.Observe(func(e int, ctx interface{}) {
if err, ok := ctx.(error); ok {
assert.Contains(t, err.Error(), "exit status 1")
+ }
+
+ if e == EventWorkerCreate {
close(destructed)
}
})
@@ -200,73 +203,73 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
}
}
-//
-//func Test_StaticPool_AllocateTimeout(t *testing.T) {
-// p, err := NewPool(
-// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") },
-// NewPipeFactory(),
-// Config{
-// NumWorkers: 1,
-// AllocateTimeout: time.Millisecond * 50,
-// DestroyTimeout: time.Second,
-// },
-// )
-//
-// assert.NotNil(t, p)
-// assert.NoError(t, err)
-//
-// done := make(chan interface{})
-// go func() {
-// _, err := p.Exec(&Payload{Body: []byte("100")})
-// assert.NoError(t, err)
-// close(done)
-// }()
-//
-// // to ensure that worker is already busy
-// time.Sleep(time.Millisecond * 10)
-//
-// _, err = p.Exec(&Payload{Body: []byte("10")})
-// assert.Error(t, err)
-// assert.Contains(t, err.Error(), "worker timeout")
-//
-// <-done
-// p.Destroy()
-//}
-//
-//func Test_StaticPool_Replace_Worker(t *testing.T) {
-// p, err := NewPool(
-// func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") },
-// NewPipeFactory(),
-// Config{
-// NumWorkers: 1,
-// MaxExecutions: 1,
-// AllocateTimeout: time.Second,
-// DestroyTimeout: time.Second,
-// },
-// )
-// defer p.Destroy()
-//
-// assert.NotNil(t, p)
-// assert.NoError(t, err)
-//
-// var lastPID string
-// lastPID = strconv.Itoa(*p.Workers()[0].Pid)
-//
-// res, err := p.Exec(&Payload{Body: []byte("hello")})
-// assert.Equal(t, lastPID, string(res.Body))
-//
-// for i := 0; i < 10; i++ {
-// res, err := p.Exec(&Payload{Body: []byte("hello")})
-//
-// assert.NoError(t, err)
-// assert.NotNil(t, res)
-// assert.NotNil(t, res.Body)
-// assert.Nil(t, res.Context)
-//
-// assert.NotEqual(t, lastPID, string(res.Body))
-// lastPID = string(res.Body)
-// }
-//}
+
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "delay", "pipes") },
+ NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ AllocateTimeout: time.Millisecond * 50,
+ DestroyTimeout: time.Second,
+ },
+ )
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ done := make(chan interface{})
+ go func() {
+ _, err := p.Exec(&Payload{Body: []byte("100")})
+ assert.NoError(t, err)
+ close(done)
+ }()
+
+ // to ensure that worker is already busy
+ time.Sleep(time.Millisecond * 10)
+
+ _, err = p.Exec(&Payload{Body: []byte("10")})
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "worker timeout")
+
+ <-done
+ p.Destroy()
+}
+
+func Test_StaticPool_Replace_Worker(t *testing.T) {
+ p, err := NewPool(
+ func() *exec.Cmd { return exec.Command("php", "php-src/tests/client.php", "pid", "pipes") },
+ NewPipeFactory(),
+ Config{
+ NumWorkers: 1,
+ MaxExecutions: 1,
+ AllocateTimeout: time.Second,
+ DestroyTimeout: time.Second,
+ },
+ )
+ defer p.Destroy()
+
+ assert.NotNil(t, p)
+ assert.NoError(t, err)
+
+ var lastPID string
+ lastPID = strconv.Itoa(*p.Workers()[0].Pid)
+
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+ assert.Equal(t, lastPID, string(res.Body))
+
+ for i := 0; i < 10; i++ {
+ res, err := p.Exec(&Payload{Body: []byte("hello")})
+
+ assert.NoError(t, err)
+ assert.NotNil(t, res)
+ assert.NotNil(t, res.Body)
+ assert.Nil(t, res.Context)
+
+ assert.NotEqual(t, lastPID, string(res.Body))
+ lastPID = string(res.Body)
+ }
+}
// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
diff --git a/worker.go b/worker.go
index ab0af052..5db5ed82 100644
--- a/worker.go
+++ b/worker.go
@@ -114,7 +114,7 @@ func (w *Worker) Wait() error {
return nil
}
- if w.state.Value() != StateDestructing {
+ if w.state.Value() != StateStopping {
w.state.set(StateErrored)
} else {
w.state.set(StateStopped)
@@ -130,17 +130,6 @@ func (w *Worker) Wait() error {
// Stop sends soft termination command to the worker and waits for process completion.
func (w *Worker) Stop() error {
- return w.doStop(StateStopping)
-}
-
-// Destroy is identical to stop command but does mark workers with different state. Destroyed workers won't
-// throw error state on completion of process destruction (exit status).
-func (w *Worker) Destroy() error {
- return w.doStop(StateDestructing)
-}
-
-// actual stopping.
-func (w *Worker) doStop(state int64) error {
select {
case <-w.waitDone:
return nil
@@ -148,7 +137,7 @@ func (w *Worker) doStop(state int64) error {
w.mu.Lock()
defer w.mu.Unlock()
- w.state.set(state)
+ w.state.set(StateStopping)
err := sendPayload(w.rl, &stopCommand{Stop: true})
<-w.waitDone
@@ -163,7 +152,7 @@ func (w *Worker) Kill() error {
case <-w.waitDone:
return nil
default:
- w.state.set(StateDestructing)
+ w.state.set(StateStopping)
err := w.cmd.Process.Signal(os.Kill)
<-w.waitDone
diff --git a/worker_test.go b/worker_test.go
index 362db1f6..cb88d797 100644
--- a/worker_test.go
+++ b/worker_test.go
@@ -13,6 +13,7 @@ func Test_GetState(t *testing.T) {
w, err := NewPipeFactory().SpawnWorker(cmd)
go func() {
assert.NoError(t, w.Wait())
+ assert.Equal(t, StateStopped, w.State().Value())
}()
assert.NoError(t, err)
@@ -20,7 +21,7 @@ func Test_GetState(t *testing.T) {
assert.Equal(t, StateReady, w.State().Value())
w.Stop()
- assert.Equal(t, StateStopped, w.State().Value())
+ assert.Equal(t, StateStopping, w.State().Value())
}
func Test_Echo(t *testing.T) {