diff options
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 10 |
1 files changed, 9 insertions, 1 deletions
diff --git a/static_pool.go b/static_pool.go index 42913b28..07aa2028 100644 --- a/static_pool.go +++ b/static_pool.go @@ -31,6 +31,9 @@ type StaticPool struct { // workers circular allocation buffer free chan *Worker + // number of workers expected to be dead in a buffer. + numDead int64 + // protects state of worker list, does not affect allocation muw sync.RWMutex @@ -154,11 +157,12 @@ func (p *StaticPool) Destroy() { // finds free worker in a given time interval or creates new if allowed. func (p *StaticPool) allocateWorker() (w *Worker, err error) { - for i := int64(0); i <= p.cfg.NumWorkers; i++ { + for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ { // this loop is required to skip issues with dead workers still being in a ring. select { case w = <-p.free: if w.State().Value() != StateReady { + atomic.AddInt64(&p.numDead, ^int64(0)) continue } @@ -175,6 +179,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { timeout.Stop() if w.State().Value() != StateReady { + atomic.AddInt64(&p.numDead, ^int64(0)) continue } return w, nil @@ -245,6 +250,9 @@ func (p *StaticPool) watchWorker(w *Worker) { } p.muw.Unlock() + // registering dead worker + atomic.AddInt64(&p.numDead, 1) + // worker have died unexpectedly, pool should attempt to replace it with alive version safely if err != nil { p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err}) |