summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-10 20:42:02 +0300
committerWolfy-J <[email protected]>2018-06-10 20:42:02 +0300
commit6831a8865388189f248933e1023fbb4e3b11f67e (patch)
treeb503e15d8450d93dc19f323be2536b016b941ba5 /static_pool.go
parented3a0f2bb25077c7a32a54ddb3754f04ffbdccf3 (diff)
debug mode have been added
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go37
1 files changed, 23 insertions, 14 deletions
diff --git a/static_pool.go b/static_pool.go
index 0ae345e5..9f4aab23 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -142,21 +142,30 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
+ // this loop is required to skip issues with dead workers still being in a ring.
+ for i := uint64(0); i < p.cfg.NumWorkers; i++ {
+ select {
+ case w = <-p.free:
+ if w.state.Value() == StateReady {
+ return w, nil
+ }
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w := <-p.free:
- timeout.Stop()
- return w, nil
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w := <-p.free:
+ timeout.Stop()
+ if w.state.Value() == StateReady {
+ return w, nil
+ }
+ }
}
+
+ return w, nil
}
// release releases or replaces the worker.
@@ -209,7 +218,7 @@ func (p *StaticPool) createWorker() (*Worker, error) {
// attempting to replace worker
if err := p.replaceWorker(w, err); err != nil {
- p.throw(EventPoolError, fmt.Errorf("unable to replace dead worker: %s", err))
+ p.throw(EventPoolError, fmt.Errorf("unable to replace: %s", err))
}
}
}(w)