diff options
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/pool/interface.go | 6 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 10 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 6 | ||||
-rwxr-xr-x | pkg/pool/supervisor_pool.go | 63 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 42 | ||||
-rw-r--r-- | pkg/worker/interface.go | 2 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 2 |
7 files changed, 68 insertions, 63 deletions
diff --git a/pkg/pool/interface.go b/pkg/pool/interface.go index bfc56c3f..4ef2f2e7 100644 --- a/pkg/pool/interface.go +++ b/pkg/pool/interface.go @@ -15,9 +15,6 @@ type Pool interface { // Exec executes task with payload Exec(rqs payload.Payload) (payload.Payload, error) - // ExecWithContext executes task with context which is used with timeout - ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) - // Workers returns worker list associated with the pool. Workers() (workers []worker.BaseProcess) @@ -26,4 +23,7 @@ type Pool interface { // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) + + // ExecWithContext executes task with context which is used with timeout + execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index f1b20bb9..0617cbc0 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -168,16 +168,16 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { } // Be careful, sync with pool.Exec method -func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (sp *StaticPool) execWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("static_pool_exec_with_context") - ctxGetFree, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) + ctxAlloc, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) defer cancel() - w, err := sp.getWorker(ctxGetFree, op) + w, err := sp.getWorker(ctxAlloc, op) if err != nil { return payload.Payload{}, errors.E(op, err) } - rsp, err := w.(worker.SyncWorker).ExecWithTimeout(ctx, p) + rsp, err := w.(worker.SyncWorker).ExecWithTTL(ctx, p) if err != nil { return sp.err_encoder(err, w) } @@ -185,7 +185,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (p // worker want's to be terminated if len(rsp.Body) == 0 && toString(rsp.Context) == StopRequest { sp.stopWorker(w) - return sp.ExecWithContext(ctx, p) + return sp.execWithTTL(ctx, p) } err = sp.checkMaxJobs(w) diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index c8b3616c..b1318f9d 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -191,7 +191,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { assert.NotNil(t, p) time.Sleep(time.Second) - res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) @@ -518,11 +518,11 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { assert.NotNil(t, p) go func() { - _, _ = p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + _, _ = p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) }() time.Sleep(time.Second) - res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")}) + res, err := p.execWithTTL(ctx, payload.Payload{Body: []byte("hello")}) assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) diff --git a/pkg/pool/supervisor_pool.go b/pkg/pool/supervisor_pool.go index bfb997d8..5abeae7a 100755 --- a/pkg/pool/supervisor_pool.go +++ b/pkg/pool/supervisor_pool.go @@ -48,7 +48,7 @@ type ttlExec struct { p payload.Payload } -func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { +func (sp *supervised) execWithTTL(ctx context.Context, rqs payload.Payload) (payload.Payload, error) { const op = errors.Op("supervised_exec_with_context") if sp.cfg.ExecTTL == 0 { return sp.pool.Exec(rqs) @@ -58,7 +58,7 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) ctx, cancel := context.WithTimeout(ctx, sp.cfg.ExecTTL) defer cancel() go func() { - res, err := sp.pool.ExecWithContext(ctx, rqs) + res, err := sp.pool.execWithTTL(ctx, rqs) if err != nil { c <- ttlExec{ err: errors.E(op, err), @@ -86,13 +86,42 @@ func (sp *supervised) ExecWithContext(ctx context.Context, rqs payload.Payload) } } -func (sp *supervised) Exec(p payload.Payload) (payload.Payload, error) { - const op = errors.Op("supervised_exec") - rsp, err := sp.pool.Exec(p) - if err != nil { - return payload.Payload{}, errors.E(op, err) +func (sp *supervised) Exec(rqs payload.Payload) (payload.Payload, error) { + const op = errors.Op("supervised_exec_with_context") + if sp.cfg.ExecTTL == 0 { + return sp.pool.Exec(rqs) + } + + c := make(chan ttlExec, 1) + ctx, cancel := context.WithTimeout(context.Background(), sp.cfg.ExecTTL) + defer cancel() + go func() { + res, err := sp.pool.execWithTTL(ctx, rqs) + if err != nil { + c <- ttlExec{ + err: errors.E(op, err), + p: payload.Payload{}, + } + } + + c <- ttlExec{ + err: nil, + p: res, + } + }() + + for { + select { + case <-ctx.Done(): + return payload.Payload{}, errors.E(op, errors.TimeOut, ctx.Err()) + case res := <-c: + if res.err != nil { + return payload.Payload{}, res.err + } + + return res.p, nil + } } - return rsp, nil } func (sp *supervised) GetConfig() interface{} { @@ -155,21 +184,13 @@ func (sp *supervised) control() { } if sp.cfg.TTL != 0 && now.Sub(workers[i].Created()).Seconds() >= sp.cfg.TTL.Seconds() { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventTTL, Payload: workers[i]}) continue } if sp.cfg.MaxWorkerMemory != 0 && s.MemoryUsage >= sp.cfg.MaxWorkerMemory*MB { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventMaxMemory, Payload: workers[i]}) continue } @@ -210,11 +231,7 @@ func (sp *supervised) control() { // After the control check, res will be 5, idle is 1 // 5 - 1 = 4, more than 0, YOU ARE FIRED (removed). Done. if int64(sp.cfg.IdleTTL.Seconds())-res <= 0 { - err = sp.pool.RemoveWorker(workers[i]) - if err != nil { - sp.events.Push(events.PoolEvent{Event: events.EventSupervisorError, Payload: errors.E(op, err)}) - return - } + workers[i].State().Set(worker.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventIdleTTL, Payload: workers[i]}) } } diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index 85af4672..d7e97fdd 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -9,7 +9,6 @@ import ( "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/transport/pipe" - "github.com/spiral/roadrunner/v2/tools" "github.com/stretchr/testify/assert" ) @@ -37,28 +36,8 @@ func TestSupervisedPool_Exec(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, p) - stopCh := make(chan struct{}) - defer p.Destroy(context.Background()) - go func() { - for { - select { - case <-stopCh: - return - default: - workers := p.Workers() - if len(workers) > 0 { - s, err := tools.WorkerProcessState(workers[0]) - assert.NoError(t, err) - assert.NotNil(t, s) - // since this is soft limit, double max memory limit watch - if (s.MemoryUsage / MB) > cfgSupervised.Supervisor.MaxWorkerMemory*2 { - assert.Fail(t, "max memory reached, worker still alive") - } - } - } - } - }() + pidBefore := p.Workers()[0].Pid() for i := 0; i < 100; i++ { time.Sleep(time.Millisecond * 100) @@ -69,7 +48,9 @@ func TestSupervisedPool_Exec(t *testing.T) { assert.NoError(t, err) } - stopCh <- struct{}{} + assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) + + p.Destroy(context.Background()) } func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { @@ -99,7 +80,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + resp, err := p.execWithTTL(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -129,7 +110,7 @@ func TestSupervisedPool_Idle(t *testing.T) { ctx := context.Background() p, err := Initialize( ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "../../tests/idle.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, ) @@ -139,7 +120,7 @@ func TestSupervisedPool_Idle(t *testing.T) { pid := p.Workers()[0].Pid() - resp, err := p.ExecWithContext(context.Background(), payload.Payload{ + resp, err := p.execWithTTL(context.Background(), payload.Payload{ Context: []byte(""), Body: []byte("foo"), }) @@ -149,6 +130,13 @@ func TestSupervisedPool_Idle(t *testing.T) { assert.Empty(t, resp.Context) time.Sleep(time.Second * 5) + + // worker should be marked as invalid and reallocated + _, err = p.execWithTTL(context.Background(), payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) // should be new worker with new pid assert.NotEqual(t, pid, p.Workers()[0].Pid()) p.Destroy(context.Background()) @@ -170,7 +158,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { ctx := context.Background() p, err := Initialize( ctx, - func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, + func() *exec.Cmd { return exec.Command("php", "../../tests/exec_ttl.php", "pipes") }, pipe.NewPipeFactory(), cfgExecTTL, ) diff --git a/pkg/worker/interface.go b/pkg/worker/interface.go index 96eb25bc..2b68717a 100644 --- a/pkg/worker/interface.go +++ b/pkg/worker/interface.go @@ -70,5 +70,5 @@ type SyncWorker interface { // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 82a5462a..ac987c14 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -63,7 +63,7 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *SyncWorkerImpl) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { +func (tw *SyncWorkerImpl) ExecWithTTL(ctx context.Context, p payload.Payload) (payload.Payload, error) { const op = errors.Op("sync_worker_exec_worker_with_timeout") c := make(chan wexec, 1) |