diff options
author | Valery Piashchynski <[email protected]> | 2021-06-26 14:08:33 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-06-26 14:08:33 +0300 |
commit | 53e50a05bd27ecec03695b69defd920fc4a25c5c (patch) | |
tree | e86ca391e5a85118098c6340a0f0ae86747db042 /pkg/pool | |
parent | ad1ca84b26bb6a4ba410a8a684fe3d2e2f86eaea (diff) | |
parent | fc540f6029772ff51913b8ee3c082f8197010c52 (diff) |
Merge remote-tracking branch 'origin/master' into feature/jobs_plugin
Diffstat (limited to 'pkg/pool')
-rwxr-xr-x | pkg/pool/static_pool.go | 2 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 105 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 48 |
3 files changed, 81 insertions, 74 deletions
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index 74e06b81..1c149c51 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -174,7 +174,7 @@ func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (paylo return sp.execDebugWithTTL(ctx, p) } - ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + ctxAlloc, cancel := context.WithTimeout(context.Background(), sp.cfg.AllocateTimeout) defer cancel() w, err := sp.getWorker(ctxAlloc, op) if err != nil { diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index ca61dbc4..b09b6f6c 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -43,47 +43,8 @@ func supervisorWrapper(pool Pool, events events.Handler, cfg *SupervisorConfig) return sp } -type ttlExec struct { - err error - p payload.Payload -} - -func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec_with_context") - if sp.cfg.ExecTTL == 0 { - return sp.pool.Exec(rqs) - } - - c := make(chan ttlExec, 1) - ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) - defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - - return res.p, nil - } - } +func (sp *supervised) execWithTTL(_ context.Context, _ payload.Payload) (payload.Payload, error) { + panic("used to satisfy pool interface") } func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { @@ -92,36 +53,15 @@ func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { return sp.pool.Exec(rqs) } - c := make(chan ttlExec, 1) ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) defer cancel() - go func() { - res, err := sp.pool.execWithTTL(ctx, rqs) - if err != nil { - c <- ttlExec{ - err: errors.E(op, err), - p: payload.Payload{}, - } - } - - c <- ttlExec{ - err: nil, - p: res, - } - }() - - for { - select { - case <-ctx.Done(): - return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) - case res := <-c: - if res.err != nil { - return payload.Payload{}, res.err - } - return res.p, nil - } + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + return payload.Payload{}, errors.E(op, err) } + + return res, nil } func (sp *supervised) GetConfig() interface{} { @@ -164,7 +104,7 @@ func (sp *supervised) Stop() { sp.stopCh <- struct{}{} } -func (sp *supervised) control() { +func (sp *supervised) control() { //nolint:gocognit now := time.Now() // MIGHT BE OUTDATED @@ -172,7 +112,16 @@ func (sp *supervised) control() { workers := sp.pool.Workers() for i := 0; i < len(workers); i++ { - if workers[i].State().Value() == worker.StateInvalid { + // if worker not in the Ready OR working state + // skip such worker + switch workers[i].State().Value() { + case + worker.StateInvalid, + worker.StateErrored, + worker.StateDestroyed, + worker.StateInactive, + worker.StateStopped, + worker.StateStopping: continue } @@ -183,12 +132,23 @@ func (sp *supervised) control() { } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - workers[i].State().Set(worker.StateInvalid) + // SOFT termination. DO NOT STOP active workers + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { + // SOFT termination. DO NOT STOP active workers + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + + // mark it as invalid, worker likely in the StateWorking, so, it will be killed after work will be done workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue @@ -230,6 +190,11 @@ func (sp *supervised) control() { // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { + if workers[i].State().Value() != worker.StateWorking { + workers[i].State().Set(worker.StateInvalid) + _ = workers[i].Stop() + } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 348622c7..06cbe904 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -108,7 +108,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -148,7 +148,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.execWithTTL(context.Background(), payload.Payload{ + resp, err := p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -160,7 +160,7 @@ func TestSupervisedPool_Idle(t *testing.T) { time.Sleep(time.Second * 5) // worker should be marked as invalid and reallocated - _, err = p.execWithTTL(context.Background(), payload.Payload{ + _, err = p.Exec(payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -170,6 +170,48 @@ func TestSupervisedPool_Idle(t *testing.T) { p.Destroy(context.Background()) } +func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { + var cfgExecTTL = Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 1 * time.Second, + IdleTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 2) + // should be destroyed, state should be Ready, not Invalid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + assert.Equal(t, int64(1), p.Workers()[0].State().Value()) +} + func TestSupervisedPool_ExecTTL_OK(t *testing.T) { var cfgExecTTL = &Config{ NumWorkers: uint64(1), |