summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go10
1 files changed, 9 insertions, 1 deletions
diff --git a/static_pool.go b/static_pool.go
index 42913b28..07aa2028 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -31,6 +31,9 @@ type StaticPool struct {
// workers circular allocation buffer
free chan *Worker
+ // number of workers expected to be dead in a buffer.
+ numDead int64
+
// protects state of worker list, does not affect allocation
muw sync.RWMutex
@@ -154,11 +157,12 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- for i := int64(0); i <= p.cfg.NumWorkers; i++ {
+ for i := atomic.LoadInt64(&p.numDead); i >= 0; i++ {
// this loop is required to skip issues with dead workers still being in a ring.
select {
case w = <-p.free:
if w.State().Value() != StateReady {
+ atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
@@ -175,6 +179,7 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
timeout.Stop()
if w.State().Value() != StateReady {
+ atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
return w, nil
@@ -245,6 +250,9 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
p.muw.Unlock()
+ // registering dead worker
+ atomic.AddInt64(&p.numDead, 1)
+
// worker have died unexpectedly, pool should attempt to replace it with alive version safely
if err != nil {
p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})