summaryrefslogtreecommitdiff
path: root/pkg/pool/supervisor_pool.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-24 00:12:05 +0300
committerValery Piashchynski <[email protected]>2021-02-24 00:12:05 +0300
commite684ac16035bed9a4c09677b0db3b33477955dc9 (patch)
tree0d226eb8ad9730ede1f7cd80b5f7b44d1fb23b0a /pkg/pool/supervisor_pool.go
parent18a097292a567fccdd02304ff236bf78d769965d (diff)
Fix pool behavior, update tests
Diffstat (limited to 'pkg/pool/supervisor_pool.go')
-rwxr-xr-xpkg/pool/supervisor_pool.go63
1 files changed, 40 insertions, 23 deletions
diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go
index bfb997d8..5abeae7a 100755
--- a/pkg/pool/supervisor_pool.go
+++ b/pkg/pool/supervisor_pool.go
@@ -48,7 +48,7 @@ type ttlExec struct {
p payload.Payload
}
-func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
+func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) {
const op = errors.Op("supervised_exec_with_context")
if sp.cfg.ExecTTL == 0 {
return sp.pool.Exec(rqs)
@@ -58,7 +58,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL)
defer cancel()
go func() {
- res, err := sp.pool.ExecWithContext(ctx, rqs)
+ res, err := sp.pool.execWithTTL(ctx, rqs)
if err != nil {
c <- ttlExec{
err: errors.E(op, err),
@@ -86,13 +86,42 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
-func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("supervised_exec")
- rsp, err := sp.pool.Exec(p)
- if err != nil {
- return payload.Payload{}, errors.E(op, err)
+func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("supervised_exec_with_context")
+ if sp.cfg.ExecTTL == 0 {
+ return sp.pool.Exec(rqs)
+ }
+
+ c := make(chan ttlExec, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL)
+ defer cancel()
+ go func() {
+ res, err := sp.pool.execWithTTL(ctx, rqs)
+ if err != nil {
+ c <- ttlExec{
+ err: errors.E(op, err),
+ p: payload.Payload{},
+ }
+ }
+
+ c <- ttlExec{
+ err: nil,
+ p: res,
+ }
+ }()
+
+ for {
+ select {
+ case <-ctx.Done():
+ return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err())
+ case res := <-c:
+ if res.err != nil {
+ return payload.Payload{}, res.err
+ }
+
+ return res.p, nil
+ }
}
- return rsp, nil
}
func (sp *supervised) GetConfig() interface{} {
@@ -155,21 +184,13 @@ func (sp *supervised) control() {
}
if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]})
continue
}
if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]})
continue
}
@@ -210,11 +231,7 @@ func (sp *supervised) control() {
// After the control check, res will be 5, idle is 1
// 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done.
if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 {
- err = sp.pool.RemoveWorker(workers[i])
- if err != nil {
- sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)})
- return
- }
+ workers[i].State().Set(worker.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]})
}
}