diff options
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 60 |
1 files changed, 55 insertions, 5 deletions
diff --git a/static_pool.go b/static_pool.go index c9473699..02960825 100644 --- a/static_pool.go +++ b/static_pool.go @@ -41,6 +41,10 @@ type StaticPool struct { // all registered workers workers []*Worker + // invalid declares set of workers to be removed from the pool. + mur sync.Mutex + remove sync.Map + // pool is being destroyed inDestroy int32 destroy chan interface{} @@ -111,6 +115,21 @@ func (p *StaticPool) Workers() (workers []*Worker) { return workers } +// Remove forces pool to remove specific worker. +func (p *StaticPool) Remove(w *Worker, err error) bool { + if w.State().Value() != StateReady && w.State().Value() != StateWorking { + // unable to remove inactive worker + return false + } + + if _, ok := p.remove.Load(w); ok { + return false + } + + p.remove.Store(w, err) + return true +} + // Exec one task with given payload and context, returns result or error. func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { p.tmu.Lock() @@ -133,13 +152,13 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { return nil, err } - go p.destroyWorker(w, err) + p.discardWorker(w, err) return nil, err } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - go p.destroyWorker(w, err) + p.discardWorker(w, err) return p.Exec(rqs) } @@ -159,6 +178,7 @@ func (p *StaticPool) Destroy() { var wg sync.WaitGroup for _, w := range p.Workers() { wg.Add(1) + w.markInvalid() go func(w *Worker) { defer wg.Done() p.destroyWorker(w, nil) @@ -181,6 +201,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) + + // get next worker + i++ + continue + } + return w, nil case <-p.destroy: return nil, fmt.Errorf("pool has been stopped") @@ -199,8 +227,19 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { atomic.AddInt64(&p.numDead, ^int64(0)) continue } + + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) + + // get next worker + i++ + continue + } + return w, nil case <-p.destroy: + timeout.Stop() + return nil, fmt.Errorf("pool has been stopped") } } @@ -211,7 +250,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { // release releases or replaces the worker. func (p *StaticPool) release(w *Worker) { if p.cfg.MaxJobs != 0 && w.State().NumExecs() >= p.cfg.MaxJobs { - go p.destroyWorker(w, p.cfg.MaxJobs) + p.discardWorker(w, p.cfg.MaxJobs) + return + } + + if err, remove := p.remove.Load(w); remove { + p.discardWorker(w, err) return } @@ -242,6 +286,12 @@ func (p *StaticPool) createWorker() (*Worker, error) { return w, nil } +// gentry remove worker +func (p *StaticPool) discardWorker(w *Worker, caused interface{}) { + w.markInvalid() + go p.destroyWorker(w, caused) +} + // destroyWorker destroys workers and removes it from the pool. func (p *StaticPool) destroyWorker(w *Worker, caused interface{}) { go w.Stop() @@ -271,6 +321,7 @@ func (p *StaticPool) watchWorker(w *Worker) { for i, wc := range p.workers { if wc == w { p.workers = append(p.workers[:i], p.workers[i+1:]...) + p.remove.Delete(w) break } } @@ -307,9 +358,8 @@ func (p *StaticPool) destroyed() bool { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { p.mul.Lock() - defer p.mul.Unlock() - if p.lsn != nil { p.lsn(event, ctx) } + p.mul.Unlock() } |