summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go61
1 files changed, 37 insertions, 24 deletions
diff --git a/static_pool.go b/static_pool.go
index 69edfd35..54e0caac 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -41,14 +41,17 @@ type StaticPool struct {
// all registered workers
workers []*Worker
+ // invalid declares set of workers to be removed from the pool.
+ mur sync.Mutex
+ remove map[*Worker]error
+
// pool is being destroyed
inDestroy int32
destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- watcher Watcher
- lsn func(event int, ctx interface{})
+ mul sync.Mutex
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -62,6 +65,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
cmd: cmd,
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
+ remove: make(map[*Worker]error),
free: make(chan *Worker, cfg.NumWorkers),
destroy: make(chan interface{}),
}
@@ -81,15 +85,6 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
-// Watch enables worker watching (to destroy expired workers or workers which experience
-// memory leaks).
-func (p *StaticPool) Watch(w Watcher) {
- p.mul.Lock()
- defer p.mul.Unlock()
-
- p.watcher = w
-}
-
// Listen attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.mul.Lock()
@@ -121,6 +116,13 @@ func (p *StaticPool) Workers() (workers []*Worker) {
return workers
}
+// Remove forces pool to destroy specific worker.
+func (p *StaticPool) Remove(w *Worker, err error) {
+ p.mur.Lock()
+ p.remove[w] = err
+ p.mur.Unlock()
+}
+
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
p.tmu.Lock()
@@ -191,12 +193,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if p.watcher != nil {
- if keep, err := p.watcher.Keep(p, w); !keep {
- i++
- w.markDestroying()
- go p.destroyWorker(w, err)
- }
+ if remove, err := p.removeWorker(w); remove {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
}
return w, nil
@@ -218,12 +218,10 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
- if p.watcher != nil {
- if keep, err := p.watcher.Keep(p, w); !keep {
- i++
- w.markDestroying()
- go p.destroyWorker(w, err)
- }
+ if remove, err := p.removeWorker(w); remove {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
}
return w, nil
@@ -242,6 +240,11 @@ func (p *StaticPool) release(w *Worker) {
return
}
+ if remove, err := p.removeWorker(w); remove {
+ go p.destroyWorker(w, err)
+ return
+ }
+
p.free <- w
}
@@ -298,6 +301,9 @@ func (p *StaticPool) watchWorker(w *Worker) {
for i, wc := range p.workers {
if wc == w {
p.workers = append(p.workers[:i], p.workers[i+1:]...)
+ p.mur.Lock()
+ delete(p.remove, w)
+ p.mur.Unlock()
break
}
}
@@ -327,6 +333,13 @@ func (p *StaticPool) watchWorker(w *Worker) {
}
}
+func (p *StaticPool) removeWorker(w *Worker) (removed bool, err error) {
+ p.mur.Lock()
+ err, removed = p.remove[w]
+ p.mur.Unlock()
+ return
+}
+
func (p *StaticPool) destroyed() bool {
return atomic.LoadInt32(&p.inDestroy) != 0
}