diff options
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 63 |
1 files changed, 40 insertions, 23 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index bfb997d8..5abeae7a 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -48,7 +48,7 @@ type ttlExec struct { p payload.Payload } -func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) defer cancel() go func() { - res, err := sp.pool.ExecWithContext(ctx, rqs) + res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { c <- ttlExec{ err: errors.E(op, err), @@ -86,13 +86,42 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } } -func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec") - rsp, err := sp.pool.Exec(p) - if err != nil { - return payload.Payload{}, errors.E(op, err) +func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } } - return rsp, nil } func (sp *supervised) GetConfig() interface{} { @@ -155,21 +184,13 @@ func (sp *supervised) control() { } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } @@ -210,11 +231,7 @@ func (sp *supervised) control() { // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } |