summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
committerWolfy-J <[email protected]>2018-06-03 12:54:43 +0300
commit36ea77baa5a41de10bd604cd0e5b5b3cafaaeb64 (patch)
tree13ca8abd454a6668f490eec2e44b1520bd3953fe /static_pool.go
parentb02611b7266589d888e054a1d2e4432ae370617d (diff)
service bus, http service, rpc bus, cli commands, new configs
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go29
1 files changed, 18 insertions, 11 deletions
diff --git a/static_pool.go b/static_pool.go
index be5e9b06..0527d024 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -107,7 +107,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
if err != nil {
// soft job errors are allowed
if _, jobError := err.(JobError); jobError {
- p.free <- w
+ p.release(w)
return nil, err
}
@@ -121,12 +121,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return p.Exec(rqs)
}
- if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
- go p.replaceWorker(w, p.cfg.MaxExecutions)
- } else {
- p.free <- w
- }
-
+ p.release(w)
return rsp, nil
}
@@ -165,6 +160,16 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
}
+// release releases or replaces the worker.
+func (p *StaticPool) release(w *Worker) {
+ if p.cfg.MaxExecutions != 0 && w.State().NumExecs() >= p.cfg.MaxExecutions {
+ go p.replaceWorker(w, p.cfg.MaxExecutions)
+ return
+ }
+
+ p.free <- w
+}
+
// replaceWorker replaces dead or expired worker with new instance.
func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
go p.destroyWorker(w)
@@ -183,13 +188,11 @@ func (p *StaticPool) replaceWorker(w *Worker, caused interface{}) {
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker) {
- p.throw(EventWorkerDestruct, w)
-
// detaching
p.muw.Lock()
for i, wc := range p.workers {
if wc == w {
- p.workers = p.workers[:i+1]
+ p.workers = append(p.workers[:i], p.workers[i+1:]...)
break
}
}
@@ -200,11 +203,15 @@ func (p *StaticPool) destroyWorker(w *Worker) {
select {
case <-w.waitDone:
// worker is dead
+ p.throw(EventWorkerDestruct, w)
+
case <-time.NewTimer(p.cfg.DestroyTimeout).C:
- // failed to stop process
+ // failed to stop process in given time
if err := w.Kill(); err != nil {
p.throw(EventWorkerError, WorkerError{Worker: w, Caused: err})
}
+
+ p.throw(EventWorkerKill, w)
}
}