summaryrefslogtreecommitdiff
path: root/static_pool.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-09-23 15:55:57 +0300
committerGitHub <[email protected]>2018-09-23 15:55:57 +0300
commit6628249e68a85e6c2fed6d5802fa247388b053dc (patch)
treedef759b509dbd32569afc8229c8888fa6599e1bf /static_pool.go
parentbdff4b25d2a879357bc0ed53e96c0b551de07f88 (diff)
parenteb64ebee3c77522202c5163513e7318bd630f8be (diff)
Merge pull request #37 from spiral/feature/1.3.0
Feature/1.3.0
Diffstat (limited to 'static_pool.go')
-rw-r--r--static_pool.go13
1 files changed, 13 insertions, 0 deletions
diff --git a/static_pool.go b/static_pool.go
index 95d2fe14..be1a6f3b 100644
--- a/static_pool.go
+++ b/static_pool.go
@@ -26,6 +26,7 @@ type StaticPool struct {
factory Factory
// active task executions
+ tmu sync.Mutex
tasks sync.WaitGroup
// workers circular allocation buf
@@ -42,6 +43,7 @@ type StaticPool struct {
// pool is being destroyed
inDestroy int32
+ destroy chan interface{}
// lsn is optional callback to handle worker create/destruct/error events.
mul sync.Mutex
@@ -60,6 +62,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er
factory: factory,
workers: make([]*Worker, 0, cfg.NumWorkers),
free: make(chan *Worker, cfg.NumWorkers),
+ destroy: make(chan interface{}),
}
// constant number of workers simplify logic
@@ -110,7 +113,10 @@ func (p *StaticPool) Workers() (workers []*Worker) {
// Exec one task with given payload and context, returns result or error.
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
+ p.tmu.Lock()
p.tasks.Add(1)
+ p.tmu.Unlock()
+
defer p.tasks.Done()
w, err := p.allocateWorker()
@@ -145,7 +151,10 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) {
func (p *StaticPool) Destroy() {
atomic.AddInt32(&p.inDestroy, 1)
+ p.tmu.Lock()
+ close(p.destroy)
p.tasks.Wait()
+ p.tmu.Unlock()
var wg sync.WaitGroup
for _, w := range p.Workers() {
@@ -173,6 +182,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
default:
// enable timeout handler
}
@@ -189,6 +200,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) {
continue
}
return w, nil
+ case <-p.destroy:
+ return nil, fmt.Errorf("pool has been stopped")
}
}