diff options
author | Valery Piashchynski <[email protected]> | 2020-12-22 23:27:51 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-22 23:27:51 +0300 |
commit | 1f6749ed4cf3cfd2beade4949945a382abd66b15 (patch) | |
tree | 7dff57a0223376746fd6cd49dc439f8766fe9f1b | |
parent | 95a72fe8003c66ac2d9a52b964aba4ee1d88363f (diff) |
Redisighn factory interface (add event listeners as variadic)v2.0.0-alpha28
-rwxr-xr-x | go.sum | 1 | ||||
-rw-r--r-- | interfaces/worker/factory.go | 6 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 20 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 2 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 6 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 40 | ||||
-rw-r--r-- | pkg/pool/supervisor_test.go | 6 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 23 |
8 files changed, 50 insertions, 54 deletions
@@ -435,6 +435,7 @@ github.com/spiral/roadrunner-plugins/resetter v1.0.0 h1:Nx4mIzeoH/IcUfY4LM9xhY0y github.com/spiral/roadrunner-plugins/resetter v1.0.0/go.mod h1:DLFifJk1n3PWViXkT5+qAmzeRcPTowDRSbRqotf+WlE= github.com/spiral/roadrunner-plugins/rpc v1.0.0 h1:cC17yCNqQUNtedKefdeT2P6z7q52L0QakxS4qwB7n+g= github.com/spiral/roadrunner-plugins/rpc v1.0.0/go.mod h1:p+ClRf1ibW+xvekf+nGQtvipyrtPJP6WZ/J/DSp+Qck= +github.com/spiral/roadrunner-plugins/server v1.0.1/go.mod h1:qedfnQFlK1+Jwv5M8mRXJCOWTvF/qfFyULK/0UMBeOk= github.com/spiral/roadrunner/v2 v2.0.0-alpha26/go.mod h1:r7ojuHm9qCVbg4fKcqr4Aqk7VXqZ9YPefr1LOv7HNys= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go index 8db8ddcc..8412428d 100644 --- a/interfaces/worker/factory.go +++ b/interfaces/worker/factory.go @@ -3,16 +3,18 @@ package worker import ( "context" "os/exec" + + "github.com/spiral/roadrunner/v2/interfaces/events" ) // Factory is responsible of wrapping given command into tasks WorkerProcess. type Factory interface { // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd, ...events.EventListener) (BaseProcess, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. - SpawnWorker(*exec.Cmd) (BaseProcess, error) + SpawnWorker(*exec.Cmd, ...events.EventListener) (BaseProcess, error) // Close the factory and underlying connections. Close() error } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index f6211ab9..ecb3fa71 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -15,18 +15,12 @@ import ( // Factory connects to stack using standard // streams (STDIN, STDOUT pipes). -type Factory struct { - listener []events.EventListener -} +type Factory struct{} // NewPipeFactory returns new factory instance and starts // listening - -// todo: review tests -func NewPipeFactory(listeners ...events.EventListener) worker.Factory { - return &Factory{ - listener: listeners, - } +func NewPipeFactory() worker.Factory { + return &Factory{} } type SpawnResult struct { @@ -36,11 +30,11 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...)) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { c <- SpawnResult{ w: nil, @@ -119,9 +113,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { const op = errors.Op("spawn worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listener...)) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { return nil, errors.E(op, err) } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 3cef0646..dca09375 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -415,7 +415,7 @@ func Test_Broken(t *testing.T) { } } - w, err := NewPipeFactory(listener).SpawnWorkerWithTimeout(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index b181a805..23bb2d5f 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -39,6 +39,9 @@ type StaticPool struct { // distributes the events events events.Handler + // saved list of event listeners + listeners []events.EventListener + // manages worker states and TTLs ww worker.Watcher @@ -103,6 +106,7 @@ func Initialize(ctx context.Context, cmd Command, factory worker.Factory, cfg Co func AddListeners(listeners ...events.EventListener) Options { return func(p *StaticPool) { + p.listeners = listeners for i := 0; i < len(listeners); i++ { p.addListener(listeners[i]) } @@ -265,7 +269,7 @@ func (sp *StaticPool) newPoolAllocator(ctx context.Context, timeout time.Duratio return func() (worker.BaseProcess, error) { ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() - w, err := factory.SpawnWorkerWithTimeout(ctx, cmd()) + w, err := factory.SpawnWorkerWithTimeout(ctx, cmd(), sp.listeners...) if err != nil { return nil, err } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index dcc930f6..acdd6ab7 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -30,7 +30,7 @@ func Test_NewPool(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -44,7 +44,7 @@ func Test_StaticPool_Invalid(t *testing.T) { p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/invalid.php") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) @@ -56,7 +56,7 @@ func Test_ConfigNoErrorInitDefaults(t *testing.T) { p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ AllocateTimeout: time.Second, DestroyTimeout: time.Second, @@ -72,7 +72,7 @@ func Test_StaticPool_Echo(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -96,7 +96,7 @@ func Test_StaticPool_Echo_NilContext(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -120,7 +120,7 @@ func Test_StaticPool_Echo_Context(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "head", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -144,7 +144,7 @@ func Test_StaticPool_JobError(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "error", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) assert.NoError(t, err) @@ -184,7 +184,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "broken", "pipes") }, - pipe.NewPipeFactory(listener), + pipe.NewPipeFactory(), cfg, AddListeners(listener), ) @@ -223,7 +223,7 @@ func Test_StaticPool_Broken_FromOutside(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, AddListeners(listener), ) @@ -263,7 +263,7 @@ func Test_StaticPool_AllocateTimeout(t *testing.T) { p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, AllocateTimeout: time.Nanosecond * 1, @@ -282,7 +282,7 @@ func Test_StaticPool_Replace_Worker(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, MaxJobs: 1, @@ -319,7 +319,7 @@ func Test_StaticPool_Debug_Worker(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "pid", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ Debug: true, AllocateTimeout: time.Second, @@ -359,7 +359,7 @@ func Test_StaticPool_Stop_Worker(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -399,7 +399,7 @@ func Test_Static_Pool_Destroy_And_Close(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -421,7 +421,7 @@ func Test_Static_Pool_Destroy_And_Close_While_Wait(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "delay", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, AllocateTimeout: time.Second, @@ -451,7 +451,7 @@ func Test_Static_Pool_Handle_Dead(t *testing.T) { p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -476,7 +476,7 @@ func Test_Static_Pool_Slow_Destroy(t *testing.T) { p, err := Initialize( context.Background(), func() *exec.Cmd { return exec.Command("php", "../../tests/slow-destroy.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 5, AllocateTimeout: time.Second, @@ -495,7 +495,7 @@ func Benchmark_Pool_Echo(b *testing.B) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfg, ) if err != nil { @@ -517,7 +517,7 @@ func Benchmark_Pool_Echo_Batched(b *testing.B) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: int64(runtime.NumCPU()), AllocateTimeout: time.Second * 100, @@ -548,7 +548,7 @@ func Benchmark_Pool_Echo_Replaced(b *testing.B) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/client.php", "echo", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), Config{ NumWorkers: 1, MaxJobs: 1, diff --git a/pkg/pool/supervisor_test.go b/pkg/pool/supervisor_test.go index bdb64a3b..cb67ebe1 100644 --- a/pkg/pool/supervisor_test.go +++ b/pkg/pool/supervisor_test.go @@ -30,7 +30,7 @@ func TestSupervisedPool_Exec(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/memleak.php", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfgSupervised, ) @@ -88,7 +88,7 @@ func TestSupervisedPool_ExecTTL_TimedOut(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfgExecTTL, ) @@ -129,7 +129,7 @@ func TestSupervisedPool_ExecTTL_OK(t *testing.T) { p, err := Initialize( ctx, func() *exec.Cmd { return exec.Command("php", "../../tests/sleep.php", "pipes") }, - pipe.NewPipeFactory(nil), + pipe.NewPipeFactory(), cfgExecTTL, ) diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index a3a0bf18..38b3e7c9 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -25,14 +25,10 @@ type Factory struct { // listens for incoming connections from underlying processes ls net.Listener - // events listener - listeners []events.EventListener - // relay connection timeout tout time.Duration // sockets which are waiting for process association - // relays map[int64]*goridge.SocketRelay relays sync.Map ErrCh chan error @@ -42,13 +38,12 @@ type Factory struct { // NewSocketServer returns Factory attached to a given socket listener. // tout specifies for how long factory should serve for incoming relay connection -func NewSocketServer(ls net.Listener, tout time.Duration, listeners ...events.EventListener) worker.Factory { +func NewSocketServer(ls net.Listener, tout time.Duration) worker.Factory { f := &Factory{ - ls: ls, - tout: tout, - relays: sync.Map{}, - listeners: listeners, - ErrCh: make(chan error, 10), + ls: ls, + tout: tout, + relays: sync.Map{}, + ErrCh: make(chan error, 10), } // Be careful @@ -90,13 +85,13 @@ type socketSpawn struct { } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { ctx, cancel := context.WithTimeout(ctx, f.tout) defer cancel() - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listeners...)) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { c <- socketSpawn{ w: nil, @@ -150,9 +145,9 @@ func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (wo } } -func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorker(cmd *exec.Cmd, listeners ...events.EventListener) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker") - w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(f.listeners...)) + w, err := workerImpl.InitBaseWorker(cmd, workerImpl.AddListeners(listeners...)) if err != nil { return nil, err } |