diff options
author | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
commit | 8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch) | |
tree | fae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg | |
parent | 20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff) |
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/events/worker_events.go | 5 | ||||
-rwxr-xr-x | pkg/pool/static_pool_test.go | 2 | ||||
-rw-r--r-- | pkg/states/worker_states.go | 2 | ||||
-rw-r--r-- | pkg/transport/pipe/pipe_factory_spawn_test.go | 14 | ||||
-rwxr-xr-x | pkg/transport/pipe/pipe_factory_test.go | 16 | ||||
-rw-r--r-- | pkg/transport/socket/socket_factory_spawn_test.go | 62 | ||||
-rwxr-xr-x | pkg/transport/socket/socket_factory_test.go | 64 | ||||
-rwxr-xr-x | pkg/worker/sync_worker.go | 2 | ||||
-rwxr-xr-x | pkg/worker/worker.go | 134 |
9 files changed, 168 insertions, 133 deletions
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go index 9d428f7d..11bd6ab7 100644 --- a/pkg/events/worker_events.go +++ b/pkg/events/worker_events.go @@ -3,9 +3,10 @@ package events const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. EventWorkerError W = iota + 11000 - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog + // EventWorkerStderr is the worker standard error output + EventWorkerStderr ) type W int64 @@ -16,6 +17,8 @@ func (ev W) String() string { return "EventWorkerError" case EventWorkerLog: return "EventWorkerLog" + case EventWorkerStderr: + return "EventWorkerStderr" } return "Unknown event type" } diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go index 2d2b2b7d..a8fe3baa 100755 --- a/pkg/pool/static_pool_test.go +++ b/pkg/pool/static_pool_test.go @@ -171,7 +171,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) { listener := func(event interface{}) { if wev, ok := event.(events.WorkerEvent); ok { - if wev.Event == events.EventWorkerLog { + if wev.Event == events.EventWorkerStderr { e := string(wev.Payload.([]byte)) if strings.ContainsAny(e, "undefined_function()") { block <- struct{}{} diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go index 22fdfe8a..fe653cb4 100644 --- a/pkg/states/worker_states.go +++ b/pkg/states/worker_states.go @@ -16,6 +16,7 @@ const ( // StateStopping - process is being softly stopped. StateStopping + // StateKilling - process is being forcibly stopped StateKilling // State of worker, when no need to allocate new one @@ -27,5 +28,6 @@ const ( // StateErrored - error WorkerState (can't be used). StateErrored + // StateRemove - worker is killed and removed from the stack StateRemove ) diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go index e247324c..663b3dd5 100644 --- a/pkg/transport/pipe/pipe_factory_spawn_test.go +++ b/pkg/transport/pipe/pipe_factory_spawn_test.go @@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) { func Test_Pipe_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewPipeFactory().SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + w, err := NewPipeFactory().SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid2(t *testing.T) { diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go index b23af19f..6045dd91 100755 --- a/pkg/transport/pipe/pipe_factory_test.go +++ b/pkg/transport/pipe/pipe_factory_test.go @@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) { func Test_Pipe_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") ctx := context.Background() - w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd) + + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Pipe_Invalid(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go index 0e29e7d2..50729546 100644 --- a/pkg/transport/socket/socket_factory_spawn_test.go +++ b/pkg/transport/socket/socket_factory_spawn_test.go @@ -3,11 +3,13 @@ package socket import ( "net" "os/exec" + "strings" "sync" "syscall" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Invalid2(t *testing.T) { @@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo2(t *testing.T) { @@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout2(t *testing.T) { @@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener) if err != nil { t.Fatal(err) } @@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) { assert.Nil(t, res.Context) assert.Nil(t, res.Body) wg.Wait() + <-finish } func Test_Unix_Echo2(t *testing.T) { diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go index f55fc3dd..4abcd5d9 100755 --- a/pkg/transport/socket/socket_factory_test.go +++ b/pkg/transport/socket/socket_factory_test.go @@ -4,10 +4,12 @@ import ( "context" "net" "os/exec" + "strings" "sync" "testing" "time" + "github.com/spiral/roadrunner/v2/pkg/events" "github.com/spiral/roadrunner/v2/pkg/payload" "github.com/spiral/roadrunner/v2/pkg/worker" "github.com/stretchr/testify/assert" @@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err2) - assert.Contains(t, err2.Error(), "failboot") + <-finish } func Test_Tcp_Timeout(t *testing.T) { @@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) { assert.Nil(t, res.Body) assert.Nil(t, res.Context) wg.Wait() + <-finish } func Test_Tcp_Echo(t *testing.T) { @@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) { cmd := exec.Command("php", "../../../tests/failboot.php") - w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd) + finish := make(chan struct{}, 1) + listener := func(event interface{}) { + if ev, ok := event.(events.WorkerEvent); ok { + if ev.Event == events.EventWorkerStderr { + if strings.Contains(string(ev.Payload.([]byte)), "failboot") { + finish <- struct{}{} + } + } + } + } + + w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener) assert.Nil(t, w) assert.Error(t, err) - assert.Contains(t, err.Error(), "failboot") + <-finish } func Test_Unix_Timeout(t *testing.T) { @@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) { cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix") - w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd) + block := make(chan struct{}) + listener := func(event interface{}) { + if wev, ok := event.(events.WorkerEvent); ok { + if wev.Event == events.EventWorkerStderr { + e := string(wev.Payload.([]byte)) + if strings.ContainsAny(e, "undefined_function()") { + block <- struct{}{} + return + } + } + } + } + + w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener) if err != nil { t.Fatal(err) } @@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) { defer wg.Done() err := w.Wait() assert.Error(t, err) - assert.Contains(t, err.Error(), "undefined_function()") }() defer func() { @@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) { assert.Error(t, err) assert.Nil(t, res.Context) assert.Nil(t, res.Body) + <-block wg.Wait() } diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go index 696fbdb7..010af076 100755 --- a/pkg/worker/sync_worker.go +++ b/pkg/worker/sync_worker.go @@ -36,10 +36,8 @@ func FromSync(w *SyncWorkerImpl) BaseProcess { state: w.process.state, cmd: w.process.cmd, pid: w.process.pid, - stderr: w.process.stderr, endState: w.process.endState, relay: w.process.relay, - rd: w.process.rd, } } diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2f1f399d..b726c6f1 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -1,14 +1,11 @@ package worker import ( - "bytes" "fmt" - "io" "os" "os/exec" "strconv" "strings" - "sync" "time" "github.com/spiral/errors" @@ -53,27 +50,11 @@ type Process struct { // can be nil while process is not started. pid int - // stderr aggregates stderr output from underlying process. Value can be - // receive only once command is completed and all pipes are closed. - stderr *bytes.Buffer - - // channel is being closed once command is complete. - // waitDone chan interface{} - // contains information about resulted process state. endState *os.ProcessState - // ensures than only one execution can be run at once. - mu sync.RWMutex - // communication bus with underlying process. relay relay.Relay - // rd in a second part of pipe to read from stderr - rd io.Reader - // stop signal terminates io.Pipe from reading from stderr - stop chan struct{} - - syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(states.StateInactive), - stderr: new(bytes.Buffer), - stop: make(chan struct{}, 1), - // sync pool for STDERR - // All receivers are pointers - syncPool: sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, - }, } - w.rd, w.cmd.Stderr = io.Pipe() - - // small buffer optimization - // at this point we know, that stderr will contain huge messages - w.stderr.Grow(ReadBufSize) + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w // add options for i := 0; i < len(options); i++ { options[i](w) } - go func() { - w.watch() - }() - return w, nil } @@ -189,44 +153,36 @@ func (w *Process) Start() error { // to find or Start the script. func (w *Process) Wait() error { const op = errors.Op("process_wait") - err := multierr.Combine(w.cmd.Wait()) + var err error + err = w.cmd.Wait() + // If worker was destroyed, just exit if w.State().Value() == states.StateDestroyed { - return errors.E(op, err) + return nil } - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then process.cmd.Wait return an error - w.endState = w.cmd.ProcessState + // If state is different, and err is not nil, append it to the errors if err != nil { - w.state.Set(states.StateErrored) - - w.mu.RLock() - // if process return code > 0, here will be an error from stderr (if presents) - if w.stderr.Len() > 0 { - err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) - // stop the stderr buffer - w.stop <- struct{}{} - } - w.mu.RUnlock() - - return multierr.Append(err, w.closeRelay()) + w.State().Set(states.StateErrored) + err = multierr.Combine(err, errors.E(op, err)) } - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(states.StateErrored) - return err + // closeRelay + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + err2 := w.closeRelay() + if err2 != nil { + w.State().Set(states.StateErrored) + return multierr.Append(err, errors.E(op, err2)) } - if w.endState.Success() { - w.state.Set(states.StateStopped) + if w.cmd.ProcessState.Success() { + w.State().Set(states.StateStopped) + return nil } - w.stderr.Reset() - - return nil + return err } func (w *Process) closeRelay() error { @@ -272,48 +228,8 @@ func (w *Process) Kill() error { return nil } -// put the pointer, to not allocate new slice -// but erase it len and then return back -func (w *Process) put(data *[]byte) { - w.syncPool.Put(data) -} - -// get pointer to the byte slice -func (w *Process) get() *[]byte { - return w.syncPool.Get().(*[]byte) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; errBuffer is always nil. -func (w *Process) watch() { - go func() { - for { - select { - case <-w.stop: - buf := w.get() - // read the last data - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - return - default: - // read the max 10kb of stderr per one read - buf := w.get() - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // delete all prev messages - w.stderr.Reset() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - } - } - }() +// Worker stderr +func (w *Process) Write(p []byte) (n int, err error) { + w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + return len(p), nil } |