diff options
author | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-03 12:54:43 +0300 |
commit | 36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch) | |
tree | 13ca8abd454a6668f490eec2e44b1520bd3953fe /static_pool.go | |
parent | b02611b7266589d888e054a1d2e4432ae370617d (diff) |
service bus, http service, rpc bus, cli commands, new configs
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/static_pool.go b/static_pool.go index be5e9b06..0527d024 100644 --- a/static_pool.go +++ b/static_pool.go @@ -107,7 +107,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { if err != nil { // soft job errors are allowed if _, jobError := err.(JobError); jobError { - p.free <- w + p.release(w) return nil, err } @@ -121,12 +121,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return p.Exec(rqs) } - if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { - go p.replaceWorker(w, p.cfg.MaxExecutions) - } else { - p.free <- w - } - + p.release(w) return rsp, nil } @@ -165,6 +160,16 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } } +// release releases or replaces the worker. +func (p *StaticPool) release(w *Worker) { + if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions { + go p.replaceWorker(w, p.cfg.MaxExecutions) + return + } + + p.free <- w +} + // replaceWorker replaces dead or expired worker with new instance. func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { go p.destroyWorker(w) @@ -183,13 +188,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) { // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker) { - p.throw(EventWorkerDestruct, w) - // detaching p.muw.Lock() for i, wc := range p.workers { if wc == w { - p.workers = p.workers[:i+1] + p.workers = append(p.workers[:i], p.workers[i+1:]...) break } } @@ -200,11 +203,15 @@ func (p *StaticPool) destroyWorker(w *Worker) { select { case <-w.waitDone: // worker is dead + p.throw(EventWorkerDestruct, w) + case <-time.NewTimer(p.cfg.DestroyTimeout).C: - // failed to stop process + // failed to stop process in given time if err := w.Kill(); err != nil { p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) } + + p.throw(EventWorkerKill, w) } } |