summaryrefslogtreecommitdiff
path: root/pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pool.go')
-rw-r--r--pool.go238
1 files changed, 6 insertions, 232 deletions
diff --git a/pool.go b/pool.go
index 86320baa..1a134a6a 100644
--- a/pool.go
+++ b/pool.go
@@ -1,18 +1,5 @@
package roadrunner
-import (
- "fmt"
- "github.com/pkg/errors"
- "os/exec"
- "sync"
- "time"
-)
-
-const (
- // StopRequest can be sent by worker to indicate that restart is required.
- StopRequest = "{\"stop\":true}"
-)
-
const (
// EventCreated thrown when new worker is spawned.
EventCreated = iota
@@ -24,224 +11,11 @@ const (
EventError
)
-// Pool controls worker creation, destruction and task routing.
-type Pool struct {
- // Observer is optional callback to handle worker create/destruct/error events.
- Observer func(event int, w *Worker, ctx interface{})
-
- // pool behaviour
- cfg Config
-
- // worker command creator
- cmd func() *exec.Cmd
-
- // creates and connects to workers
- factory Factory
-
- // active task executions
- tasks sync.WaitGroup
-
- // workers circular allocation buffer
- free chan *Worker
-
- // protects state of worker list, does not affect allocation
- muw sync.RWMutex
-
- // all registered workers
- workers []*Worker
-}
-
-// NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
-func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
- if err := cfg.Valid(); err != nil {
- return nil, errors.Wrap(err, "config error")
- }
-
- p := &Pool{
- cfg: cfg,
- cmd: cmd,
- factory: factory,
- workers: make([]*Worker, 0, cfg.NumWorkers),
- free: make(chan *Worker, cfg.NumWorkers),
- }
-
- // constant number of workers simplify logic
- for i := uint64(0); i < p.cfg.NumWorkers; i++ {
- // to test if worker ready
- w, err := p.createWorker()
-
- if err != nil {
- p.Destroy()
- return nil, err
- }
-
- p.free <- w
- }
-
- return p, nil
-}
-
-// Config returns associated pool configuration. Immutable.
-func (p *Pool) Config() Config {
- return p.cfg
-}
-
-// Workers returns worker list associated with the pool.
-func (p *Pool) Workers() (workers []*Worker) {
- p.muw.RLock()
- defer p.muw.RUnlock()
-
- for _, w := range p.workers {
- workers = append(workers, w)
- }
-
- return workers
-}
-
-// Exec one task with given payload and context, returns result or error.
-func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
- p.tasks.Add(1)
- defer p.tasks.Done()
-
- w, err := p.allocateWorker()
- if err != nil {
- return nil, errors.Wrap(err, "unable to allocate worker")
- }
-
- rsp, err = w.Exec(rqs)
-
- if err != nil {
- // soft job errors are allowed
- if _, jobError := err.(JobError); jobError {
- p.free <- w
- return nil, err
- }
-
- go p.replaceWorker(w, err)
- return nil, err
- }
-
- // worker want's to be terminated
- if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.replaceWorker(w, err)
- return p.Exec(rqs)
- }
-
- if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
- go p.replaceWorker(w, p.cfg.MaxExecutions)
- } else {
- p.free <- w
- }
-
- return rsp, nil
-}
-
-// Destroy all underlying workers (but let them to complete the task).
-func (p *Pool) Destroy() {
- p.tasks.Wait()
-
- var wg sync.WaitGroup
- for _, w := range p.Workers() {
- wg.Add(1)
- go func(w *Worker) {
- defer wg.Done()
-
- p.destroyWorker(w)
- }(w)
- }
-
- wg.Wait()
-}
-
-// finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (w *Worker, err error) {
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
-
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
- }
-}
-
-// replaces dead or expired worker with new instance
-func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
- go p.destroyWorker(w)
-
- if nw, err := p.createWorker(); err != nil {
- p.throw(EventError, w, err)
-
- if len(p.Workers()) == 0 {
- // possible situation when major error causes all PHP scripts to die (for example dead DB)
- p.throw(EventError, nil, fmt.Errorf("all workers dead"))
- }
- } else {
- p.free <- nw
- }
-}
-
-// destroy and remove worker from the pool.
-func (p *Pool) destroyWorker(w *Worker) {
- p.throw(EventDestruct, w, nil)
-
- // detaching
- p.muw.Lock()
- for i, wc := range p.workers {
- if wc == w {
- p.workers = p.workers[:i+1]
- break
- }
- }
- p.muw.Unlock()
-
- go w.Stop()
-
- select {
- case <-w.waitDone:
- // worker is dead
- case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process
- if err := w.Kill(); err != nil {
- p.throw(EventError, w, err)
- }
- }
-}
-
-// creates new worker using associated factory. automatically
-// adds worker to the worker list (background)
-func (p *Pool) createWorker() (*Worker, error) {
- w, err := p.factory.SpawnWorker(p.cmd())
- if err != nil {
- return nil, err
- }
-
- p.throw(EventCreated, w, nil)
-
- go func(w *Worker) {
- if err := w.Wait(); err != nil {
- p.throw(EventError, w, err)
- }
- }(w)
-
- p.muw.Lock()
- defer p.muw.Unlock()
-
- p.workers = append(p.workers, w)
-
- return w, nil
-}
+// Pool managed set of inner worker processes.
+type Pool interface {
+ // Exec one task with given payload and context, returns result or error.
+ Exec(rqs *Payload) (rsp *Payload, err error)
-// throw invokes event handler if any.
-func (p *Pool) throw(event int, w *Worker, ctx interface{}) {
- if p.Observer != nil {
- p.Observer(event, w, ctx)
- }
+ // Destroy all underlying workers (but let them to complete the task).
+ Destroy()
}