summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-03-31 14:45:55 +0300
committerWolfy-J <[email protected]>2018-03-31 14:45:55 +0300
commitd1b7ac989e40b096bf3a47fd79d589adc4a63b31 (patch)
tree7fe0379c173f79505537037afcaf4e1f0a45791f
parent83fb54d2c19cfb3fe1f2b68be317a8ad21a6e15c (diff)
pool renamed to static pool to reflect it's design, new pool interface has been added
-rw-r--r--pool.go238
-rw-r--r--static_pool.go241
-rw-r--r--static_pool_test.go (renamed from pool_test.go)20
3 files changed, 257 insertions, 242 deletions
diff --git a/pool.go b/pool.go
index 86320baa..1a134a6a 100644
--- a/pool.go
+++ b/pool.go
@@ -1,18 +1,5 @@
package roadrunner
-import (
- "fmt"
- "github.com/pkg/errors"
- "os/exec"
- "sync"
- "time"
-)
-
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
-
const (
// EventCreated thrown when new worker is spawned.
EventCreated = iota
@@ -24,224 +11,11 @@ const (
EventError
)
-// Pool controls worker creation, destruction and task routing.
-type Pool struct {
- // Observer is optional callback to handle worker create/destruct/error events.
- Observer func(event int, w *Worker, ctx interface{})
-
- // pool behaviour
- cfg Config
-
- // worker command creator
- cmd func() *exec.Cmd
-
- // creates and connects to workers
- factory Factory
-
- // active task executions
- tasks sync.WaitGroup
-
- // workers circular allocation buffer
- free chan *Worker
-
- // protects state of worker list, does not affect allocation
- muw sync.RWMutex
-
- // all registered workers
- workers []*Worker
-}
-
-// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config error")
- }
-
- p := &Pool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- workers: make([]*Worker, 0, cfg.NumWorkers),
- free: make(chan *Worker, cfg.NumWorkers),
- }
-
- // constant number of workers simplify logic
- for i := uint64(0); i < p.cfg.NumWorkers; i++ {
- // to test if worker ready
- w, err := p.createWorker()
-
- if err != nil {
- p.Destroy()
- return nil, err
- }
-
- p.free <- w
- }
-
- return p, nil
-}
-
-// Config returns associated pool configuration. Immutable.
-func (p *Pool) Config() Config {
- return p.cfg
-}
-
-// Workers returns worker list associated with the pool.
-func (p *Pool) Workers() (workers []*Worker) {
- p.muw.RLock()
- defer p.muw.RUnlock()
-
- for _, w := range p.workers {
- workers = append(workers, w)
- }
-
- return workers
-}
-
-// Exec one task with given payload and context, returns result or error.
-func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
- p.tasks.Add(1)
- defer p.tasks.Done()
-
- w, err := p.allocateWorker()
- if err != nil {
- return nil, errors.Wrap(err, "unable to allocate worker")
- }
-
- rsp, err = w.Exec(rqs)
-
- if err != nil {
- // soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- p.free <- w
- return nil, err
- }
-
- go p.replaceWorker(w, err)
- return nil, err
- }
-
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.replaceWorker(w, err)
- return p.Exec(rqs)
- }
-
- if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
- go p.replaceWorker(w, p.cfg.MaxExecutions)
- } else {
- p.free <- w
- }
-
- return rsp, nil
-}
-
-// Destroy all underlying workers (but let them to complete the task).
-func (p *Pool) Destroy() {
- p.tasks.Wait()
-
- var wg sync.WaitGroup
- for _, w := range p.Workers() {
- wg.Add(1)
- go func(w *Worker) {
- defer wg.Done()
-
- p.destroyWorker(w)
- }(w)
- }
-
- wg.Wait()
-}
-
-// finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (w *Worker, err error) {
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
-
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
- }
-}
-
-// replaces dead or expired worker with new instance
-func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
- go p.destroyWorker(w)
-
- if nw, err := p.createWorker(); err != nil {
- p.throw(EventError, w, err)
-
- if len(p.Workers()) == 0 {
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- p.throw(EventError, nil, fmt.Errorf("all workers dead"))
- }
- } else {
- p.free <- nw
- }
-}
-
-// destroy and remove worker from the pool.
-func (p *Pool) destroyWorker(w *Worker) {
- p.throw(EventDestruct, w, nil)
-
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = p.workers[:i+1]
- break
- }
- }
- p.muw.Unlock()
-
- go w.Stop()
-
- select {
- case <-w.waitDone:
- // worker is dead
- case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process
- if err := w.Kill(); err != nil {
- p.throw(EventError, w, err)
- }
- }
-}
-
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *Pool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
- if err != nil {
- return nil, err
- }
-
- p.throw(EventCreated, w, nil)
-
- go func(w *Worker) {
- if err := w.Wait(); err != nil {
- p.throw(EventError, w, err)
- }
- }(w)
-
- p.muw.Lock()
- defer p.muw.Unlock()
-
- p.workers = append(p.workers, w)
-
- return w, nil
-}
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // Exec one task with given payload and context, returns result or error.
+ Exec(rqs *Payload) (rsp *Payload, err error)
-// throw invokes event handler if any.
-func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
- if p.Observer != nil {
- p.Observer(event, w, ctx)
- }
+ // Destroy all underlying workers (but let them to complete the task).
+ Destroy()
}
diff --git a/static_pool.go b/static_pool.go
new file mode 100644
index 00000000..b0f50c6f
--- /dev/null
+++ b/static_pool.go
@@ -0,0 +1,241 @@
+package roadrunner
+
+import (
+ "fmt"
+ "github.com/pkg/errors"
+ "os/exec"
+ "sync"
+ "time"
+)
+
+const (
+ // StopRequest can be sent by worker to indicate that restart is required.
+ StopRequest = "{\"stop\":true}"
+)
+
+// StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.
+type StaticPool struct {
+ // pool behaviour
+ cfg Config
+
+ // worker command creator
+ cmd func() *exec.Cmd
+
+ // observer is optional callback to handle worker create/destruct/error events.
+ observer func(event int, w *Worker, ctx interface{})
+
+ // creates and connects to workers
+ factory Factory
+
+ // active task executions
+ tasks sync.WaitGroup
+
+ // workers circular allocation buffer
+ free chan *Worker
+
+ // protects state of worker list, does not affect allocation
+ muw sync.RWMutex
+
+ // all registered workers
+ workers []*Worker
+}
+
+// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
+func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
+ if err := cfg.Valid(); err != nil {
+ return nil, errors.Wrap(err, "config error")
+ }
+
+ p := &StaticPool{
+ cfg: cfg,
+ cmd: cmd,
+ factory: factory,
+ workers: make([]*Worker, 0, cfg.NumWorkers),
+ free: make(chan *Worker, cfg.NumWorkers),
+ }
+
+ // constant number of workers simplify logic
+ for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ // to test if worker ready
+ w, err := p.createWorker()
+
+ if err != nil {
+ p.Destroy()
+ return nil, err
+ }
+
+ p.free <- w
+ }
+
+ return p, nil
+}
+
+// Observe attaches pool event watcher.
+func (p *StaticPool) Observe(o func(event int, w *Worker, ctx interface{})) {
+ p.observer = o
+}
+
+// Config returns associated pool configuration. Immutable.
+func (p *StaticPool) Config() Config {
+ return p.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (p *StaticPool) Workers() (workers []*Worker) {
+ p.muw.RLock()
+ defer p.muw.RUnlock()
+
+ for _, w := range p.workers {
+ workers = append(workers, w)
+ }
+
+ return workers
+}
+
+// Exec one task with given payload and context, returns result or error.
+func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
+ p.tasks.Add(1)
+ defer p.tasks.Done()
+
+ w, err := p.allocateWorker()
+ if err != nil {
+ return nil, errors.Wrap(err, "unable to allocate worker")
+ }
+
+ rsp, err = w.Exec(rqs)
+
+ if err != nil {
+ // soft job errors are allowed
+ if _, jobError := err.(JobError); jobError {
+ p.free <- w
+ return nil, err
+ }
+
+ go p.replaceWorker(w, err)
+ return nil, err
+ }
+
+ // worker want's to be terminated
+ if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+ go p.replaceWorker(w, err)
+ return p.Exec(rqs)
+ }
+
+ if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+ go p.replaceWorker(w, p.cfg.MaxExecutions)
+ } else {
+ p.free <- w
+ }
+
+ return rsp, nil
+}
+
+// Destroy all underlying workers (but let them to complete the task).
+func (p *StaticPool) Destroy() {
+ p.tasks.Wait()
+
+ var wg sync.WaitGroup
+ for _, w := range p.Workers() {
+ wg.Add(1)
+ go func(w *Worker) {
+ defer wg.Done()
+
+ p.destroyWorker(w)
+ }(w)
+ }
+
+ wg.Wait()
+}
+
+// finds free worker in a given time interval or creates new if allowed.
+func (p *StaticPool) allocateWorker() (w *Worker, err error) {
+ select {
+ case w = <-p.free:
+ return w, nil
+ default:
+ // enable timeout handler
+ }
+
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w := <-p.free:
+ timeout.Stop()
+ return w, nil
+ }
+}
+
+// replaces dead or expired worker with new instance
+func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
+ go p.destroyWorker(w)
+
+ if nw, err := p.createWorker(); err != nil {
+ p.throw(EventError, w, err)
+
+ if len(p.Workers()) == 0 {
+ // possible situation when major error causes all PHP scripts to die (for example dead DB)
+ p.throw(EventError, nil, fmt.Errorf("all workers dead"))
+ }
+ } else {
+ p.free <- nw
+ }
+}
+
+// destroy and remove worker from the pool.
+func (p *StaticPool) destroyWorker(w *Worker) {
+ p.throw(EventDestruct, w, nil)
+
+ // detaching
+ p.muw.Lock()
+ for i, wc := range p.workers {
+ if wc == w {
+ p.workers = p.workers[:i+1]
+ break
+ }
+ }
+ p.muw.Unlock()
+
+ go w.Stop()
+
+ select {
+ case <-w.waitDone:
+ // worker is dead
+ case <-time.NewTimer(p.cfg.DestroyTimeout).C:
+ // failed to stop process
+ if err := w.Kill(); err != nil {
+ p.throw(EventError, w, err)
+ }
+ }
+}
+
+// creates new worker using associated factory. automatically
+// adds worker to the worker list (background)
+func (p *StaticPool) createWorker() (*Worker, error) {
+ w, err := p.factory.SpawnWorker(p.cmd())
+ if err != nil {
+ return nil, err
+ }
+
+ p.throw(EventCreated, w, nil)
+
+ go func(w *Worker) {
+ if err := w.Wait(); err != nil {
+ p.throw(EventError, w, err)
+ }
+ }(w)
+
+ p.muw.Lock()
+ defer p.muw.Unlock()
+
+ p.workers = append(p.workers, w)
+
+ return w, nil
+}
+
+// throw invokes event handler if any.
+func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) {
+ if p.observer != nil {
+ p.observer(event, w, ctx)
+ }
+}
diff --git a/pool_test.go b/static_pool_test.go
index c87f4ea7..c3b3cbba 100644
--- a/pool_test.go
+++ b/static_pool_test.go
@@ -31,7 +31,7 @@ func Test_NewPool(t *testing.T) {
assert.NoError(t, err)
}
-func Test_Pool_Invalid(t *testing.T) {
+func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
NewPipeFactory(),
@@ -56,7 +56,7 @@ func Test_ConfigError(t *testing.T) {
assert.Error(t, err)
}
-func Test_Pool_Echo(t *testing.T) {
+func Test_StaticPool_Echo(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
@@ -77,7 +77,7 @@ func Test_Pool_Echo(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
-func Test_Pool_Echo_NilContext(t *testing.T) {
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
NewPipeFactory(),
@@ -98,7 +98,7 @@ func Test_Pool_Echo_NilContext(t *testing.T) {
assert.Equal(t, "hello", res.String())
}
-func Test_Pool_Echo_Context(t *testing.T) {
+func Test_StaticPool_Echo_Context(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
NewPipeFactory(),
@@ -119,7 +119,7 @@ func Test_Pool_Echo_Context(t *testing.T) {
assert.Equal(t, "world", string(res.Context))
}
-func Test_Pool_JobError(t *testing.T) {
+func Test_StaticPool_JobError(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
NewPipeFactory(),
@@ -139,7 +139,7 @@ func Test_Pool_JobError(t *testing.T) {
assert.Equal(t, "hello", err.Error())
}
-func Test_Pool_Broken_Replace(t *testing.T) {
+func Test_StaticPool_Broken_Replace(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
NewPipeFactory(),
@@ -150,7 +150,7 @@ func Test_Pool_Broken_Replace(t *testing.T) {
assert.NotNil(t, p)
assert.NoError(t, err)
- p.Observer = func(e int, w *Worker, ctx interface{}) {
+ p.observer = func(e int, w *Worker, ctx interface{}) {
if err, ok := ctx.(error); ok {
assert.Contains(t, err.Error(), "undefined_function()")
}
@@ -162,7 +162,7 @@ func Test_Pool_Broken_Replace(t *testing.T) {
assert.Nil(t, res)
}
-func Test_Pool_AllocateTimeout(t *testing.T) {
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
NewPipeFactory(),
@@ -194,7 +194,7 @@ func Test_Pool_AllocateTimeout(t *testing.T) {
p.Destroy()
}
-func Test_Pool_Replace_Worker(t *testing.T) {
+func Test_StaticPool_Replace_Worker(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
NewPipeFactory(),
@@ -230,7 +230,7 @@ func Test_Pool_Replace_Worker(t *testing.T) {
}
// identical to replace but controlled on worker side
-func Test_Pool_Stop_Worker(t *testing.T) {
+func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := NewPool(
func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
NewPipeFactory(),