diff options
Diffstat (limited to 'pool')
-rwxr-xr-x | pool/static_pool.go | 8 | ||||
-rwxr-xr-x | pool/static_pool_test.go | 60 | ||||
-rw-r--r-- | pool/supervisor_test.go | 22 |
3 files changed, 46 insertions, 44 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go index 019c34b2..dfd9ffd3 100755 --- a/pool/static_pool.go +++ b/pool/static_pool.go @@ -7,8 +7,8 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/ipc" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher" @@ -36,7 +36,7 @@ type StaticPool struct { cmd Command // creates and connects to stack - factory transport.Factory + factory ipc.Factory // manages worker states and TTLs ww Watcher @@ -49,7 +49,7 @@ type StaticPool struct { } // NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker. -func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) { +func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) { if factory == nil { return nil, errors.Str("no factory initialized") } @@ -303,7 +303,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work return w, nil } -func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator { +func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.SyncWorker, error) { ctxT, cancel := context.WithTimeout(ctx, timeout) defer cancel() diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go index 4f98ca91..5db2bd86 100755 --- a/pool/static_pool_test.go +++ b/pool/static_pool_test.go @@ -2,7 +2,7 @@ package pool import ( "context" - "log" + l "log" "os/exec" "runtime" "strconv" @@ -11,8 +11,8 @@ import ( "time" "github.com/spiral/errors" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/utils" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" @@ -26,12 +26,14 @@ var cfg = &Config{ DestroyTimeout: time.Second * 500, } +var log = zap.NewNop() + func Test_NewPool(t *testing.T) { ctx := context.Background() p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -46,7 +48,7 @@ func Test_NewPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -73,7 +75,7 @@ func Test_StaticPool_Invalid(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) @@ -85,7 +87,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -102,7 +104,7 @@ func Test_StaticPool_Echo(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -126,7 +128,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -150,7 +152,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -174,7 +176,7 @@ func Test_StaticPool_JobError(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.NoError(t, err) @@ -203,7 +205,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, WithLogger(z), ) @@ -230,7 +232,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg2, ) assert.NoError(t, err) @@ -268,7 +270,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, @@ -287,7 +289,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -326,7 +328,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: true, AllocateTimeout: time.Second, @@ -368,7 +370,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -409,7 +411,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -431,7 +433,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -461,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second * 100, @@ -486,7 +488,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -507,7 +509,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) { ctx, // sleep for the 3 seconds func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ Debug: false, NumWorkers: 1, @@ -538,7 +540,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -555,7 +557,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) { p, err := NewStaticPool( context.Background(), func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -572,7 +574,7 @@ func Test_CRC_WithPayload(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) assert.Error(t, err) @@ -604,7 +606,7 @@ func Benchmark_Pool_Echo(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfg, ) if err != nil { @@ -636,7 +638,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: uint64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, @@ -664,7 +666,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { defer wg.Done() if _, err := p.Exec(pld); err != nil { b.Fail() - log.Println(err) + l.Println(err) } }() } @@ -678,7 +680,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), &Config{ NumWorkers: 1, MaxJobs: 1, @@ -694,7 +696,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { for n := 0; n < b.N; n++ { if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil { b.Fail() - log.Println(err) + l.Println(err) } } } diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go index 6ff62316..a479671f 100644 --- a/pool/supervisor_test.go +++ b/pool/supervisor_test.go @@ -7,8 +7,8 @@ import ( "testing" "time" + "github.com/spiral/roadrunner/v2/ipc/pipe" "github.com/spiral/roadrunner/v2/payload" - "github.com/spiral/roadrunner/v2/transport/pipe" "github.com/spiral/roadrunner/v2/worker" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,7 +32,7 @@ func TestSupervisedPool_Exec(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -62,7 +62,7 @@ func Test_SupervisedPoolReset(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) assert.NoError(t, err) @@ -93,7 +93,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgSupervised, ) @@ -131,7 +131,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -166,7 +166,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -223,7 +223,7 @@ func TestSupervisedPool_Idle(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -273,7 +273,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -321,7 +321,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -367,7 +367,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) @@ -402,7 +402,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) { p, err := NewStaticPool( ctx, func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") }, - pipe.NewPipeFactory(), + pipe.NewPipeFactory(log), cfgExecTTL, ) |