summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xinterfaces/factory/factory.go22
-rw-r--r--interfaces/pool/pool.go3
-rw-r--r--interfaces/worker/factory.go4
-rw-r--r--interfaces/worker/worker.go4
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pipe/pipe_factory_test.go72
-rwxr-xr-xpkg/pool/static_pool.go49
-rwxr-xr-xpkg/socket/socket_factory.go4
-rwxr-xr-xpkg/socket/socket_factory_test.go58
-rwxr-xr-xpkg/worker/sync_worker.go9
-rwxr-xr-xpkg/worker/worker.go33
-rw-r--r--plugins/server/plugin.go4
-rw-r--r--plugins/server/tests/plugin_pipes.go2
-rw-r--r--plugins/server/tests/plugin_sockets.go2
-rw-r--r--plugins/server/tests/plugin_tcp.go2
15 files changed, 116 insertions, 156 deletions
diff --git a/interfaces/factory/factory.go b/interfaces/factory/factory.go
deleted file mode 100755
index 51b73501..00000000
--- a/interfaces/factory/factory.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package worker
-
-import (
- "context"
- "os/exec"
-
- "github.com/spiral/roadrunner/v2/interfaces/worker"
-)
-
-// Factory is responsible of wrapping given command into tasks WorkerProcess.
-type Factory interface {
- // SpawnWorkerWithContext creates new WorkerProcess process based on given command with contex.
- // Process must not be started.
- SpawnWorkerWithContext(context.Context, *exec.Cmd) (worker.BaseProcess, error)
-
- // SpawnWorker creates new WorkerProcess process based on given command.
- // Process must not be started.
- SpawnWorker(*exec.Cmd) (worker.BaseProcess, error)
-
- // Close the factory and underlying connections.
- Close(ctx context.Context) error
-}
diff --git a/interfaces/pool/pool.go b/interfaces/pool/pool.go
index 72da9597..22552388 100644
--- a/interfaces/pool/pool.go
+++ b/interfaces/pool/pool.go
@@ -18,9 +18,10 @@ type Pool interface {
// GetConfig returns pool configuration.
GetConfig() interface{}
- // Exec
+ // Exec executes task with payload
Exec(rqs payload.Payload) (payload.Payload, error)
+ // ExecWithContext executes task with context which is used with timeout
ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)
// Workers returns worker list associated with the pool.
diff --git a/interfaces/worker/factory.go b/interfaces/worker/factory.go
index 19e2bf5d..8db8ddcc 100644
--- a/interfaces/worker/factory.go
+++ b/interfaces/worker/factory.go
@@ -9,10 +9,10 @@ import (
type Factory interface {
// SpawnWorkerWithContext creates new WorkerProcess process based on given command with context.
// Process must not be started.
- SpawnWorkerWithContext(context.Context, *exec.Cmd) (BaseProcess, error)
+ SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (BaseProcess, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
SpawnWorker(*exec.Cmd) (BaseProcess, error)
// Close the factory and underlying connections.
- Close(ctx context.Context) error
+ Close() error
}
diff --git a/interfaces/worker/worker.go b/interfaces/worker/worker.go
index f830fdf2..7f2f8a53 100644
--- a/interfaces/worker/worker.go
+++ b/interfaces/worker/worker.go
@@ -40,7 +40,7 @@ type BaseProcess interface {
Wait() error
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
- Stop(ctx context.Context) error
+ Stop() error
// Kill kills underlying process, make sure to call Wait() func to gather
// error log from the stderr. Does not waits for process completion!
@@ -59,5 +59,5 @@ type SyncWorker interface {
// Exec used to execute payload on the SyncWorker, there is no TIMEOUTS
Exec(rqs payload.Payload) (payload.Payload, error)
// ExecWithContext used to handle Exec with TTL
- ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error)
+ ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error)
}
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index 34735fe6..a0e0c258 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -31,7 +31,7 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
@@ -159,6 +159,6 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close the factory.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return nil
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 40797747..0d548b7a 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -20,7 +20,7 @@ func Test_GetState(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, internal.StateStopped, w.State().Value())
@@ -30,7 +30,7 @@ func Test_GetState(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, internal.StateReady, w.State().Value())
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -73,7 +73,7 @@ func Test_Pipe_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- assert.NoError(t, w.Stop(ctx))
+ assert.NoError(t, w.Stop())
}
func Test_Pipe_StartError(t *testing.T) {
@@ -84,7 +84,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -97,7 +97,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -110,7 +110,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
@@ -128,7 +128,7 @@ func Test_Pipe_Failboot(t *testing.T) {
func Test_Pipe_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -136,12 +136,12 @@ func Test_Pipe_Invalid(t *testing.T) {
func Test_Pipe_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -165,13 +165,13 @@ func Test_Pipe_Echo(t *testing.T) {
func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -191,14 +191,14 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
go func() {
if w.Wait() != nil {
b.Fail()
}
}()
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -208,7 +208,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
sw, err := workerImpl.From(w)
if err != nil {
b.Fatal(err)
@@ -222,7 +222,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}
}()
defer func() {
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -238,13 +238,13 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -265,13 +265,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -293,7 +293,7 @@ func Test_Echo(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -306,7 +306,7 @@ func Test_Echo(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -326,7 +326,7 @@ func Test_BadPayload(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
syncWorker, err := workerImpl.From(w)
if err != nil {
@@ -337,7 +337,7 @@ func Test_BadPayload(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -356,12 +356,12 @@ func Test_String(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -376,12 +376,12 @@ func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -406,7 +406,7 @@ func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -437,20 +437,20 @@ func Test_Broken(t *testing.T) {
t.Fail()
}
mu.Unlock()
- assert.Error(t, w.Stop(ctx))
+ assert.Error(t, w.Stop())
}
func Test_Error(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -476,12 +476,12 @@ func Test_NumExecs(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index e5a5a7e8..2a06b255 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -3,6 +3,7 @@ package pool
import (
"context"
"os/exec"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
@@ -18,8 +19,6 @@ import (
// StopRequest can be sent by worker to indicate that restart is required.
const StopRequest = "{\"stop\":true}"
-var bCtx = context.Background()
-
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
@@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
before: make([]Before, 0, 0),
}
- p.allocator = newPoolAllocator(factory, cmd)
+ p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ err = sw.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
@@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- sw := w.(worker.SyncWorker)
-
// apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
- rsp, err := sw.ExecWithContext(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, rqs)
if err != nil {
- return sp.errEncoder(err, sw)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
}
- if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(sw)
+ sp.ww.PushWorker(w)
}
// apply all after functions
@@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w, nil
+ return w.(worker.SyncWorker), nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- err = w.Stop(bCtx)
+ err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
@@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(internal.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
+ errS := w.Stop()
if errS != nil {
return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
@@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.BaseProcess, error) {
- w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
if err != nil {
return nil, err
}
@@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
@@ -334,26 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
- w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ w, err := sp.allocator()
if err != nil {
- cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
sw, err := syncWorker.From(w)
if err != nil {
- cancel()
return nil, errors.E(op, err)
}
workers = append(workers, sw)
- cancel()
}
return workers, nil
}
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index b08d24e4..49456bd9 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -85,7 +85,7 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
@@ -174,7 +174,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close socket factory and underlying socket connection.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return f.ls.Close()
}
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index 6a88713a..983f3e8e 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -31,7 +31,7 @@ func Test_Tcp_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -39,7 +39,7 @@ func Test_Tcp_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -64,11 +64,11 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
}()
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -95,7 +95,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
assert.Contains(t, err2.Error(), "failboot")
@@ -141,7 +141,7 @@ func Test_Tcp_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -164,7 +164,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -186,7 +186,7 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -201,7 +201,7 @@ func Test_Tcp_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err2 := w.Stop(ctx)
+ err2 := w.Stop()
// write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
assert.Error(t, err2)
}()
@@ -235,12 +235,12 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -277,7 +277,7 @@ func Test_Unix_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -285,7 +285,7 @@ func Test_Unix_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -307,7 +307,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -329,7 +329,7 @@ func Test_Unix_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -351,7 +351,7 @@ func Test_Unix_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -372,7 +372,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -387,7 +387,7 @@ func Test_Unix_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -420,7 +420,7 @@ func Test_Unix_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -428,7 +428,7 @@ func Test_Unix_Echo(t *testing.T) {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -467,7 +467,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -475,7 +475,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
assert.NoError(b, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -498,12 +498,12 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -539,11 +539,11 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -566,12 +566,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index eacb8a8a..11992f22 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -63,9 +63,10 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithContext")
+func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("ExecWithTimeout")
c := make(chan wexec, 1)
+
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
@@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error {
return tw.w.Wait()
}
-func (tw *syncWorker) Stop(ctx context.Context) error {
- return tw.w.Stop(ctx)
+func (tw *syncWorker) Stop() error {
+ return tw.w.Stop()
}
func (tw *syncWorker) Kill() error {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index e60ab3f4..456f4bea 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -2,7 +2,6 @@ package worker
import (
"bytes"
- "context"
"fmt"
"io"
"os"
@@ -220,30 +219,16 @@ func (w *Process) closeRelay() error {
}
// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(internal.StateStopping)
- err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
- if err != nil {
- w.state.Set(internal.StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(internal.StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
+func (w *Process) Stop() error {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ return multierr.Append(err, w.cmd.Process.Kill())
}
+ w.state.Set(internal.StateStopped)
+ return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
diff --git a/plugins/server/plugin.go b/plugins/server/plugin.go
index 8555fd7e..580c1e10 100644
--- a/plugins/server/plugin.go
+++ b/plugins/server/plugin.go
@@ -62,7 +62,7 @@ func (server *Plugin) Stop() error {
return nil
}
- return server.factory.Close(context.Background())
+ return server.factory.Close()
}
// CmdFactory provides worker command factory assocated with given context.
@@ -105,7 +105,7 @@ func (server *Plugin) NewWorker(ctx context.Context, env server.Env) (worker.Bas
return nil, errors.E(op, err)
}
- w, err := server.factory.SpawnWorkerWithContext(ctx, spawnCmd())
+ w, err := server.factory.SpawnWorkerWithTimeout(ctx, spawnCmd())
if err != nil {
return nil, errors.E(op, err)
}
diff --git a/plugins/server/tests/plugin_pipes.go b/plugins/server/tests/plugin_pipes.go
index 9d7812a8..f49cf6dc 100644
--- a/plugins/server/tests/plugin_pipes.go
+++ b/plugins/server/tests/plugin_pipes.go
@@ -99,7 +99,7 @@ func (f *Foo) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_sockets.go b/plugins/server/tests/plugin_sockets.go
index e5b139d4..ee971e45 100644
--- a/plugins/server/tests/plugin_sockets.go
+++ b/plugins/server/tests/plugin_sockets.go
@@ -79,7 +79,7 @@ func (f *Foo2) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh
diff --git a/plugins/server/tests/plugin_tcp.go b/plugins/server/tests/plugin_tcp.go
index 866116a7..cdf23e21 100644
--- a/plugins/server/tests/plugin_tcp.go
+++ b/plugins/server/tests/plugin_tcp.go
@@ -79,7 +79,7 @@ func (f *Foo3) Serve() chan error {
}
// should not be errors
- err = sw.Stop(context.Background())
+ err = sw.Stop()
if err != nil {
errCh <- err
return errCh