summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2019-05-02 17:17:16 +0300
committerWolfy-J <[email protected]>2019-05-02 17:17:16 +0300
commit7bd5da5327e0d6f602627006d55cf803b8c4a4c7 (patch)
tree2e76fba4b391e2733a6e59c8149bb22f6c91dfbc /static_pool.go
parent859441b4d084babca70266bd23ceb2a95269b3ff (diff)
fixing tests
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go30
1 files changed, 28 insertions, 2 deletions
diff --git a/static_pool.go b/static_pool.go
index c9473699..336ae520 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -46,8 +46,9 @@ type StaticPool struct {
destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
- mul sync.Mutex
- lsn func(event int, ctx interface{})
+ mul sync.Mutex
+ watcher Watcher
+ lsn func(event int, ctx interface{})
}
// NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
@@ -80,6 +81,14 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
return p, nil
}
+// WatchWorkers enables worker watching.
+func (p *StaticPool) WatchWorkers(w Watcher) {
+ p.mul.Lock()
+ defer p.mul.Unlock()
+
+ p.watcher = w
+}
+
// Listen attaches pool event watcher.
func (p *StaticPool) Listen(l func(event int, ctx interface{})) {
p.mul.Lock()
@@ -181,6 +190,14 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
+ if p.watcher != nil {
+ if keep, err := p.watcher.Keep(p, w); !keep {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
+ }
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")
@@ -199,6 +216,15 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
atomic.AddInt64(&p.numDead, ^int64(0))
continue
}
+
+ if p.watcher != nil {
+ if keep, err := p.watcher.Keep(p, w); !keep {
+ i++
+ w.markDestroying()
+ go p.destroyWorker(w, err)
+ }
+ }
+
return w, nil
case <-p.destroy:
return nil, fmt.Errorf("pool has been stopped")