-rw-r--r--   pool.go                                           238
-rw-r--r--   static_pool.go                                    241
-rw-r--r--   static_pool_test.go (renamed from pool_test.go)    20
3 files changed, 257 insertions, 242 deletions
diff --git a/pool.go b/pool.go
--- a/pool.go
+++ b/pool.go
@@ -1,18 +1,5 @@
 package roadrunner

-import (
-    "fmt"
-    "github.com/pkg/errors"
-    "os/exec"
-    "sync"
-    "time"
-)
-
-const (
-    // StopRequest can be sent by worker to indicate that restart is required.
-    StopRequest = "{\"stop\":true}"
-)
-
 const (
     // EventCreated thrown when new worker is spawned.
     EventCreated = iota
@@ -24,224 +11,11 @@ const (
     EventError
 )

-// Pool controls worker creation, destruction and task routing.
-type Pool struct {
-    // Observer is optional callback to handle worker create/destruct/error events.
-    Observer func(event int, w *Worker, ctx interface{})
-
-    // pool behaviour
-    cfg Config
-
-    // worker command creator
-    cmd func() *exec.Cmd
-
-    // creates and connects to workers
-    factory Factory
-
-    // active task executions
-    tasks sync.WaitGroup
-
-    // workers circular allocation buffer
-    free chan *Worker
-
-    // protects state of worker list, does not affect allocation
-    muw sync.RWMutex
-
-    // all registered workers
-    workers []*Worker
-}
-
-// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
-    if err := cfg.Valid(); err != nil {
-        return nil, errors.Wrap(err, "config error")
-    }
-
-    p := &Pool{
-        cfg:     cfg,
-        cmd:     cmd,
-        factory: factory,
-        workers: make([]*Worker, 0, cfg.NumWorkers),
-        free:    make(chan *Worker, cfg.NumWorkers),
-    }
-
-    // constant number of workers simplify logic
-    for i := uint64(0); i < p.cfg.NumWorkers; i++ {
-        // to test if worker ready
-        w, err := p.createWorker()
-
-        if err != nil {
-            p.Destroy()
-            return nil, err
-        }
-
-        p.free <- w
-    }
-
-    return p, nil
-}
-
-// Config returns associated pool configuration. Immutable.
-func (p *Pool) Config() Config {
-    return p.cfg
-}
-
-// Workers returns worker list associated with the pool.
-func (p *Pool) Workers() (workers []*Worker) {
-    p.muw.RLock()
-    defer p.muw.RUnlock()
-
-    for _, w := range p.workers {
-        workers = append(workers, w)
-    }
-
-    return workers
-}
-
-// Exec one task with given payload and context, returns result or error.
-func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
-    p.tasks.Add(1)
-    defer p.tasks.Done()
-
-    w, err := p.allocateWorker()
-    if err != nil {
-        return nil, errors.Wrap(err, "unable to allocate worker")
-    }
-
-    rsp, err = w.Exec(rqs)
-
-    if err != nil {
-        // soft job errors are allowed
-        if _, jobError := err.(JobError); jobError {
-            p.free <- w
-            return nil, err
-        }
-
-        go p.replaceWorker(w, err)
-        return nil, err
-    }
-
-    // worker want's to be terminated
-    if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
-        go p.replaceWorker(w, err)
-        return p.Exec(rqs)
-    }
-
-    if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
-        go p.replaceWorker(w, p.cfg.MaxExecutions)
-    } else {
-        p.free <- w
-    }
-
-    return rsp, nil
-}
-
-// Destroy all underlying workers (but let them to complete the task).
-func (p *Pool) Destroy() {
-    p.tasks.Wait()
-
-    var wg sync.WaitGroup
-    for _, w := range p.Workers() {
-        wg.Add(1)
-        go func(w *Worker) {
-            defer wg.Done()
-
-            p.destroyWorker(w)
-        }(w)
-    }
-
-    wg.Wait()
-}
-
-// finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (w *Worker, err error) {
-    select {
-    case w = <-p.free:
-        return w, nil
-    default:
-        // enable timeout handler
-    }
-
-    timeout := time.NewTimer(p.cfg.AllocateTimeout)
-    select {
-    case <-timeout.C:
-        return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
-    case w := <-p.free:
-        timeout.Stop()
-        return w, nil
-    }
-}
-
-// replaces dead or expired worker with new instance
-func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
-    go p.destroyWorker(w)
-
-    if nw, err := p.createWorker(); err != nil {
-        p.throw(EventError, w, err)
-
-        if len(p.Workers()) == 0 {
-            // possible situation when major error causes all PHP scripts to die (for example dead DB)
-            p.throw(EventError, nil, fmt.Errorf("all workers dead"))
-        }
-    } else {
-        p.free <- nw
-    }
-}
-
-// destroy and remove worker from the pool.
-func (p *Pool) destroyWorker(w *Worker) {
-    p.throw(EventDestruct, w, nil)
-
-    // detaching
-    p.muw.Lock()
-    for i, wc := range p.workers {
-        if wc == w {
-            p.workers = p.workers[:i+1]
-            break
-        }
-    }
-    p.muw.Unlock()
-
-    go w.Stop()
-
-    select {
-    case <-w.waitDone:
-        // worker is dead
-    case <-time.NewTimer(p.cfg.DestroyTimeout).C:
-        // failed to stop process
-        if err := w.Kill(); err != nil {
-            p.throw(EventError, w, err)
-        }
-    }
-}
-
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *Pool) createWorker() (*Worker, error) {
-    w, err := p.factory.SpawnWorker(p.cmd())
-    if err != nil {
-        return nil, err
-    }
-
-    p.throw(EventCreated, w, nil)
-
-    go func(w *Worker) {
-        if err := w.Wait(); err != nil {
-            p.throw(EventError, w, err)
-        }
-    }(w)
-
-    p.muw.Lock()
-    defer p.muw.Unlock()
-
-    p.workers = append(p.workers, w)
-
-    return w, nil
-}
+// Pool manages a set of inner worker processes.
+type Pool interface {
+    // Exec one task with given payload and context, returns result or error.
+    Exec(rqs *Payload) (rsp *Payload, err error)

-// throw invokes event handler if any.
-func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
-    if p.Observer != nil {
-        p.Observer(event, w, ctx)
-    }
+    // Destroy all underlying workers (but let them complete the task).
+    Destroy()
 }
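With Pool reduced to a two-method interface, calling code can be written against the abstraction rather than the concrete StaticPool. For reference, a minimal consumer of the new interface might look like the sketch below; the serveTask helper and its payload contents are illustrative, not part of this commit, and it assumes only the exported Payload.Body field used elsewhere in the diff.

package roadrunner

// serveTask runs a single job on any Pool implementation and tears the
// pool down afterwards. Only the Exec/Destroy contract from the new
// interface is used, so an alternative pool implementation would work
// here unchanged.
func serveTask(p Pool, body []byte) ([]byte, error) {
    defer p.Destroy()

    // Payload.Body is assumed to carry the job body, as in the tests.
    rsp, err := p.Exec(&Payload{Body: body})
    if err != nil {
        return nil, err
    }

    return rsp.Body, nil
}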
diff --git a/static_pool.go b/static_pool.go
new file mode 100644
index 00000000..b0f50c6f
--- /dev/null
+++ b/static_pool.go
@@ -0,0 +1,241 @@
+package roadrunner
+
+import (
+    "fmt"
+    "github.com/pkg/errors"
+    "os/exec"
+    "sync"
+    "time"
+)
+
+const (
+    // StopRequest can be sent by a worker to indicate that a restart is required.
+    StopRequest = "{\"stop\":true}"
+)
+
+// StaticPool controls worker creation, destruction and task routing. The pool uses a fixed number of workers.
+type StaticPool struct {
+    // pool behaviour
+    cfg Config
+
+    // worker command creator
+    cmd func() *exec.Cmd
+
+    // observer is an optional callback to handle worker create/destruct/error events.
+    observer func(event int, w *Worker, ctx interface{})
+
+    // creates and connects to workers
+    factory Factory
+
+    // active task executions
+    tasks sync.WaitGroup
+
+    // workers circular allocation buffer
+    free chan *Worker
+
+    // protects state of worker list, does not affect allocation
+    muw sync.RWMutex
+
+    // all registered workers
+    workers []*Worker
+}
+
+// NewPool creates a new worker pool and task multiplexer. StaticPool prestarts all of its workers up front.
+func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error) {
+    if err := cfg.Valid(); err != nil {
+        return nil, errors.Wrap(err, "config error")
+    }
+
+    p := &StaticPool{
+        cfg:     cfg,
+        cmd:     cmd,
+        factory: factory,
+        workers: make([]*Worker, 0, cfg.NumWorkers),
+        free:    make(chan *Worker, cfg.NumWorkers),
+    }
+
+    // a constant number of workers simplifies the logic
+    for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+        // spawning also tests that the worker is ready
+        w, err := p.createWorker()
+
+        if err != nil {
+            p.Destroy()
+            return nil, err
+        }
+
+        p.free <- w
+    }
+
+    return p, nil
+}
+
+// Observe attaches a pool event watcher.
+func (p *StaticPool) Observe(o func(event int, w *Worker, ctx interface{})) {
+    p.observer = o
+}
+
+// Config returns associated pool configuration. Immutable.
+func (p *StaticPool) Config() Config {
+    return p.cfg
+}
+
+// Workers returns worker list associated with the pool.
+func (p *StaticPool) Workers() (workers []*Worker) {
+    p.muw.RLock()
+    defer p.muw.RUnlock()
+
+    for _, w := range p.workers {
+        workers = append(workers, w)
+    }
+
+    return workers
+}
+
+// Exec one task with given payload and context, returns result or error.
+func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
+    p.tasks.Add(1)
+    defer p.tasks.Done()
+
+    w, err := p.allocateWorker()
+    if err != nil {
+        return nil, errors.Wrap(err, "unable to allocate worker")
+    }
+
+    rsp, err = w.Exec(rqs)
+
+    if err != nil {
+        // soft job errors are allowed
+        if _, jobError := err.(JobError); jobError {
+            p.free <- w
+            return nil, err
+        }
+
+        go p.replaceWorker(w, err)
+        return nil, err
+    }
+
+    // worker wants to be terminated
+    if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
+        go p.replaceWorker(w, err)
+        return p.Exec(rqs)
+    }
+
+    if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+        go p.replaceWorker(w, p.cfg.MaxExecutions)
+    } else {
+        p.free <- w
+    }
+
+    return rsp, nil
+}
+
+// Destroy all underlying workers (but let them complete the task).
+func (p *StaticPool) Destroy() {
+    p.tasks.Wait()
+
+    var wg sync.WaitGroup
+    for _, w := range p.Workers() {
+        wg.Add(1)
+        go func(w *Worker) {
+            defer wg.Done()
+
+            p.destroyWorker(w)
+        }(w)
+    }
+
+    wg.Wait()
+}
+
+// allocateWorker finds a free worker within the allocate timeout or returns an error.
+func (p *StaticPool) allocateWorker() (w *Worker, err error) {
+    select {
+    case w = <-p.free:
+        return w, nil
+    default:
+        // enable timeout handler
+    }
+
+    timeout := time.NewTimer(p.cfg.AllocateTimeout)
+    select {
+    case <-timeout.C:
+        return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+    case w := <-p.free:
+        timeout.Stop()
+        return w, nil
+    }
+}
+
+// replaceWorker replaces a dead or expired worker with a new instance.
+func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
+    go p.destroyWorker(w)
+
+    if nw, err := p.createWorker(); err != nil {
+        p.throw(EventError, w, err)
+
+        if len(p.Workers()) == 0 {
+            // possible situation when a major error causes all PHP scripts to die (for example a dead DB)
+            p.throw(EventError, nil, fmt.Errorf("all workers dead"))
+        }
+    } else {
+        p.free <- nw
+    }
+}
+
+// destroyWorker destroys and removes a worker from the pool.
+func (p *StaticPool) destroyWorker(w *Worker) {
+    p.throw(EventDestruct, w, nil)
+
+    // detach the worker from the list, keeping the remaining workers intact
+    p.muw.Lock()
+    for i, wc := range p.workers {
+        if wc == w {
+            p.workers = append(p.workers[:i], p.workers[i+1:]...)
+            break
+        }
+    }
+    p.muw.Unlock()
+
+    go w.Stop()
+
+    select {
+    case <-w.waitDone:
+        // worker is dead
+    case <-time.NewTimer(p.cfg.DestroyTimeout).C:
+        // failed to stop the process in time
+        if err := w.Kill(); err != nil {
+            p.throw(EventError, w, err)
+        }
+    }
+}
+
+// createWorker creates a new worker using the associated factory and automatically
+// adds it to the worker list (in the background).
+func (p *StaticPool) createWorker() (*Worker, error) {
+    w, err := p.factory.SpawnWorker(p.cmd())
+    if err != nil {
+        return nil, err
+    }
+
+    p.throw(EventCreated, w, nil)
+
+    go func(w *Worker) {
+        if err := w.Wait(); err != nil {
+            p.throw(EventError, w, err)
+        }
+    }(w)
+
+    p.muw.Lock()
+    defer p.muw.Unlock()
+
+    p.workers = append(p.workers, w)
+
+    return w, nil
+}
+
+// throw invokes the event handler if any.
+func (p *StaticPool) throw(event int, w *Worker, ctx interface{}) {
+    if p.observer != nil {
+        p.observer(event, w, ctx)
+    }
+}
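Taken together, a pool is constructed with the same NewPool signature as before, while event observation now goes through the Observe method instead of the removed public Observer field. A short same-package sketch of the intended call sequence; the Config field values are illustrative, and the field names/types (NumWorkers as uint64, the timeouts as time.Duration) are inferred from the cfg.* references above:

// exampleStaticPool shows the intended call sequence:
// construct, observe, execute, destroy.
func exampleStaticPool() {
    p, err := NewPool(
        func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
        NewPipeFactory(),
        Config{
            NumWorkers:      4,
            AllocateTimeout: time.Second,
            DestroyTimeout:  time.Second,
        },
    )
    if err != nil {
        panic(err)
    }
    defer p.Destroy()

    // replaces the old `p.Observer = ...` assignment
    p.Observe(func(event int, w *Worker, ctx interface{}) {
        if event == EventError {
            fmt.Println("worker error:", ctx)
        }
    })

    rsp, err := p.Exec(&Payload{Body: []byte("hello")})
    if err != nil {
        panic(err)
    }

    fmt.Println(string(rsp.Body))
}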
diff --git a/pool_test.go b/static_pool_test.go
index c87f4ea7..c3b3cbba 100644
--- a/pool_test.go
+++ b/static_pool_test.go
@@ -31,7 +31,7 @@ func Test_NewPool(t *testing.T) {
     assert.NoError(t, err)
 }

-func Test_Pool_Invalid(t *testing.T) {
+func Test_StaticPool_Invalid(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/invalid.php") },
         NewPipeFactory(),
@@ -56,7 +56,7 @@ func Test_ConfigError(t *testing.T) {
     assert.Error(t, err)
 }

-func Test_Pool_Echo(t *testing.T) {
+func Test_StaticPool_Echo(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
         NewPipeFactory(),
@@ -77,7 +77,7 @@ func Test_StaticPool_Echo(t *testing.T) {
     assert.Equal(t, "hello", res.String())
 }

-func Test_Pool_Echo_NilContext(t *testing.T) {
+func Test_StaticPool_Echo_NilContext(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "echo", "pipes") },
         NewPipeFactory(),
@@ -98,7 +98,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
     assert.Equal(t, "hello", res.String())
 }

-func Test_Pool_Echo_Context(t *testing.T) {
+func Test_StaticPool_Echo_Context(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "head", "pipes") },
         NewPipeFactory(),
@@ -119,7 +119,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
     assert.Equal(t, "world", string(res.Context))
 }

-func Test_Pool_JobError(t *testing.T) {
+func Test_StaticPool_JobError(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "error", "pipes") },
         NewPipeFactory(),
@@ -139,7 +139,7 @@ func Test_StaticPool_JobError(t *testing.T) {
     assert.Equal(t, "hello", err.Error())
 }

-func Test_Pool_Broken_Replace(t *testing.T) {
+func Test_StaticPool_Broken_Replace(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
         NewPipeFactory(),
@@ -150,7 +150,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
     assert.NotNil(t, p)
     assert.NoError(t, err)

-    p.Observer = func(e int, w *Worker, ctx interface{}) {
+    p.observer = func(e int, w *Worker, ctx interface{}) {
         if err, ok := ctx.(error); ok {
             assert.Contains(t, err.Error(), "undefined_function()")
         }
@@ -162,7 +162,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
     assert.Nil(t, res)
 }

-func Test_Pool_AllocateTimeout(t *testing.T) {
+func Test_StaticPool_AllocateTimeout(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "delay", "pipes") },
         NewPipeFactory(),
@@ -194,7 +194,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
     p.Destroy()
 }

-func Test_Pool_Replace_Worker(t *testing.T) {
+func Test_StaticPool_Replace_Worker(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "pid", "pipes") },
         NewPipeFactory(),
@@ -230,7 +230,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
 }

 // identical to replace but controlled on worker side
-func Test_Pool_Stop_Worker(t *testing.T) {
+func Test_StaticPool_Stop_Worker(t *testing.T) {
     p, err := NewPool(
         func() *exec.Cmd { return exec.Command("php", "tests/client.php", "stop", "pipes") },
         NewPipeFactory(),
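Note that the renamed Broken_Replace test still assigns the unexported observer field directly, which compiles only because the tests live in the same package; the new Observe method exposes the same hook publicly. A sketch of that assertion rewritten against the public API; the test name is hypothetical, and the package-level cfg variable is assumed from the surrounding tests:

func Test_StaticPool_Broken_FromObserve(t *testing.T) {
    p, err := NewPool(
        func() *exec.Cmd { return exec.Command("php", "tests/client.php", "broken", "pipes") },
        NewPipeFactory(),
        cfg, // package-level test config, assumed from the other tests
    )
    assert.NotNil(t, p)
    assert.NoError(t, err)

    // attach the watcher through the public API instead of the unexported field
    p.Observe(func(e int, w *Worker, ctx interface{}) {
        if err, ok := ctx.(error); ok {
            assert.Contains(t, err.Error(), "undefined_function()")
        }
    })

    res, err := p.Exec(&Payload{Body: []byte("hello")})
    assert.Error(t, err)
    assert.Nil(t, res)
}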