summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go60
1 files changed, 55 insertions, 5 deletions
diff --git a/static_pool.go b/static_pool.go
index c9473699..02960825 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -41,6 +41,10 @@ type StaticPool struct {
// all registered workers
workers []*Worker
+ // invalid declares set of workers to be removed from the pool.
+ mur sync.Mutex
+ remove sync.Map
+
// pool is being destroyed
inDestroy int32
destroy chan interface{}
@@ -111,6 +115,21 @@ func (p *StaticPool) Workers() (workers []*Worker) {
return workers
}
+// Remove forces pool to remove specific worker.
+func (p *StaticPool) Remove(w *Worker, err error) bool {
+ if w.State().Value() != StateReady && w.State().Value() != StateWorking {
+ // unable to remove inactive worker
+ return false
+ }
+
+ if _, ok := p.remove.Load(w); ok {
+ return false
+ }
+
+ p.remove.Store(w, err)
+ return true
+}
+
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tmu.Lock()
@@ -133,13 +152,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
return nil, err
}
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return nil, err
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- go p.destroyWorker(w, err)
+ p.discardWorker(w, err)
return p.Exec(rqs)
}
@@ -159,6 +178,7 @@ func (p *StaticPool) Destroy() {
var wg sync.WaitGroup
for _, w := range p.Workers() {
wg.Add(1)
+ w.markInvalid()
go func(w *Worker) {
defer wg.Done()
p.destroyWorker(w, nil)
@@ -181,6 +201,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
+
+ // get next worker
+ i++
+ continue
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")
@@ -199,8 +227,19 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
+
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
+
+ // get next worker
+ i++
+ continue
+ }
+
return w, nil
case <-p.destroy:
+ timeout.Stop()
+
return nil, fmt.Errorf("pool has been stopped")
}
}
@@ -211,7 +250,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
// release releases or replaces the worker.
func (p *StaticPool) release(w *Worker) {
if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs {
- go p.destroyWorker(w, p.cfg.MaxJobs)
+ p.discardWorker(w, p.cfg.MaxJobs)
+ return
+ }
+
+ if err, remove := p.remove.Load(w); remove {
+ p.discardWorker(w, err)
return
}
@@ -242,6 +286,12 @@ func (p *StaticPool) createWorker() (*Worker, error) {
return w, nil
}
+// gentry remove worker
+func (p *StaticPool) discardWorker(w *Worker, caused interface{}) {
+ w.markInvalid()
+ go p.destroyWorker(w, caused)
+}
+
// destroyWorker destroys workers and removes it from the pool.
func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) {
go w.Stop()
@@ -271,6 +321,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ p.remove.Delete(w)
break
}
}
@@ -307,9 +358,8 @@ func (p *StaticPool) destroyed() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
p.mul.Lock()
- defer p.mul.Unlock()
-
if p.lsn != nil {
p.lsn(event, ctx)
}
+ p.mul.Unlock()
}