summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-12 21:13:54 +0300
committerWolfy-J <[email protected]>2018-06-12 21:13:54 +0300
commit47aa87a1cd438c6c99a4640f6af60fd408b5c3bc (patch)
treeb405ad7e1d3e6810c818be47a9cf67b7a534053c /static_pool.go
parent99c1121d8fd31b7b61b6c22e181dc3c05a3f9f82 (diff)
improved worker watching
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go35
1 files changed, 22 insertions, 13 deletions
diff --git a/static_pool.go b/static_pool.go
index a972b04a..18bfaf60 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -150,20 +150,29 @@ func (p *StaticPool) Destroy() {
// finds free worker in a given time interval or creates new if allowed.
func (p *StaticPool) allocateWorker() (w *Worker, err error) {
- // this loop is required to skip issues with dead workers still being in a ring.
- select {
- case w = <-p.free:
- return w, nil
- default:
- // enable timeout handler
- }
+ for i := int64(0); i <= p.cfg.NumWorkers; i++ {
+ // this loop is required to skip issues with dead workers still being in a ring.
+ select {
+ case w = <-p.free:
+ if w.State().Value() != StateReady {
+ continue
+ }
+
+ return w, nil
+ default:
+ // enable timeout handler
+ }
- timeout := time.NewTimer(p.cfg.AllocateTimeout)
- select {
- case <-timeout.C:
- return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
- case w = <-p.free:
- timeout.Stop()
+ timeout := time.NewTimer(p.cfg.AllocateTimeout)
+ select {
+ case <-timeout.C:
+ return nil, fmt.Errorf("worker timeout (%s)", p.cfg.AllocateTimeout)
+ case w = <-p.free:
+ timeout.Stop()
+ if w.State().Value() != StateReady {
+ continue
+ }
+ }
}
return w, nil