diff options
-rw-r--r-- | pool.go | 4 | ||||
-rw-r--r-- | static_pool.go | 70 |
2 files changed, 37 insertions, 37 deletions
@@ -19,8 +19,8 @@ type Pool interface { // Workers returns worker list associated with the pool. Workers() (workers []*Worker) - // DestroyWorker destroys workers and removes it from the pool. - DestroyWorker(w *Worker) + // Replace replaces dead or expired worker with new instance. + Replace(w *Worker, caused interface{}) // Destroy all underlying workers (but let them to complete the task). Destroy() diff --git a/static_pool.go b/static_pool.go index 2ac6a593..73caf841 100644 --- a/static_pool.go +++ b/static_pool.go @@ -111,18 +111,18 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return nil, err } - go p.replaceWorker(w, err) + go p.Replace(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - go p.replaceWorker(w, err) + go p.Replace(w, err) return p.Exec(rqs) } if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { - go p.replaceWorker(w, p.cfg.MaxExecutions) + go p.Replace(w, p.cfg.MaxExecutions) } else { p.free <- w } @@ -130,30 +130,19 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return rsp, nil } -// DestroyWorker destroys workers and removes it from the pool. -func (p *StaticPool) DestroyWorker(w *Worker) { - p.throw(EventDestruct, w, nil) +// Replace replaces dead or expired worker with new instance. +func (p *StaticPool) Replace(w *Worker, caused interface{}) { + go p.destroyWorker(w) - // detaching - p.muw.Lock() - for i, wc := range p.workers { - if wc == w { - p.workers = p.workers[:i+1] - break - } - } - p.muw.Unlock() - - go w.Stop() + if nw, err := p.createWorker(); err != nil { + p.throw(EventError, w, err) - select { - case <-w.waitDone: - // worker is dead - case <-time.NewTimer(p.cfg.DestroyTimeout).C: - // failed to stop process - if err := w.Kill(); err != nil { - p.throw(EventError, w, err) + if len(p.Workers()) == 0 { + // possible situation when major error causes all PHP scripts to die (for example dead DB) + p.throw(EventError, nil, fmt.Errorf("all workers dead")) } + } else { + p.free <- nw } } @@ -167,7 +156,7 @@ func (p *StaticPool) Destroy() { go func(w *Worker) { defer wg.Done() - p.DestroyWorker(w) + p.destroyWorker(w) }(w) } @@ -193,19 +182,30 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } } -// replaces dead or expired worker with new instance -func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { - go p.DestroyWorker(w) +// destroyWorker destroys workers and removes it from the pool. +func (p *StaticPool) destroyWorker(w *Worker) { + p.throw(EventDestruct, w, nil) - if nw, err := p.createWorker(); err != nil { - p.throw(EventError, w, err) + // detaching + p.muw.Lock() + for i, wc := range p.workers { + if wc == w { + p.workers = p.workers[:i+1] + break + } + } + p.muw.Unlock() - if len(p.Workers()) == 0 { - // possible situation when major error causes all PHP scripts to die (for example dead DB) - p.throw(EventError, nil, fmt.Errorf("all workers dead")) + go w.Stop() + + select { + case <-w.waitDone: + // worker is dead + case <-time.NewTimer(p.cfg.DestroyTimeout).C: + // failed to stop process + if err := w.Kill(); err != nil { + p.throw(EventError, w, err) } - } else { - p.free <- nw } } |