summaryrefslogtreecommitdiff
path: root/pool/supervisor_pool.go
diff options
context:
space:
mode:
Diffstat (limited to 'pool/supervisor_pool.go')
-rwxr-xr-xpool/supervisor_pool.go10
1 files changed, 9 insertions, 1 deletions
diff --git a/pool/supervisor_pool.go b/pool/supervisor_pool.go
index 1a94f6a0..3dbae417 100755
--- a/pool/supervisor_pool.go
+++ b/pool/supervisor_pool.go
@@ -54,12 +54,16 @@ func (sp *supervised) execWithTTL(_ context.Context, _ *payload.Payload) (*paylo
func (sp *supervised) Exec(rqs *payload.Payload) (*payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
return sp.pool.Exec(rqs)
}
ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
defer cancel()
+ sp.mu.RLock()
+ defer sp.mu.RUnlock()
res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
return nil, errors.E(op, err)
@@ -83,16 +87,20 @@ func (sp *supervised) RemoveWorker(worker worker.BaseProcess) error {
}
func (sp *supervised) Destroy(ctx context.Context) {
+ sp.Stop()
+ sp.mu.Lock()
sp.pool.Destroy(ctx)
+ sp.mu.Unlock()
}
func (sp *supervised) Start() {
go func() {
watchTout := time.NewTicker(sp.cfg.WatchTick)
+ defer watchTout.Stop()
+
for {
select {
case <-sp.stopCh:
- watchTout.Stop()
return
// stop here
case <-watchTout.C: