summaryrefslogtreecommitdiff
path: root/pool
diff options
context:
space:
mode:
Diffstat (limited to 'pool')
-rwxr-xr-xpool/static_pool.go8
-rwxr-xr-xpool/static_pool_test.go60
-rw-r--r--pool/supervisor_test.go22
3 files changed, 46 insertions, 44 deletions
diff --git a/pool/static_pool.go b/pool/static_pool.go
index 019c34b2..dfd9ffd3 100755
--- a/pool/static_pool.go
+++ b/pool/static_pool.go
@@ -7,8 +7,8 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/events"
+ "github.com/spiral/roadrunner/v2/ipc"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
workerWatcher "github.com/spiral/roadrunner/v2/worker_watcher"
@@ -36,7 +36,7 @@ type StaticPool struct {
cmd Command
// creates and connects to stack
- factory transport.Factory
+ factory ipc.Factory
// manages worker states and TTLs
ww Watcher
@@ -49,7 +49,7 @@ type StaticPool struct {
}
// NewStaticPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
-func NewStaticPool(ctx context.Context, cmd Command, factory transport.Factory, cfg *Config, options ...Options) (Pool, error) {
+func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error) {
if factory == nil {
return nil, errors.Str("no factory initialized")
}
@@ -303,7 +303,7 @@ func (sp *StaticPool) takeWorker(ctxGetFree context.Context, op errors.Op) (work
return w, nil
}
-func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory transport.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duration, factory ipc.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.SyncWorker, error) {
ctxT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
diff --git a/pool/static_pool_test.go b/pool/static_pool_test.go
index 4f98ca91..5db2bd86 100755
--- a/pool/static_pool_test.go
+++ b/pool/static_pool_test.go
@@ -2,7 +2,7 @@ package pool
import (
"context"
- "log"
+ l "log"
"os/exec"
"runtime"
"strconv"
@@ -11,8 +11,8 @@ import (
"time"
"github.com/spiral/errors"
+ "github.com/spiral/roadrunner/v2/ipc/pipe"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/utils"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
@@ -26,12 +26,14 @@ var cfg = &Config{
DestroyTimeout: time.Second * 500,
}
+var log = zap.NewNop()
+
func Test_NewPool(t *testing.T) {
ctx := context.Background()
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -46,7 +48,7 @@ func Test_NewPoolReset(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -73,7 +75,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/invalid.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
@@ -85,7 +87,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -102,7 +104,7 @@ func Test_StaticPool_Echo(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -126,7 +128,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -150,7 +152,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "head", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -174,7 +176,7 @@ func Test_StaticPool_JobError(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "error", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.NoError(t, err)
@@ -203,7 +205,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
WithLogger(z),
)
@@ -230,7 +232,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg2,
)
assert.NoError(t, err)
@@ -268,7 +270,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
@@ -287,7 +289,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -326,7 +328,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
Debug: true,
AllocateTimeout: time.Second,
@@ -368,7 +370,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "stop", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -409,7 +411,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -431,7 +433,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -461,7 +463,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second * 100,
@@ -486,7 +488,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -507,7 +509,7 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
ctx,
// sleep for the 3 seconds
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
Debug: false,
NumWorkers: 1,
@@ -538,7 +540,7 @@ func Test_Static_Pool_WrongCommand1(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("phg", "../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -555,7 +557,7 @@ func Test_Static_Pool_WrongCommand2(t *testing.T) {
p, err := NewStaticPool(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -572,7 +574,7 @@ func Test_CRC_WithPayload(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/crc_error.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
assert.Error(t, err)
@@ -604,7 +606,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfg,
)
if err != nil {
@@ -636,7 +638,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: uint64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
@@ -664,7 +666,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
defer wg.Done()
if _, err := p.Exec(pld); err != nil {
b.Fail()
- log.Println(err)
+ l.Println(err)
}
}()
}
@@ -678,7 +680,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
&Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -694,7 +696,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
for n := 0; n < b.N; n++ {
if _, err := p.Exec(&payload.Payload{Body: []byte("hello")}); err != nil {
b.Fail()
- log.Println(err)
+ l.Println(err)
}
}
}
diff --git a/pool/supervisor_test.go b/pool/supervisor_test.go
index 6ff62316..a479671f 100644
--- a/pool/supervisor_test.go
+++ b/pool/supervisor_test.go
@@ -7,8 +7,8 @@ import (
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/ipc/pipe"
"github.com/spiral/roadrunner/v2/payload"
- "github.com/spiral/roadrunner/v2/transport/pipe"
"github.com/spiral/roadrunner/v2/worker"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -32,7 +32,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
@@ -62,7 +62,7 @@ func Test_SupervisedPoolReset(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
assert.NoError(t, err)
@@ -93,7 +93,7 @@ func TestSupervisedPool_ExecWithDebugMode(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/supervised.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgSupervised,
)
@@ -131,7 +131,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -166,7 +166,7 @@ func TestSupervisedPool_ExecTTL_WorkerRestarted(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/sleep-ttl.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -223,7 +223,7 @@ func TestSupervisedPool_Idle(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/idle.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -273,7 +273,7 @@ func TestSupervisedPool_IdleTTL_StateAfterTimeout(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -321,7 +321,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/exec_ttl.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -367,7 +367,7 @@ func TestSupervisedPool_MaxMemoryReached(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)
@@ -402,7 +402,7 @@ func TestSupervisedPool_AllocateFailedOK(t *testing.T) {
p, err := NewStaticPool(
ctx,
func() *exec.Cmd { return exec.Command("php", "../tests/allocate-failed.php") },
- pipe.NewPipeFactory(),
+ pipe.NewPipeFactory(log),
cfgExecTTL,
)