diff options
author | Wolfy-J <[email protected]> | 2018-06-10 20:42:02 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-06-10 20:42:02 +0300 |
commit | 6831a8865388189f248933e1023fbb4e3b11f67e (patch) | |
tree | b503e15d8450d93dc19f323be2536b016b941ba5 /static_pool.go | |
parent | ed3a0f2bb25077c7a32a54ddb3754f04ffbdccf3 (diff) |
debug mode have been added
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/static_pool.go b/static_pool.go index 0ae345e5..9f4aab23 100644 --- a/static_pool.go +++ b/static_pool.go @@ -142,21 +142,30 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { - select { - case w = <-p.free: - return w, nil - default: - // enable timeout handler - } + // this loop is required to skip issues with dead workers still being in a ring. + for i := uint64(0); i < p.cfg.NumWorkers; i++ { + select { + case w = <-p.free: + if w.state.Value() == StateReady { + return w, nil + } + default: + // enable timeout handler + } - timeout := time.NewTimer(p.cfg.AllocateTimeout) - select { - case <-timeout.C: - return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) - case w := <-p.free: - timeout.Stop() - return w, nil + timeout := time.NewTimer(p.cfg.AllocateTimeout) + select { + case <-timeout.C: + return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout) + case w := <-p.free: + timeout.Stop() + if w.state.Value() == StateReady { + return w, nil + } + } } + + return w, nil } // release releases or replaces the worker. @@ -209,7 +218,7 @@ func (p *StaticPool) createWorker() (*Worker, error) { // attempting to replace worker if err := p.replaceWorker(w, err); err != nil { - p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err)) + p.throw(EventPoolError, fmt.Errorf("unable to replace: %s", err)) } } }(w) |