diff options
author | Valery Piashchynski <[email protected]> | 2020-12-21 10:27:10 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-21 10:27:10 +0300 |
commit | 2f71f79ac704ed95dad961677b6e602e38641b5d (patch) | |
tree | 7452bbedd1444079757a848ad07089bc6093561f | |
parent | 3d4c75aadd9ffd0d46728f48f685de2e1bfc44bb (diff) |
Remove unused contex from interfaces. Update pool allocator.
-rwxr-xr-x | interfaces/factory/factory.go | 22 | ||||
-rw-r--r-- | interfaces/pool/pool.go | 3 | ||||
-rw-r--r-- | interfaces/worker/factory.go | 4 | ||||
-rw-r--r-- | interfaces/worker/worker.go | 4 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory.go | 4 | ||||
-rwxr-xr-x | pkg/pipe/pipe_factory_test.go | 72 | ||||
-rwxr-xr-x | pkg/pool/static_pool.go | 49 | ||||
-rwxr-xr-x | pkg/socket/socket_factory.go | 4 | ||||
-rwxr-xr-x | pkg/socket/socket_factory_test.go | 58 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 9 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 33 | ||||
-rw-r--r-- | plugins/server/plugin.go | 4 | ||||
-rw-r--r-- | plugins/server/tests/plugin_pipes.go | 2 | ||||
-rw-r--r-- | plugins/server/tests/plugin_sockets.go | 2 | ||||
-rw-r--r-- | plugins/server/tests/plugin_tcp.go | 2 |
15 files changed, 116 insertions, 156 deletions
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go deleted file mode 100755 index 51b73501..00000000 --- a/interfaces/factory/factory.go +++ /dev/null @@ -1,22 +0,0 @@ -package worker - -import ( - "context" - "os/exec" - - "github.com/spiral/roadrunner/v2/interfaces/worker" -) - -// Factory is responsible of wrapping given command into tasks WorkerProcess. -type Factory interface { - // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex. - // Process must not be started. - SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error) - - // SpawnWorker creates new WorkerProcess process based on given command. - // Process must not be started. - SpawnWorker(*exec.Cmd) (worker.BaseProcess, error) - - // Close the factory and underlying connections. - Close(ctx context.Context) error -} diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go index 72da9597..22552388 100644 --- a/interfaces/pool/pool.go +++ b/interfaces/pool/pool.go @@ -18,9 +18,10 @@ type Pool interface { // GetConfig returns pool configuration. GetConfig() interface{} - // Exec + // Exec executes task with payload Exec(rqs payload.Payload) (payload.Payload, error) + // ExecWithContext executes task with context which is used with timeout ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error) // Workers returns worker list associated with the pool. diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go index 19e2bf5d..8db8ddcc 100644 --- a/interfaces/worker/factory.go +++ b/interfaces/worker/factory.go @@ -9,10 +9,10 @@ import ( type Factory interface { // SpawnWorkerWithContext creates new WorkerProcess process based on given command with context. // Process must not be started. - SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error) + SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error) // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. SpawnWorker(*exec.Cmd) (BaseProcess, error) // Close the factory and underlying connections. - Close(ctx context.Context) error + Close() error } diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go index f830fdf2..7f2f8a53 100644 --- a/interfaces/worker/worker.go +++ b/interfaces/worker/worker.go @@ -40,7 +40,7 @@ type BaseProcess interface { Wait() error // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop(ctx context.Context) error + Stop() error // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! @@ -59,5 +59,5 @@ type SyncWorker interface { // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs payload.Payload) (payload.Payload, error) // ExecWithContext used to handle Exec with TTL - ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) + ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) } diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go index 34735fe6..a0e0c258 100755 --- a/pkg/pipe/pipe_factory.go +++ b/pkg/pipe/pipe_factory.go @@ -31,7 +31,7 @@ type SpawnResult struct { // SpawnWorker creates new Process and connects it to goridge relay, // method Wait() must be handled on level above. -func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { c := make(chan SpawnResult) const op = errors.Op("spawn worker with context") go func() { @@ -159,6 +159,6 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Close the factory. -func (f *Factory) Close(ctx context.Context) error { +func (f *Factory) Close() error { return nil } diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go index 40797747..0d548b7a 100755 --- a/pkg/pipe/pipe_factory_test.go +++ b/pkg/pipe/pipe_factory_test.go @@ -20,7 +20,7 @@ func Test_GetState(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) assert.Equal(t, internal.StateStopped, w.State().Value()) @@ -30,7 +30,7 @@ func Test_GetState(t *testing.T) { assert.NotNil(t, w) assert.Equal(t, internal.StateReady, w.State().Value()) - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) wg := &sync.WaitGroup{} wg.Add(1) go func() { @@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -73,7 +73,7 @@ func Test_Pipe_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - assert.NoError(t, w.Stop(ctx)) + assert.NoError(t, w.Stop()) } func Test_Pipe_StartError(t *testing.T) { @@ -84,7 +84,7 @@ func Test_Pipe_StartError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -97,7 +97,7 @@ func Test_Pipe_PipeError(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -110,7 +110,7 @@ func Test_Pipe_PipeError2(t *testing.T) { } ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -118,7 +118,7 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) @@ -128,7 +128,7 @@ func Test_Pipe_Failboot(t *testing.T) { func Test_Pipe_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -136,12 +136,12 @@ func Test_Pipe_Invalid(t *testing.T) { func Test_Pipe_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -165,13 +165,13 @@ func Test_Pipe_Echo(t *testing.T) { func Test_Pipe_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } defer func() { time.Sleep(time.Second) - err = w.Stop(ctx) + err = w.Stop() assert.Error(t, err) }() @@ -191,14 +191,14 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { f := NewPipeFactory() for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := f.SpawnWorkerWithContext(context.Background(), cmd) + w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd) go func() { if w.Wait() != nil { b.Fail() } }() - err := w.Stop(context.Background()) + err := w.Stop() if err != nil { b.Errorf("error stopping the worker: error %v", err) } @@ -208,7 +208,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd) sw, err := workerImpl.From(w) if err != nil { b.Fatal(err) @@ -222,7 +222,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { } }() defer func() { - err := w.Stop(context.Background()) + err := w.Stop() if err != nil { b.Errorf("error stopping the worker: error %v", err) } @@ -238,13 +238,13 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) { func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -265,13 +265,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) { func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -293,7 +293,7 @@ func Test_Echo(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -306,7 +306,7 @@ func Test_Echo(t *testing.T) { assert.NoError(t, syncWorker.Wait()) }() defer func() { - err := syncWorker.Stop(ctx) + err := syncWorker.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -326,7 +326,7 @@ func Test_BadPayload(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) syncWorker, err := workerImpl.From(w) if err != nil { @@ -337,7 +337,7 @@ func Test_BadPayload(t *testing.T) { assert.NoError(t, syncWorker.Wait()) }() defer func() { - err := syncWorker.Stop(ctx) + err := syncWorker.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -356,12 +356,12 @@ func Test_String(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -376,12 +376,12 @@ func Test_Echo_Slow(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -406,7 +406,7 @@ func Test_Broken(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes") - w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -437,20 +437,20 @@ func Test_Broken(t *testing.T) { t.Fail() } mu.Unlock() - assert.Error(t, w.Stop(ctx)) + assert.Error(t, w.Stop()) } func Test_Error(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "error", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -476,12 +476,12 @@ func Test_NumExecs(t *testing.T) { ctx := context.Background() cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes") - w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd) + w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err := w.Stop(ctx) + err := w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go index e5a5a7e8..2a06b255 100755 --- a/pkg/pool/static_pool.go +++ b/pkg/pool/static_pool.go @@ -3,6 +3,7 @@ package pool import ( "context" "os/exec" + "time" "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/interfaces/events" @@ -18,8 +19,6 @@ import ( // StopRequest can be sent by worker to indicate that restart is required. const StopRequest = "{\"stop\":true}" -var bCtx = context.Background() - // ErrorEncoder encode error or make a decision based on the error type type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error) @@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory, before: make([]Before, 0, 0), } - p.allocator = newPoolAllocator(factory, cmd) + p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd) p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events) - workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers) + workers, err := p.allocateWorkers(p.cfg.NumWorkers) if err != nil { return nil, errors.E(op, err) } @@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) { // TODO careful with string(rsp.Context) if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest { sw.State().Set(internal.StateInvalid) - err = sw.Stop(bCtx) + err = sw.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) } @@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) return payload.Payload{}, errors.E(op, err) } - sw := w.(worker.SyncWorker) - // apply all before function if len(sp.before) > 0 { for i := 0; i < len(sp.before); i++ { @@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) } } - rsp, err := sw.ExecWithContext(ctx, rqs) + rsp, err := w.ExecWithTimeout(ctx, rqs) if err != nil { - return sp.errEncoder(err, sw) + return sp.errEncoder(err, w) } // worker want's to be terminated if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest { - sw.State().Set(internal.StateInvalid) - err = sw.Stop(bCtx) + w.State().Set(internal.StateInvalid) + err = w.Stop() if err != nil { - sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)}) + sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } return sp.Exec(rqs) } - if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs { + if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs { err = sp.ww.AllocateNew() if err != nil { return payload.Payload{}, errors.E(op, err) } } else { - sp.ww.PushWorker(sw) + sp.ww.PushWorker(w) } // apply all after functions @@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload) return rsp, nil } -func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) { +func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) { // GetFreeWorker function consumes context with timeout w, err := sp.ww.GetFreeWorker(ctxGetFree) if err != nil { @@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke // else if err not nil - return error return nil, errors.E(op, err) } - return w, nil + return w.(worker.SyncWorker), nil } // Destroy all underlying stack (but let them to complete the task). @@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } w.State().Set(internal.StateInvalid) - err = w.Stop(bCtx) + err = w.Stop() if err != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)}) } @@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { w.State().Set(internal.StateInvalid) sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w}) - errS := w.Stop(bCtx) + errS := w.Stop() if errS != nil { return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS)) @@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder { } } -func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { +func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator { return func() (worker.BaseProcess, error) { - w, err := factory.SpawnWorkerWithContext(bCtx, cmd()) + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + w, err := factory.SpawnWorkerWithTimeout(ctx, cmd()) if err != nil { return nil, err } @@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { r, err := sw.(worker.SyncWorker).Exec(p) - if stopErr := sw.Stop(context.Background()); stopErr != nil { + if stopErr := sw.Stop(); stopErr != nil { sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err}) } @@ -334,26 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) { } // allocate required number of stack -func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) { +func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) { const op = errors.Op("allocate workers") var workers []worker.BaseProcess // constant number of stack simplify logic for i := int64(0); i < numWorkers; i++ { - ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout) - w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd()) + w, err := sp.allocator() if err != nil { - cancel() return nil, errors.E(op, errors.WorkerAllocate, err) } sw, err := syncWorker.From(w) if err != nil { - cancel() return nil, errors.E(op, err) } workers = append(workers, sw) - cancel() } return workers, nil } diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go index b08d24e4..49456bd9 100755 --- a/pkg/socket/socket_factory.go +++ b/pkg/socket/socket_factory.go @@ -85,7 +85,7 @@ type socketSpawn struct { } // SpawnWorker creates Process and connects it to appropriate relay or returns error -func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { +func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) { const op = errors.Op("spawn_worker_with_context") c := make(chan socketSpawn) go func() { @@ -174,7 +174,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) { } // Close socket factory and underlying socket connection. -func (f *Factory) Close(ctx context.Context) error { +func (f *Factory) Close() error { return f.ls.Close() } diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go index 6a88713a..983f3e8e 100755 --- a/pkg/socket/socket_factory_test.go +++ b/pkg/socket/socket_factory_test.go @@ -31,7 +31,7 @@ func Test_Tcp_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -39,7 +39,7 @@ func Test_Tcp_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -64,11 +64,11 @@ func Test_Tcp_StartCloseFactory(t *testing.T) { } }() - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -95,7 +95,7 @@ func Test_Tcp_StartError(t *testing.T) { t.Errorf("error executing the command: error %v", err) } - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -118,7 +118,7 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err2) assert.Contains(t, err2.Error(), "failboot") @@ -141,7 +141,7 @@ func Test_Tcp_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -164,7 +164,7 @@ func Test_Tcp_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -186,7 +186,7 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -201,7 +201,7 @@ func Test_Tcp_Broken(t *testing.T) { defer func() { time.Sleep(time.Second) - err2 := w.Stop(ctx) + err2 := w.Stop() // write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection assert.Error(t, err2) }() @@ -235,12 +235,12 @@ func Test_Tcp_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) go func() { assert.NoError(t, w.Wait()) }() defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -277,7 +277,7 @@ func Test_Unix_Start(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) assert.NoError(t, err) assert.NotNil(t, w) @@ -285,7 +285,7 @@ func Test_Unix_Start(t *testing.T) { assert.NoError(t, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -307,7 +307,7 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "failboot") @@ -329,7 +329,7 @@ func Test_Unix_Timeout(t *testing.T) { cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0") - w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd) assert.Nil(t, w) assert.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") @@ -351,7 +351,7 @@ func Test_Unix_Invalid(t *testing.T) { cmd := exec.Command("php", "../../tests/invalid.php") - w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd) assert.Error(t, err) assert.Nil(t, w) } @@ -372,7 +372,7 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -387,7 +387,7 @@ func Test_Unix_Broken(t *testing.T) { defer func() { time.Sleep(time.Second) - err = w.Stop(ctx) + err = w.Stop() assert.Error(t, err) }() @@ -420,7 +420,7 @@ func Test_Unix_Echo(t *testing.T) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { t.Fatal(err) } @@ -428,7 +428,7 @@ func Test_Unix_Echo(t *testing.T) { assert.NoError(t, w.Wait()) }() defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { t.Errorf("error stopping the Process: error %v", err) } @@ -467,7 +467,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } @@ -475,7 +475,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) { assert.NoError(b, w.Wait()) }() - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -498,12 +498,12 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -539,11 +539,11 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) { for n := 0; n < b.N; n++ { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := f.SpawnWorkerWithContext(ctx, cmd) + w, err := f.SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } @@ -566,12 +566,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) { cmd := exec.Command("php", "../../tests/client.php", "echo", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd) + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) if err != nil { b.Fatal(err) } defer func() { - err = w.Stop(ctx) + err = w.Stop() if err != nil { b.Errorf("error stopping the Process: error %v", err) } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index eacb8a8a..11992f22 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -63,9 +63,10 @@ type wexec struct { } // Exec payload without TTL timeout. -func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) { - const op = errors.Op("ExecWithContext") +func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) { + const op = errors.Op("ExecWithTimeout") c := make(chan wexec, 1) + go func() { if len(p.Body) == 0 && len(p.Context) == 0 { c <- wexec{ @@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error { return tw.w.Wait() } -func (tw *syncWorker) Stop(ctx context.Context) error { - return tw.w.Stop(ctx) +func (tw *syncWorker) Stop() error { + return tw.w.Stop() } func (tw *syncWorker) Kill() error { diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index e60ab3f4..456f4bea 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -2,7 +2,6 @@ package worker import ( "bytes" - "context" "fmt" "io" "os" @@ -220,30 +219,16 @@ func (w *Process) closeRelay() error { } // Stop sends soft termination command to the Process and waits for process completion. -func (w *Process) Stop(ctx context.Context) error { - c := make(chan error) - - go func() { - var err error - w.state.Set(internal.StateStopping) - err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) - if err != nil { - w.state.Set(internal.StateKilling) - c <- multierr.Append(err, w.cmd.Process.Kill()) - } - w.state.Set(internal.StateStopped) - c <- nil - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - if err != nil { - return err - } - return nil +func (w *Process) Stop() error { + var err error + w.state.Set(internal.StateStopping) + err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true})) + if err != nil { + w.state.Set(internal.StateKilling) + return multierr.Append(err, w.cmd.Process.Kill()) } + w.state.Set(internal.StateStopped) + return nil } // Kill kills underlying process, make sure to call Wait() func to gather diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go index 8555fd7e..580c1e10 100644 --- a/plugins/server/plugin.go +++ b/plugins/server/plugin.go @@ -62,7 +62,7 @@ func (server *Plugin) Stop() error { return nil } - return server.factory.Close(context.Background()) + return server.factory.Close() } // CmdFactory provides worker command factory assocated with given context. @@ -105,7 +105,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.Bas return nil, errors.E(op, err) } - w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd()) + w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd()) if err != nil { return nil, errors.E(op, err) } diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go index 9d7812a8..f49cf6dc 100644 --- a/plugins/server/tests/plugin_pipes.go +++ b/plugins/server/tests/plugin_pipes.go @@ -99,7 +99,7 @@ func (f *Foo) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go index e5b139d4..ee971e45 100644 --- a/plugins/server/tests/plugin_sockets.go +++ b/plugins/server/tests/plugin_sockets.go @@ -79,7 +79,7 @@ func (f *Foo2) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go index 866116a7..cdf23e21 100644 --- a/plugins/server/tests/plugin_tcp.go +++ b/plugins/server/tests/plugin_tcp.go @@ -79,7 +79,7 @@ func (f *Foo3) Serve() chan error { } // should not be errors - err = sw.Stop(context.Background()) + err = sw.Stop() if err != nil { errCh <- err return errCh |