summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-09-23 13:50:12 +0300
committerWolfy-J <[email protected]>2018-09-23 13:50:12 +0300
commit897fa09e144e6ff4d6213346eef917c7112d75e0 (patch)
tree1fcd425e2c7c9f1bdbcc8ac3c3b44e7a4d4b5a77 /static_pool.go
parentbdff4b25d2a879357bc0ed53e96c0b551de07f88 (diff)
improved server stopping mechanism
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go8
1 files changed, 7 insertions, 1 deletions
diff --git a/static_pool.go b/static_pool.go
index 95d2fe14..b6e43ddc 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -42,6 +42,7 @@ type StaticPool struct {
// pool is being destroyed
inDestroy int32
+ destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
mul sync.Mutex
@@ -60,6 +61,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
free: make(chan *Worker, cfg.NumWorkers),
+ destroy: make(chan interface{}),
}
// constant number of workers simplify logic
@@ -144,7 +146,7 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
// Destroy all underlying workers (but let them to complete the task).
func (p *StaticPool) Destroy() {
atomic.AddInt32(&p.inDestroy, 1)
-
+ close(p.destroy)
p.tasks.Wait()
var wg sync.WaitGroup
@@ -173,6 +175,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
default:
// enable timeout handler
}
@@ -189,6 +193,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
}
}