diff options
Diffstat (limited to 'pool/supervisor_test.go')
-rw-r--r-- | pool/supervisor_test.go | 413 |
1 files changed, 413 insertions, 0 deletions
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go new file mode 100644 index 00000000..032e220b --- /dev/null +++ b/pool/supervisor_test.go @@ -0,0 +1,413 @@ +package pool + +import ( + "context" + "os" + "os/exec" + "testing" + "time" + + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/payload" + "github.com/spiral/roadrunner/v2/transport/pipe" + "github.com/spiral/roadrunner/v2/worker" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var cfgSupervised = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, +} + +func TestSupervisedPool_Exec(t *testing.T) { + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + + pidBefore := p.Workers()[0].Pid() + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 100) + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + assert.NotEqual(t, pidBefore, p.Workers()[0].Pid()) + + p.Destroy(context.Background()) +} + +// This test should finish without freezes +func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { + var cfgSupervised = cfgSupervised + cfgSupervised.Debug = true + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") }, + pipe.NewPipeFactory(), + cfgSupervised, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + time.Sleep(time.Second) + + for i := 0; i < 100; i++ { + time.Sleep(time.Millisecond * 500) + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + } + + p.Destroy(context.Background()) +} + +func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Error(t, err) + assert.Empty(t, resp) + + time.Sleep(time.Second * 1) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + pid = p.Workers()[0].Pid() + + resp, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Equal(t, string(resp.Body), "hello world") + assert.Empty(t, resp.Context) + + time.Sleep(time.Second) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + require.Equal(t, p.Workers()[0].State().Value(), worker.StateReady) + + p.Destroy(context.Background()) +} + +func TestSupervisedPool_Idle(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 1 * time.Second, + ExecTTL: 100 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + pid := p.Workers()[0].Pid() + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.Nil(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 5) + + // worker should be marked as invalid and reallocated + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + assert.NoError(t, err) + // should be new worker with new pid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + p.Destroy(context.Background()) +} + +func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 1 * time.Second, + IdleTTL: 1 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 2) + // should be destroyed, state should be Ready, not Invalid + assert.NotEqual(t, pid, p.Workers()[0].Pid()) + assert.Equal(t, int64(1), p.Workers()[0].State().Value()) +} + +func TestSupervisedPool_ExecTTL_OK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 100, + }, + } + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + defer p.Destroy(context.Background()) + + pid := p.Workers()[0].Pid() + + time.Sleep(time.Millisecond * 100) + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + time.Sleep(time.Second * 1) + // should be the same pid + assert.Equal(t, pid, p.Workers()[0].Pid()) +} + +func TestSupervisedPool_MaxMemoryReached(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(1), + AllocateTimeout: time.Second, + DestroyTimeout: time.Second, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 100 * time.Second, + IdleTTL: 100 * time.Second, + ExecTTL: 4 * time.Second, + MaxWorkerMemory: 1, + }, + } + + block := make(chan struct{}, 10) + listener := func(event interface{}) { + if ev, ok := event.(events.PoolEvent); ok { + if ev.Event == events.EventMaxMemory { + block <- struct{}{} + } + } + } + + // constructed + // max memory + // constructed + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, + pipe.NewPipeFactory(), + cfgExecTTL, + AddListeners(listener), + ) + + assert.NoError(t, err) + assert.NotNil(t, p) + + resp, err := p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + assert.NoError(t, err) + assert.Empty(t, resp.Body) + assert.Empty(t, resp.Context) + + <-block + p.Destroy(context.Background()) +} + +func TestSupervisedPool_AllocateFailedOK(t *testing.T) { + var cfgExecTTL = &Config{ + NumWorkers: uint64(2), + AllocateTimeout: time.Second * 15, + DestroyTimeout: time.Second * 5, + Supervisor: &SupervisorConfig{ + WatchTick: 1 * time.Second, + TTL: 5 * time.Second, + }, + } + + ctx := context.Background() + p, err := Initialize( + ctx, + func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") }, + pipe.NewPipeFactory(), + cfgExecTTL, + ) + + assert.NoError(t, err) + require.NotNil(t, p) + + time.Sleep(time.Second) + + // should be ok + _, err = p.Exec(&payload.Payload{ + Context: []byte(""), + Body: []byte("foo"), + }) + + require.NoError(t, err) + + // after creating this file, PHP will fail + file, err := os.Create("break") + require.NoError(t, err) + + time.Sleep(time.Second * 5) + assert.NoError(t, file.Close()) + assert.NoError(t, os.Remove("break")) + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "panic should not be fired!") + } else { + p.Destroy(context.Background()) + } + }() +} |