diff options
author | Wolfy-J <[email protected]> | 2018-09-23 15:55:57 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2018-09-23 15:55:57 +0300 |
commit | 6628249e68a85e6c2fed6d5802fa247388b053dc (patch) | |
tree | def759b509dbd32569afc8229c8888fa6599e1bf /static_pool.go | |
parent | bdff4b25d2a879357bc0ed53e96c0b551de07f88 (diff) | |
parent | eb64ebee3c77522202c5163513e7318bd630f8be (diff) |
Merge pull request #37 from spiral/feature/1.3.0
Feature/1.3.0
Diffstat (limited to 'static_pool.go')
-rw-r--r-- | static_pool.go | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/static_pool.go b/static_pool.go index 95d2fe14..be1a6f3b 100644 --- a/static_pool.go +++ b/static_pool.go @@ -26,6 +26,7 @@ type StaticPool struct { factory Factory // active task executions + tmu sync.Mutex tasks sync.WaitGroup // workers circular allocation buf @@ -42,6 +43,7 @@ type StaticPool struct { // pool is being destroyed inDestroy int32 + destroy chan interface{} // lsn is optional callback to handle worker create/destruct/error events. mul sync.Mutex @@ -60,6 +62,7 @@ func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, er factory: factory, workers: make([]*Worker, 0, cfg.NumWorkers), free: make(chan *Worker, cfg.NumWorkers), + destroy: make(chan interface{}), } // constant number of workers simplify logic @@ -110,7 +113,10 @@ func (p *StaticPool) Workers() (workers []*Worker) { // Exec one task with given payload and context, returns result or error. func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { + p.tmu.Lock() p.tasks.Add(1) + p.tmu.Unlock() + defer p.tasks.Done() w, err := p.allocateWorker() @@ -145,7 +151,10 @@ func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error) { func (p *StaticPool) Destroy() { atomic.AddInt32(&p.inDestroy, 1) + p.tmu.Lock() + close(p.destroy) p.tasks.Wait() + p.tmu.Unlock() var wg sync.WaitGroup for _, w := range p.Workers() { @@ -173,6 +182,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { } return w, nil + case <-p.destroy: + return nil, fmt.Errorf("pool has been stopped") default: // enable timeout handler } @@ -189,6 +200,8 @@ func (p *StaticPool) allocateWorker() (w *Worker, err error) { continue } return w, nil + case <-p.destroy: + return nil, fmt.Errorf("pool has been stopped") } } |