diff options
author | Wolfy-J <[email protected]> | 2019-05-02 17:30:43 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-02 17:30:43 +0300 |
commit | 8fc464213c471ad79be88d9926d8f45b66a23ec0 (patch) | |
tree | 4072167f6ca3a2d4bd9ed56ff9c017c8ed79e16a /static_pool.go | |
parent | a482d49122d92dd1cd82881ed221c55e5210e536 (diff) |
removing workers in runtime
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 61 |
1 files changed, 37 insertions, 24 deletions
diff --git a/static_pool.go b/static_pool.go index 69edfd35..54e0caac 100644 --- a/static_pool.go +++ b/static_pool.go @@ -41,14 +41,17 @@ type StaticPool struct { // all registered workers workers []*Worker + // invalid declares set of workers to be removed from the pool. + mur sync.Mutex + remove map[*Worker]error + // pool is being destroyed inDestroy int32 destroy chan interface{} // lsn is optional callback to handle worker create/destruct/error events. - mul sync.Mutex - watcher Watcher - lsn func(event int, ctx interface{}) + mul sync.Mutex + lsn func(event int, ctx interface{}) } // NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. @@ -62,6 +65,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er cmd: cmd, factory: factory, workers: make([]*Worker, 0, cfg.NumWorkers), + remove: make(map[*Worker]error), free: make(chan *Worker, cfg.NumWorkers), destroy: make(chan interface{}), } @@ -81,15 +85,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er return p, nil } -// Watch enables worker watching (to destroy expired workers or workers which experience -// memory leaks). -func (p *StaticPool) Watch(w Watcher) { - p.mul.Lock() - defer p.mul.Unlock() - - p.watcher = w -} - // Listen attaches pool event watcher. func (p *StaticPool) Listen(l func(event int, ctx interface{})) { p.mul.Lock() @@ -121,6 +116,13 @@ func (p *StaticPool) Workers() (workers []*Worker) { return workers } +// Remove forces pool to destroy specific worker. +func (p *StaticPool) Remove(w *Worker, err error) { + p.mur.Lock() + p.remove[w] = err + p.mur.Unlock() +} + // Exec one task with given payload and context, returns result or error. func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { p.tmu.Lock() @@ -191,12 +193,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } - if p.watcher != nil { - if keep, err := p.watcher.Keep(p, w); !keep { - i++ - w.markDestroying() - go p.destroyWorker(w, err) - } + if remove, err := p.removeWorker(w); remove { + i++ + w.markDestroying() + go p.destroyWorker(w, err) } return w, nil @@ -218,12 +218,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } - if p.watcher != nil { - if keep, err := p.watcher.Keep(p, w); !keep { - i++ - w.markDestroying() - go p.destroyWorker(w, err) - } + if remove, err := p.removeWorker(w); remove { + i++ + w.markDestroying() + go p.destroyWorker(w, err) } return w, nil @@ -242,6 +240,11 @@ func (p *StaticPool) release(w *Worker) { return } + if remove, err := p.removeWorker(w); remove { + go p.destroyWorker(w, err) + return + } + p.free <- w } @@ -298,6 +301,9 @@ func (p *StaticPool) watchWorker(w *Worker) { for i, wc := range p.workers { if wc == w { p.workers = append(p.workers[:i], p.workers[i+1:]...) + p.mur.Lock() + delete(p.remove, w) + p.mur.Unlock() break } } @@ -327,6 +333,13 @@ func (p *StaticPool) watchWorker(w *Worker) { } } +func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) { + p.mur.Lock() + err, removed = p.remove[w] + p.mur.Unlock() + return +} + func (p *StaticPool) destroyed() bool { return atomic.LoadInt32(&p.inDestroy) != 0 } |