diff options
author | Wolfy-J <[email protected]> | 2019-05-02 18:21:36 +0300 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2019-05-02 18:21:36 +0300 |
commit | eb8c64941cbcd30ff79b6147efd5fef42eccb648 (patch) | |
tree | bde0ceb7e7236850cfe999da7c3ffecf62b58d00 /static_pool.go | |
parent | 34abca68708ed881c3360ee749d794b0000a3aec (diff) |
miiiinor performance optimizations
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 44 |
1 files changed, 15 insertions, 29 deletions
diff --git a/static_pool.go b/static_pool.go index c88548ed..bd30afbd 100644 --- a/static_pool.go +++ b/static_pool.go @@ -42,8 +42,7 @@ type StaticPool struct { workers []*Worker // invalid declares set of workers to be removed from the pool. - mur sync.Mutex - remove map[*Worker]error + remove sync.Map // pool is being destroyed inDestroy int32 @@ -65,7 +64,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er cmd: cmd, factory: factory, workers: make([]*Worker, 0, cfg.NumWorkers), - remove: make(map[*Worker]error), free: make(chan *Worker, cfg.NumWorkers), destroy: make(chan interface{}), } @@ -118,9 +116,7 @@ func (p *StaticPool) Workers() (workers []*Worker) { // Remove forces pool to destroy specific worker. func (p *StaticPool) Remove(w *Worker, err error) { - p.mur.Lock() - p.remove[w] = err - p.mur.Unlock() + p.remove.Store(w, err) } // Exec one task with given payload and context, returns result or error. @@ -193,12 +189,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } - if remove, err := p.removeWorker(w); remove { - i++ - atomic.AddInt64(&p.numDead, 1) - - w.markDestroying() + if err, remove := p.remove.Load(w); remove { go p.destroyWorker(w, err) + + // get next worker + i++ + continue } return w, nil @@ -220,12 +216,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } - if remove, err := p.removeWorker(w); remove { - i++ - atomic.AddInt64(&p.numDead, 1) - - w.markDestroying() + if err, remove := p.remove.Load(w); remove { go p.destroyWorker(w, err) + + // get next worker + i++ + continue } return w, nil @@ -244,7 +240,7 @@ func (p *StaticPool) release(w *Worker) { return } - if remove, err := p.removeWorker(w); remove { + if err, remove := p.remove.Load(w); remove { go p.destroyWorker(w, err) return } @@ -305,9 +301,7 @@ func (p *StaticPool) watchWorker(w *Worker) { for i, wc := range p.workers { if wc == w { p.workers = append(p.workers[:i], p.workers[i+1:]...) - p.mur.Lock() - delete(p.remove, w) - p.mur.Unlock() + p.remove.Delete(w) break } } @@ -337,13 +331,6 @@ func (p *StaticPool) watchWorker(w *Worker) { } } -func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) { - p.mur.Lock() - err, removed = p.remove[w] - p.mur.Unlock() - return -} - func (p *StaticPool) destroyed() bool { return atomic.LoadInt32(&p.inDestroy) != 0 } @@ -351,9 +338,8 @@ func (p *StaticPool) destroyed() bool { // throw invokes event handler if any. func (p *StaticPool) throw(event int, ctx interface{}) { p.mul.Lock() - defer p.mul.Unlock() - if p.lsn != nil { p.lsn(event, ctx) } + p.mul.Unlock() } |