summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-02 18:21:36 +0300
committerWolfy-J <[email protected]>2019-05-02 18:21:36 +0300
commiteb8c64941cbcd30ff79b6147efd5fef42eccb648 (patch)
treebde0ceb7e7236850cfe999da7c3ffecf62b58d00 /static_pool.go
parent34abca68708ed881c3360ee749d794b0000a3aec (diff)
miiiinor performance optimizations
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go44
1 files changed, 15 insertions, 29 deletions
diff --git a/static_pool.go b/static_pool.go
index c88548ed..bd30afbd 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,8 +42,7 @@ type StaticPool struct {
workers []*Worker
// invalid declares set of workers to be removed from the pool.
- mur sync.Mutex
- remove map[*Worker]error
+ remove sync.Map
// pool is being destroyed
inDestroy int32
@@ -65,7 +64,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
cmd: cmd,
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
- remove: make(map[*Worker]error),
free: make(chan *Worker, cfg.NumWorkers),
destroy: make(chan interface{}),
}
@@ -118,9 +116,7 @@ func (p *StaticPool) Workers() (workers []*Worker) {
// Remove forces pool to destroy specific worker.
func (p *StaticPool) Remove(w *Worker, err error) {
- p.mur.Lock()
- p.remove[w] = err
- p.mur.Unlock()
+ p.remove.Store(w, err)
}
// Exec one task with given payload and context, returns result or error.
@@ -193,12 +189,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if remove, err := p.removeWorker(w); remove {
- i++
- atomic.AddInt64(&p.numDead, 1)
-
- w.markDestroying()
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
+
+ // get next worker
+ i++
+ continue
}
return w, nil
@@ -220,12 +216,12 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if remove, err := p.removeWorker(w); remove {
- i++
- atomic.AddInt64(&p.numDead, 1)
-
- w.markDestroying()
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
+
+ // get next worker
+ i++
+ continue
}
return w, nil
@@ -244,7 +240,7 @@ func (p *StaticPool) release(w *Worker) {
return
}
- if remove, err := p.removeWorker(w); remove {
+ if err, remove := p.remove.Load(w); remove {
go p.destroyWorker(w, err)
return
}
@@ -305,9 +301,7 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
- p.mur.Lock()
- delete(p.remove, w)
- p.mur.Unlock()
+ p.remove.Delete(w)
break
}
}
@@ -337,13 +331,6 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
}
-func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) {
- p.mur.Lock()
- err, removed = p.remove[w]
- p.mur.Unlock()
- return
-}
-
func (p *StaticPool) destroyed() bool {
return atomic.LoadInt32(&p.inDestroy) != 0
}
@@ -351,9 +338,8 @@ func (p *StaticPool) destroyed() bool {
// throw invokes event handler if any.
func (p *StaticPool) throw(event int, ctx interface{}) {
p.mul.Lock()
- defer p.mul.Unlock()
-
if p.lsn != nil {
p.lsn(event, ctx)
}
+ p.mul.Unlock()
}