summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rwxr-xr-xpkg/pipe/pipe_factory.go4
-rwxr-xr-xpkg/pipe/pipe_factory_test.go72
-rwxr-xr-xpkg/pool/static_pool.go55
-rwxr-xr-xpkg/pool/static_pool_test.go1
-rwxr-xr-xpkg/socket/socket_factory.go4
-rwxr-xr-xpkg/socket/socket_factory_test.go58
-rwxr-xr-xpkg/worker/sync_worker.go9
-rwxr-xr-xpkg/worker/worker.go58
-rwxr-xr-xpkg/worker_watcher/worker_watcher.go11
9 files changed, 128 insertions, 144 deletions
diff --git a/pkg/pipe/pipe_factory.go b/pkg/pipe/pipe_factory.go
index 34735fe6..a0e0c258 100755
--- a/pkg/pipe/pipe_factory.go
+++ b/pkg/pipe/pipe_factory.go
@@ -31,7 +31,7 @@ type SpawnResult struct {
// SpawnWorker creates new Process and connects it to goridge relay,
// method Wait() must be handled on level above.
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
c := make(chan SpawnResult)
const op = errors.Op("spawn worker with context")
go func() {
@@ -159,6 +159,6 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close the factory.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return nil
}
diff --git a/pkg/pipe/pipe_factory_test.go b/pkg/pipe/pipe_factory_test.go
index 40797747..0d548b7a 100755
--- a/pkg/pipe/pipe_factory_test.go
+++ b/pkg/pipe/pipe_factory_test.go
@@ -20,7 +20,7 @@ func Test_GetState(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
assert.Equal(t, internal.StateStopped, w.State().Value())
@@ -30,7 +30,7 @@ func Test_GetState(t *testing.T) {
assert.NotNil(t, w)
assert.Equal(t, internal.StateReady, w.State().Value())
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -40,7 +40,7 @@ func Test_Kill(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -65,7 +65,7 @@ func Test_Pipe_Start(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -73,7 +73,7 @@ func Test_Pipe_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- assert.NoError(t, w.Stop(ctx))
+ assert.NoError(t, w.Stop())
}
func Test_Pipe_StartError(t *testing.T) {
@@ -84,7 +84,7 @@ func Test_Pipe_StartError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -97,7 +97,7 @@ func Test_Pipe_PipeError(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -110,7 +110,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
}
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
@@ -128,7 +128,7 @@ func Test_Pipe_Failboot(t *testing.T) {
func Test_Pipe_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -136,12 +136,12 @@ func Test_Pipe_Invalid(t *testing.T) {
func Test_Pipe_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -165,13 +165,13 @@ func Test_Pipe_Echo(t *testing.T) {
func Test_Pipe_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -191,14 +191,14 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
f := NewPipeFactory()
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := f.SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := f.SpawnWorkerWithTimeout(context.Background(), cmd)
go func() {
if w.Wait() != nil {
b.Fail()
}
}()
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -208,7 +208,7 @@ func Benchmark_Pipe_SpawnWorker_Stop(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(context.Background(), cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(context.Background(), cmd)
sw, err := workerImpl.From(w)
if err != nil {
b.Fatal(err)
@@ -222,7 +222,7 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
}
}()
defer func() {
- err := w.Stop(context.Background())
+ err := w.Stop()
if err != nil {
b.Errorf("error stopping the worker: error %v", err)
}
@@ -238,13 +238,13 @@ func Benchmark_Pipe_Worker_ExecEcho(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -265,13 +265,13 @@ func Benchmark_Pipe_Worker_ExecEcho3(b *testing.B) {
func Benchmark_Pipe_Worker_ExecEchoWithoutContext(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -293,7 +293,7 @@ func Test_Echo(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -306,7 +306,7 @@ func Test_Echo(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -326,7 +326,7 @@ func Test_BadPayload(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
syncWorker, err := workerImpl.From(w)
if err != nil {
@@ -337,7 +337,7 @@ func Test_BadPayload(t *testing.T) {
assert.NoError(t, syncWorker.Wait())
}()
defer func() {
- err := syncWorker.Stop(ctx)
+ err := syncWorker.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -356,12 +356,12 @@ func Test_String(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -376,12 +376,12 @@ func Test_Echo_Slow(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "pipes", "10", "10")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -406,7 +406,7 @@ func Test_Broken(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "broken", "pipes")
- w, err := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -437,20 +437,20 @@ func Test_Broken(t *testing.T) {
t.Fail()
}
mu.Unlock()
- assert.Error(t, w.Stop(ctx))
+ assert.Error(t, w.Stop())
}
func Test_Error(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "error", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -476,12 +476,12 @@ func Test_NumExecs(t *testing.T) {
ctx := context.Background()
cmd := exec.Command("php", "../../tests/client.php", "echo", "pipes")
- w, _ := NewPipeFactory().SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err := w.Stop(ctx)
+ err := w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/pool/static_pool.go b/pkg/pool/static_pool.go
index 9cf79fd4..2a06b255 100755
--- a/pkg/pool/static_pool.go
+++ b/pkg/pool/static_pool.go
@@ -3,6 +3,7 @@ package pool
import (
"context"
"os/exec"
+ "time"
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/interfaces/events"
@@ -18,8 +19,6 @@ import (
// StopRequest can be sent by worker to indicate that restart is required.
const StopRequest = "{\"stop\":true}"
-var bCtx = context.Background()
-
// ErrorEncoder encode error or make a decision based on the error type
type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)
@@ -77,10 +76,10 @@ func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory worker.Factory,
before: make([]Before, 0, 0),
}
- p.allocator = newPoolAllocator(factory, cmd)
+ p.allocator = newPoolAllocator(ctx, p.cfg.AllocateTimeout, factory, cmd)
p.ww = workerWatcher.NewWorkerWatcher(p.allocator, p.cfg.NumWorkers, p.events)
- workers, err := p.allocateWorkers(ctx, p.cfg.NumWorkers)
+ workers, err := p.allocateWorkers(p.cfg.NumWorkers)
if err != nil {
return nil, errors.E(op, err)
}
@@ -169,7 +168,7 @@ func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error) {
// TODO careful with string(rsp.Context)
if len(rsp.Body) == 0 && string(rsp.Context) == StopRequest {
sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ err = sw.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
}
@@ -204,8 +203,6 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return payload.Payload{}, errors.E(op, err)
}
- sw := w.(worker.SyncWorker)
-
// apply all before function
if len(sp.before) > 0 {
for i := 0; i < len(sp.before); i++ {
@@ -213,29 +210,29 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
}
}
- rsp, err := sw.ExecWithContext(ctx, rqs)
+ rsp, err := w.ExecWithTimeout(ctx, rqs)
if err != nil {
- return sp.errEncoder(err, sw)
+ return sp.errEncoder(err, w)
}
// worker want's to be terminated
if rsp.Body == nil && rsp.Context != nil && string(rsp.Context) == StopRequest {
- sw.State().Set(internal.StateInvalid)
- err = sw.Stop(bCtx)
+ w.State().Set(internal.StateInvalid)
+ err = w.Stop()
if err != nil {
- sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: errors.E(op, err)})
+ sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
return sp.Exec(rqs)
}
- if sp.cfg.MaxJobs != 0 && sw.State().NumExecs() >= sp.cfg.MaxJobs {
+ if sp.cfg.MaxJobs != 0 && w.State().NumExecs() >= sp.cfg.MaxJobs {
err = sp.ww.AllocateNew()
if err != nil {
return payload.Payload{}, errors.E(op, err)
}
} else {
- sp.ww.PushWorker(sw)
+ sp.ww.PushWorker(w)
}
// apply all after functions
@@ -248,7 +245,7 @@ func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs payload.Payload)
return rsp, nil
}
-func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.BaseProcess, error) {
+func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worker.SyncWorker, error) {
// GetFreeWorker function consumes context with timeout
w, err := sp.ww.GetFreeWorker(ctxGetFree)
if err != nil {
@@ -260,7 +257,7 @@ func (sp *StaticPool) getWorker(ctxGetFree context.Context, op errors.Op) (worke
// else if err not nil - return error
return nil, errors.E(op, err)
}
- return w, nil
+ return w.(worker.SyncWorker), nil
}
// Destroy all underlying stack (but let them to complete the task).
@@ -280,7 +277,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
w.State().Set(internal.StateInvalid)
- err = w.Stop(bCtx)
+ err = w.Stop()
if err != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: w, Payload: errors.E(op, err)})
}
@@ -293,7 +290,7 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
w.State().Set(internal.StateInvalid)
sp.events.Push(events.PoolEvent{Event: events.EventWorkerDestruct, Payload: w})
- errS := w.Stop(bCtx)
+ errS := w.Stop()
if errS != nil {
return payload.Payload{}, errors.E(op, errors.Errorf("%v, %v", err, errS))
@@ -303,9 +300,11 @@ func defaultErrEncoder(sp *StaticPool) ErrorEncoder {
}
}
-func newPoolAllocator(factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
+func newPoolAllocator(ctx context.Context, timeout time.Duration, factory worker.Factory, cmd func() *exec.Cmd) worker.Allocator {
return func() (worker.BaseProcess, error) {
- w, err := factory.SpawnWorkerWithContext(bCtx, cmd())
+ ctx, cancel := context.WithTimeout(ctx, timeout)
+ defer cancel()
+ w, err := factory.SpawnWorkerWithTimeout(ctx, cmd())
if err != nil {
return nil, err
}
@@ -326,7 +325,7 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
r, err := sw.(worker.SyncWorker).Exec(p)
- if stopErr := sw.Stop(context.Background()); stopErr != nil {
+ if stopErr := sw.Stop(); stopErr != nil {
sp.events.Push(events.WorkerEvent{Event: events.EventWorkerError, Worker: sw, Payload: err})
}
@@ -334,20 +333,22 @@ func (sp *StaticPool) execDebug(p payload.Payload) (payload.Payload, error) {
}
// allocate required number of stack
-func (sp *StaticPool) allocateWorkers(ctx context.Context, numWorkers int64) ([]worker.BaseProcess, error) {
+func (sp *StaticPool) allocateWorkers(numWorkers int64) ([]worker.BaseProcess, error) {
const op = errors.Op("allocate workers")
var workers []worker.BaseProcess
// constant number of stack simplify logic
for i := int64(0); i < numWorkers; i++ {
- ctx, cancel := context.WithTimeout(ctx, sp.cfg.AllocateTimeout)
- w, err := sp.factory.SpawnWorkerWithContext(ctx, sp.cmd())
+ w, err := sp.allocator()
if err != nil {
- cancel()
return nil, errors.E(op, errors.WorkerAllocate, err)
}
- workers = append(workers, w)
- cancel()
+
+ sw, err := syncWorker.From(w)
+ if err != nil {
+ return nil, errors.E(op, err)
+ }
+ workers = append(workers, sw)
}
return workers, nil
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index b96e9214..30345aee 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -190,6 +190,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
}
})
+ time.Sleep(time.Second)
res, err := p.ExecWithContext(ctx, payload.Payload{Body: []byte("hello")})
assert.Error(t, err)
assert.Nil(t, res.Context)
diff --git a/pkg/socket/socket_factory.go b/pkg/socket/socket_factory.go
index b08d24e4..49456bd9 100755
--- a/pkg/socket/socket_factory.go
+++ b/pkg/socket/socket_factory.go
@@ -85,7 +85,7 @@ type socketSpawn struct {
}
// SpawnWorker creates Process and connects it to appropriate relay or returns error
-func (f *Factory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
+func (f *Factory) SpawnWorkerWithTimeout(ctx context.Context, cmd *exec.Cmd) (worker.BaseProcess, error) {
const op = errors.Op("spawn_worker_with_context")
c := make(chan socketSpawn)
go func() {
@@ -174,7 +174,7 @@ func (f *Factory) SpawnWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
}
// Close socket factory and underlying socket connection.
-func (f *Factory) Close(ctx context.Context) error {
+func (f *Factory) Close() error {
return f.ls.Close()
}
diff --git a/pkg/socket/socket_factory_test.go b/pkg/socket/socket_factory_test.go
index 6a88713a..983f3e8e 100755
--- a/pkg/socket/socket_factory_test.go
+++ b/pkg/socket/socket_factory_test.go
@@ -31,7 +31,7 @@ func Test_Tcp_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -39,7 +39,7 @@ func Test_Tcp_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -64,11 +64,11 @@ func Test_Tcp_StartCloseFactory(t *testing.T) {
}
}()
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -95,7 +95,7 @@ func Test_Tcp_StartError(t *testing.T) {
t.Errorf("error executing the command: error %v", err)
}
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -118,7 +118,7 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err2)
assert.Contains(t, err2.Error(), "failboot")
@@ -141,7 +141,7 @@ func Test_Tcp_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "tcp", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -164,7 +164,7 @@ func Test_Tcp_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*1).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -186,7 +186,7 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -201,7 +201,7 @@ func Test_Tcp_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err2 := w.Stop(ctx)
+ err2 := w.Stop()
// write tcp 127.0.0.1:9007->127.0.0.1:34204: use of closed network connection
assert.Error(t, err2)
}()
@@ -235,12 +235,12 @@ func Test_Tcp_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, _ := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
go func() {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -277,7 +277,7 @@ func Test_Unix_Start(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
assert.NoError(t, err)
assert.NotNil(t, w)
@@ -285,7 +285,7 @@ func Test_Unix_Start(t *testing.T) {
assert.NoError(t, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -307,7 +307,7 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "failboot")
@@ -329,7 +329,7 @@ func Test_Unix_Timeout(t *testing.T) {
cmd := exec.Command("php", "../../tests/slow-client.php", "echo", "unix", "200", "0")
- w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Millisecond*100).SpawnWorkerWithTimeout(ctx, cmd)
assert.Nil(t, w)
assert.Error(t, err)
assert.Contains(t, err.Error(), "context deadline exceeded")
@@ -351,7 +351,7 @@ func Test_Unix_Invalid(t *testing.T) {
cmd := exec.Command("php", "../../tests/invalid.php")
- w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Second*10).SpawnWorkerWithTimeout(ctx, cmd)
assert.Error(t, err)
assert.Nil(t, w)
}
@@ -372,7 +372,7 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -387,7 +387,7 @@ func Test_Unix_Broken(t *testing.T) {
defer func() {
time.Sleep(time.Second)
- err = w.Stop(ctx)
+ err = w.Stop()
assert.Error(t, err)
}()
@@ -420,7 +420,7 @@ func Test_Unix_Echo(t *testing.T) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
t.Fatal(err)
}
@@ -428,7 +428,7 @@ func Test_Unix_Echo(t *testing.T) {
assert.NoError(t, w.Wait())
}()
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
t.Errorf("error stopping the Process: error %v", err)
}
@@ -467,7 +467,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
@@ -475,7 +475,7 @@ func Benchmark_Tcp_SpawnWorker_Stop(b *testing.B) {
assert.NoError(b, w.Wait())
}()
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -498,12 +498,12 @@ func Benchmark_Tcp_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -539,11 +539,11 @@ func Benchmark_Unix_SpawnWorker_Stop(b *testing.B) {
for n := 0; n < b.N; n++ {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := f.SpawnWorkerWithContext(ctx, cmd)
+ w, err := f.SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
@@ -566,12 +566,12 @@ func Benchmark_Unix_Worker_ExecEcho(b *testing.B) {
cmd := exec.Command("php", "../../tests/client.php", "echo", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithContext(ctx, cmd)
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
if err != nil {
b.Fatal(err)
}
defer func() {
- err = w.Stop(ctx)
+ err = w.Stop()
if err != nil {
b.Errorf("error stopping the Process: error %v", err)
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index eacb8a8a..11992f22 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -63,9 +63,10 @@ type wexec struct {
}
// Exec payload without TTL timeout.
-func (tw *syncWorker) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error) {
- const op = errors.Op("ExecWithContext")
+func (tw *syncWorker) ExecWithTimeout(ctx context.Context, p payload.Payload) (payload.Payload, error) {
+ const op = errors.Op("ExecWithTimeout")
c := make(chan wexec, 1)
+
go func() {
if len(p.Body) == 0 && len(p.Context) == 0 {
c <- wexec{
@@ -211,8 +212,8 @@ func (tw *syncWorker) Wait() error {
return tw.w.Wait()
}
-func (tw *syncWorker) Stop(ctx context.Context) error {
- return tw.w.Stop(ctx)
+func (tw *syncWorker) Stop() error {
+ return tw.w.Stop()
}
func (tw *syncWorker) Kill() error {
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index ae59d611..456f4bea 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -2,7 +2,6 @@ package worker
import (
"bytes"
- "context"
"fmt"
"io"
"os"
@@ -30,13 +29,6 @@ const (
ReadBufSize = 10240 // Kb
)
-var syncPool = sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
-}
-
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
@@ -79,6 +71,8 @@ type Process struct {
rd io.Reader
// stop signal terminates io.Pipe from reading from stderr
stop chan struct{}
+
+ syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -93,6 +87,14 @@ func InitBaseWorker(cmd *exec.Cmd) (worker.BaseProcess, error) {
state: internal.NewWorkerState(internal.StateInactive),
stderr: new(bytes.Buffer),
stop: make(chan struct{}, 1),
+ // sync pool for STDERR
+ // All receivers are pointers
+ syncPool: sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, ReadBufSize)
+ return &buf
+ },
+ },
}
w.rd, w.cmd.Stderr = io.Pipe()
@@ -217,30 +219,16 @@ func (w *Process) closeRelay() error {
}
// Stop sends soft termination command to the Process and waits for process completion.
-func (w *Process) Stop(ctx context.Context) error {
- c := make(chan error)
-
- go func() {
- var err error
- w.state.Set(internal.StateStopping)
- err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
- if err != nil {
- w.state.Set(internal.StateKilling)
- c <- multierr.Append(err, w.cmd.Process.Kill())
- }
- w.state.Set(internal.StateStopped)
- c <- nil
- }()
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case err := <-c:
- if err != nil {
- return err
- }
- return nil
+func (w *Process) Stop() error {
+ var err error
+ w.state.Set(internal.StateStopping)
+ err = multierr.Append(err, internal.SendControl(w.relay, &internal.StopCommand{Stop: true}))
+ if err != nil {
+ w.state.Set(internal.StateKilling)
+ return multierr.Append(err, w.cmd.Process.Kill())
}
+ w.state.Set(internal.StateStopped)
+ return nil
}
// Kill kills underlying process, make sure to call Wait() func to gather
@@ -258,15 +246,12 @@ func (w *Process) Kill() error {
// put the pointer, to not allocate new slice
// but erase it len and then return back
func (w *Process) put(data *[]byte) {
- *data = (*data)[:0]
- *data = (*data)[:cap(*data)]
-
- syncPool.Put(data)
+ w.syncPool.Put(data)
}
// get pointer to the byte slice
func (w *Process) get() *[]byte {
- return syncPool.Get().(*[]byte)
+ return w.syncPool.Get().(*[]byte)
}
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
@@ -282,6 +267,7 @@ func (w *Process) watch() {
w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
w.mu.Lock()
// write new message
+ // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
w.stderr.Write((*buf)[:n])
w.mu.Unlock()
w.put(buf)
diff --git a/pkg/worker_watcher/worker_watcher.go b/pkg/worker_watcher/worker_watcher.go
index 8788e509..918145e5 100755
--- a/pkg/worker_watcher/worker_watcher.go
+++ b/pkg/worker_watcher/worker_watcher.go
@@ -10,7 +10,6 @@ import (
"github.com/spiral/roadrunner/v2/interfaces/events"
"github.com/spiral/roadrunner/v2/interfaces/worker"
"github.com/spiral/roadrunner/v2/internal"
- syncWorker "github.com/spiral/roadrunner/v2/pkg/worker"
)
type Stack struct {
@@ -163,16 +162,12 @@ type workerWatcher struct {
func (ww *workerWatcher) AddToWatch(workers []worker.BaseProcess) error {
for i := 0; i < len(workers); i++ {
- sw, err := syncWorker.From(workers[i])
- if err != nil {
- return err
- }
- ww.stack.Push(sw)
- sw.AddListener(ww.events.Push)
+ ww.stack.Push(workers[i])
+ workers[i].AddListener(ww.events.Push)
go func(swc worker.BaseProcess) {
ww.wait(swc)
- }(sw)
+ }(workers[i])
}
return nil
}