summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-22 23:27:51 +0300
committerValery Piashchynski <[email protected]>2020-12-22 23:27:51 +0300
commit1f6749ed4cf3cfd2beade4949945a382abd66b15 (patch)
tree7dff57a0223376746fd6cd49dc439f8766fe9f1b
parent95a72fe8003c66ac2d9a52b964aba4ee1d88363f (diff)
Redisighn factory interface (add event listeners as variadic)v2.0.0-alpha28
-rwxr-xr-xgo.sum1
-rw-r--r--interfaces/worker/factory.go6
-rwxr-xr-xpkg/pipe/pipe_factory.go20
-rwxr-xr-xpkg/pipe/pipe_factory_test.go2
-rwxr-xr-xpkg/pool/static_pool.go6
-rwxr-xr-xpkg/pool/static_pool_test.go40
-rw-r--r--pkg/pool/supervisor_test.go6
-rwxr-xr-xpkg/socket/socket_factory.go23
8 files changed, 50 insertions, 54 deletions
diff --git a/go.sum b/go.sum
index 2f3e3c9d..08f84faa 100755
--- a/go.sum
+++ b/go.sum
@@ -435,6 +435,7 @@ github.com/spiral/roadrunner-plugins/resetter v1.0.0 h1:Nx4mIzeoH/IcUfY4LM9xhY0y
github.com/spiral/roadrunner-plugins/resetter v1.0.0/go.mod h1:DLFifJk1n3PWViXkT5+qAmzeRcPTowDRSbRqotf+WlE=
github.com/spiral/roadrunner-plugins/rpc v1.0.0 h1:cC17yCNqQUNtedKefdeT2P6z7q52L0QakxS4qwB7n+g=
github.com/spiral/roadrunner-plugins/rpc v1.0.0/go.mod h1:p+ClRf1ibW+xvekf+nGQtvipyrtPJP6WZ/J/DSp+Qck=
+github.com/spiral/roadrunner-plugins/server v1.0.1/go.mod h1:qedfnQFlK1+Jwv5M8mRXJCOWTvF/qfFyULK/0UMBeOk=
github.com/spiral/roadrunner/v2 v2.0.0-alpha26/go.mod h1:r7ojuHm9qCVbg4fKcqr4Aqk7VXqZ9YPefr1LOv7HNys=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
index 8db8ddcc..8412428d 100644
--- a/interfaces/worker/factory.go
+++ b/interfaces/worker/factory.go
@@ -3,16 +3,18 @@ package worker
import (
"context"
"os/exec"
+
+ "github.com/spiral/roadrunner/v2/interfaces/events"
)
// Factory is responsible of wrapping given command into tasks WorkerProcess.
type Factory interface {
// SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.EventListener) (BaseProcess, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
- SpawnWorker(*exec.Cmd) (BaseProcess, error)
+ SpawnWorker(*exec.Cmd, ...events.EventListener) (BaseProcess, error)
// Close the factory and underlying connections.
Close() error
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index f6211ab9..ecb3fa71 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -15,18 +15,12 @@ import (
// Factory connects to stack using standard
// streams (STDIN, STDOUT pipes).
-type Factory struct {
- listener []events.EventListener
-}
+type Factory struct{}
// NewPipeFactory returns new factory instance and starts
// listening
-
-// todo: review tests
-func NewPipeFactory(listeners ...events.EventListener) worker.Factory {
- return &Factory{
- listener: listeners,
- }
+func NewPipeFactory() worker.Factory {
+ return &Factory{}
}
type SpawnResult struct {
@@ -36,11 +30,11 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...))
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
c <- SpawnResult{
w: nil,
@@ -119,9 +113,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
const op = errors.Op("spawn worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...))
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 3cef0646..dca09375 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -415,7 +415,7 @@ func Test_Broken(t *testing.T) {
}
}
- w, err := NewPipeFactory(listener).SpawnWorkerWithTimeout(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index b181a805..23bb2d5f 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -39,6 +39,9 @@ type StaticPool struct {
// distributes the events
events events.Handler
+ // saved list of event listeners
+ listeners []events.EventListener
+
// manages worker states and TTLs
ww worker.Watcher
@@ -103,6 +106,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co
func AddListeners(listeners ...events.EventListener) Options {
return func(p *StaticPool) {
+ p.listeners = listeners
for i := 0; i < len(listeners); i++ {
p.addListener(listeners[i])
}
@@ -265,7 +269,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio
return func() (worker.BaseProcess, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
- w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...)
if err != nil {
return nil, err
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index dcc930f6..acdd6ab7 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -30,7 +30,7 @@ func Test_NewPool(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -44,7 +44,7 @@ func Test_StaticPool_Invalid(t *testing.T) {
p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
@@ -56,7 +56,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) {
p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
@@ -72,7 +72,7 @@ func Test_StaticPool_Echo(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -96,7 +96,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -120,7 +120,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -144,7 +144,7 @@ func Test_StaticPool_JobError(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
assert.NoError(t, err)
@@ -184,7 +184,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") },
- pipe.NewPipeFactory(listener),
+ pipe.NewPipeFactory(),
cfg,
AddListeners(listener),
)
@@ -223,7 +223,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
AddListeners(listener),
)
@@ -263,7 +263,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) {
p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
AllocateTimeout: time.Nanosecond * 1,
@@ -282,7 +282,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
MaxJobs: 1,
@@ -319,7 +319,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
Debug: true,
AllocateTimeout: time.Second,
@@ -359,7 +359,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -399,7 +399,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -421,7 +421,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
@@ -451,7 +451,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) {
p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) {
p, err := Initialize(
context.Background(),
func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 5,
AllocateTimeout: time.Second,
@@ -495,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfg,
)
if err != nil {
@@ -517,7 +517,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: int64(runtime.NumCPU()),
AllocateTimeout: time.Second * 100,
@@ -548,7 +548,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
Config{
NumWorkers: 1,
MaxJobs: 1,
diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go
index bdb64a3b..cb67ebe1 100644
--- a/pkg/pool/supervisor_test.go
+++ b/pkg/pool/supervisor_test.go
@@ -30,7 +30,7 @@ func TestSupervisedPool_Exec(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfgSupervised,
)
@@ -88,7 +88,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfgExecTTL,
)
@@ -129,7 +129,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) {
p, err := Initialize(
ctx,
func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") },
- pipe.NewPipeFactory(nil),
+ pipe.NewPipeFactory(),
cfgExecTTL,
)
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index a3a0bf18..38b3e7c9 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -25,14 +25,10 @@ type Factory struct {
// listens for incoming connections from underlying processes
ls net.Listener
- // events listener
- listeners []events.EventListener
-
// relay connection timeout
tout time.Duration
// sockets which are waiting for process association
- // relays map[int64]*goridge.SocketRelay
relays sync.Map
ErrCh chan error
@@ -42,13 +38,12 @@ type Factory struct {
// NewSocketServer returns Factory attached to a given socket listener.
// tout specifies for how long factory should serve for incoming relay connection
-func NewSocketServer(ls net.Listener, tout time.Duration, listeners ...events.EventListener) worker.Factory {
+func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory {
f := &Factory{
- ls: ls,
- tout: tout,
- relays: sync.Map{},
- listeners: listeners,
- ErrCh: make(chan error, 10),
+ ls: ls,
+ tout: tout,
+ relays: sync.Map{},
+ ErrCh: make(chan error, 10),
}
// Be careful
@@ -90,13 +85,13 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
ctx, cancel := context.WithTimeout(ctx, f.tout)
defer cancel()
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listeners...))
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
c <- socketSpawn{
w: nil,
@@ -150,9 +145,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo
}
}
-func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker")
- w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listeners...))
+ w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...))
if err != nil {
return nil, err
}