| author | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
|---|---|---|
| committer | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
| commit | 78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch) | |
| tree | 8882b9a051bcc9c42328df583c0bb8c39a89591e /pool.go | |
| parent | fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff) | |
API update
Diffstat (limited to 'pool.go')
-rw-r--r-- | pool.go | 231 |
1 file changed, 131 insertions, 100 deletions
```diff
@@ -2,103 +2,89 @@ package roadrunner
 import (
 	"fmt"
+	"log"
 	"os/exec"
 	"sync"
-	"sync/atomic"
 	"time"
+
+	"github.com/pkg/errors"
 )
 
 const (
-	// ContextTerminate must be sent by worker in control payload if worker want to die.
-	ContextTerminate = "TERMINATE"
+	// Control header to be made by worker to request termination.
+	TerminateRequest = "{\"terminate\": true}"
 )
 
 // Pool controls worker creation, destruction and task routing.
 type Pool struct {
-	cfg        Config           // pool behaviour
-	cmd        func() *exec.Cmd // worker command creator
-	factory    Factory          // creates and connects to workers
-	numWorkers uint64           // current number of tasks workers
-	tasks      sync.WaitGroup   // counts all tasks executions
-	mua        sync.Mutex       // protects worker allocation
-	muw        sync.RWMutex     // protects state of worker list
-	workers    []*Worker        // all registered workers
-	free       chan *Worker     // freed workers
+	// pool behaviour
+	cfg Config
+
+	// worker command creator
+	cmd func() *exec.Cmd
+
+	// creates and connects to workers
+	factory Factory
+
+	// active task executions
+	tasks sync.WaitGroup
+
+	// workers circular allocation buffer
+	free chan *Worker
+
+	// protects state of worker list, does not affect allocation
+	muw sync.RWMutex
+
+	// all registered workers
+	workers []*Worker
 }
 
 // NewPool creates new worker pool and task multiplexer. Pool will initiate with one worker.
 func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*Pool, error) {
+	if err := cfg.Valid(); err != nil {
+		return nil, errors.Wrap(err, "config error")
+	}
+
 	p := &Pool{
 		cfg:     cfg,
 		cmd:     cmd,
 		factory: factory,
-		workers: make([]*Worker, 0, cfg.MaxWorkers),
-		free:    make(chan *Worker, cfg.MaxWorkers),
+		workers: make([]*Worker, 0, cfg.NumWorkers),
+		free:    make(chan *Worker, cfg.NumWorkers),
 	}
 
-	// to test if worker ready
-	w, err := p.createWorker()
-	if err != nil {
-		return nil, err
-	}
+	//todo: watch for error from workers!!!
 
-	p.free <- w
-	return p, nil
-}
+	// constant number of workers simplify logic
+	for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+		// to test if worker ready
+		w, err := p.createWorker()
 
-// Execute one task with given payload and context, returns result and context or error. Must not be used once pool is
-// being destroyed.
-func (p *Pool) Execute(payload []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
-	p.tasks.Add(1)
-	defer p.tasks.Done()
+		if err != nil {
+			p.Destroy()
 
-	w, err := p.allocateWorker()
-	if err != nil {
-		return nil, nil, err
-	}
-
-	if resp, rCtx, err = w.Execute(payload, ctx); err != nil {
-		if !p.cfg.DestroyOnError {
-			if err, jobError := err.(JobError); jobError {
-				p.free <- w
-				return nil, nil, err
-			}
+			return nil, err
 		}
 
-		// worker level error
-		p.destroyWorker(w)
-
-		return nil, nil, err
-	}
-
-	// controlled destruction
-	if len(resp) == 0 && string(rCtx) == ContextTerminate {
-		p.destroyWorker(w)
-		go func() {
-			//immediate refill
-			if w, err := p.createWorker(); err != nil {
-				p.free <- w
+		// worker watcher
+		go func(w *Worker) {
+			if err := w.Wait(); err != nil {
+				// todo: register error
+				log.Println(err)
 			}
-		}()
-
-		return p.Execute(payload, ctx)
-	}
+		}(w)
 
-	if p.cfg.MaxExecutions != 0 && atomic.LoadUint64(&w.NumExecutions) > p.cfg.MaxExecutions {
-		p.destroyWorker(w)
-	} else {
 		p.free <- w
 	}
 
-	return resp, rCtx, nil
+	return p, nil
 }
 
-// Config returns associated pool configuration.
+// Config returns associated pool configuration. Immutable.
 func (p *Pool) Config() Config {
 	return p.cfg
 }
 
-// Workers returns workers associated with the pool.
+// Workers returns worker list associated with the pool.
 func (p *Pool) Workers() (workers []*Worker) {
 	p.muw.RLock()
 	defer p.muw.RUnlock()
@@ -110,8 +96,46 @@ func (p *Pool) Workers() (workers []*Worker) {
 	return workers
 }
 
-// Close all underlying workers (but let them to complete the task).
-func (p *Pool) Close() {
+// Exec one task with given payload and context, returns result or error.
+func (p *Pool) Exec(rqs *Payload) (rsp *Payload, err error) {
+	p.tasks.Add(1)
+	defer p.tasks.Done()
+
+	w, err := p.allocateWorker()
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to allocate worker")
+	}
+
+	rsp, err = w.Exec(rqs)
+
+	if err != nil {
+		// soft job errors are allowed
+		if _, jobError := err.(JobError); jobError {
+			p.free <- w
+			return nil, err
+		}
+
+		go p.replaceWorker(w, err)
+		return nil, err
+	}
+
+	// worker want's to be terminated
+	if rsp.Body == nil && rsp.Head != nil && string(rsp.Head) == TerminateRequest {
+		go p.replaceWorker(w, err)
+		return p.Exec(rqs)
+	}
+
+	if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+		go p.replaceWorker(w, p.cfg.MaxExecutions)
+	} else {
+		p.free <- w
+	}
+
+	return rsp, nil
+}
+
+// Destroy all underlying workers (but let them to complete the task).
+func (p *Pool) Destroy() {
 	p.tasks.Wait()
 
 	var wg sync.WaitGroup
@@ -119,6 +143,7 @@ func (p *Pool) Close() {
 		wg.Add(1)
 		go func(w *Worker) {
 			defer wg.Done()
+			p.destroyWorker(w)
 		}(w)
 	}
 
@@ -127,63 +152,69 @@ func (p *Pool) Close() {
 }
 
 // finds free worker in a given time interval or creates new if allowed.
-func (p *Pool) allocateWorker() (*Worker, error) {
-	p.mua.Lock()
-	defer p.mua.Unlock()
-
+func (p *Pool) allocateWorker() (w *Worker, err error) {
 	select {
-	case w := <-p.free:
-		// we already have free worker
+	case w = <-p.free:
 		return w, nil
 	default:
-		if p.numWorkers < p.cfg.MaxWorkers {
-			return p.createWorker()
-		}
+		// enable timeout handler
 	}
 
-		timeout := time.NewTimer(p.cfg.AllocateTimeout)
-		select {
-		case <-timeout.C:
-			return nil, fmt.Errorf("unable to allocate worker, timeout (%s)", p.cfg.AllocateTimeout)
-		case w := <-p.free:
-			timeout.Stop()
-			return w, nil
-		}
+	timeout := time.NewTimer(p.cfg.AllocateTimeout)
+	select {
+	case <-timeout.C:
+		return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+	case w := <-p.free:
+		timeout.Stop()
+		return w, nil
 	}
 }
 
+// replaces dead or expired worker with new instance
+func (p *Pool) replaceWorker(w *Worker, caused interface{}) {
+	go p.destroyWorker(w)
+
+	nw, _ := p.createWorker()
+	p.free <- nw
+}
+
 // destroy and remove worker from the pool.
 func (p *Pool) destroyWorker(w *Worker) {
-	atomic.AddUint64(&p.numWorkers, ^uint64(0))
-
-	go func() {
-		w.Stop()
+	// detaching
+	p.muw.Lock()
+	for i, wc := range p.workers {
+		if wc == w {
+			p.workers = p.workers[:i+1]
+			break
+		}
+	}
+	p.muw.Unlock()
 
-		p.muw.Lock()
-		defer p.muw.Unlock()
+	go w.Stop()
 
-		for i, wc := range p.workers {
-			if wc == w {
-				p.workers = p.workers[:i+1]
-				break
-			}
+	select {
+	case <-w.waitDone:
+		// worker is dead
+	case <-time.NewTimer(time.Second * 10).C:
+		// failed to stop process
+		if err := w.Kill(); err != nil {
+			//todo: can't kill or already killed?
 		}
-	}()
+	}
 }
 
-// creates new worker (must be called in a locked state).
+// creates new worker using associated factory. automatically
+// adds worker to the worker list (background)
 func (p *Pool) createWorker() (*Worker, error) {
-	w, err := p.factory.NewWorker(p.cmd())
+	w, err := p.factory.SpawnWorker(p.cmd())
 	if err != nil {
 		return nil, err
 	}
 
-	atomic.AddUint64(&p.numWorkers, 1)
+	p.muw.Lock()
+	defer p.muw.Unlock()
 
-	go func() {
-		p.muw.Lock()
-		defer p.muw.Unlock()
-		p.workers = append(p.workers, w)
-	}()
+	p.workers = append(p.workers, w)
 
 	return w, nil
}
```