summaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
committerValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
commit8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch)
treefae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg
parent20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff)
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg')
-rw-r--r--pkg/events/worker_events.go5
-rwxr-xr-xpkg/pool/static_pool_test.go2
-rw-r--r--pkg/states/worker_states.go2
-rw-r--r--pkg/transport/pipe/pipe_factory_spawn_test.go14
-rwxr-xr-xpkg/transport/pipe/pipe_factory_test.go16
-rw-r--r--pkg/transport/socket/socket_factory_spawn_test.go62
-rwxr-xr-xpkg/transport/socket/socket_factory_test.go64
-rwxr-xr-xpkg/worker/sync_worker.go2
-rwxr-xr-xpkg/worker/worker.go134
9 files changed, 168 insertions, 133 deletions
diff --git a/pkg/events/worker_events.go b/pkg/events/worker_events.go
index 9d428f7d..11bd6ab7 100644
--- a/pkg/events/worker_events.go
+++ b/pkg/events/worker_events.go
@@ -3,9 +3,10 @@ package events
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
EventWorkerError W = iota + 11000
-
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
+ // EventWorkerStderr is the worker standard error output
+ EventWorkerStderr
)
type W int64
@@ -16,6 +17,8 @@ func (ev W) String() string {
return "EventWorkerError"
case EventWorkerLog:
return "EventWorkerLog"
+ case EventWorkerStderr:
+ return "EventWorkerStderr"
}
return "Unknown event type"
}
diff --git a/pkg/pool/static_pool_test.go b/pkg/pool/static_pool_test.go
index 2d2b2b7d..a8fe3baa 100755
--- a/pkg/pool/static_pool_test.go
+++ b/pkg/pool/static_pool_test.go
@@ -171,7 +171,7 @@ func Test_StaticPool_Broken_Replace(t *testing.T) {
listener := func(event interface{}) {
if wev, ok := event.(events.WorkerEvent); ok {
- if wev.Event == events.EventWorkerLog {
+ if wev.Event == events.EventWorkerStderr {
e := string(wev.Payload.([]byte))
if strings.ContainsAny(e, "undefined_function()") {
block <- struct{}{}
diff --git a/pkg/states/worker_states.go b/pkg/states/worker_states.go
index 22fdfe8a..fe653cb4 100644
--- a/pkg/states/worker_states.go
+++ b/pkg/states/worker_states.go
@@ -16,6 +16,7 @@ const (
// StateStopping - process is being softly stopped.
StateStopping
+ // StateKilling - process is being forcibly stopped
StateKilling
// State of worker, when no need to allocate new one
@@ -27,5 +28,6 @@ const (
// StateErrored - error WorkerState (can't be used).
StateErrored
+ // StateRemove - worker is killed and removed from the stack
StateRemove
)
diff --git a/pkg/transport/pipe/pipe_factory_spawn_test.go b/pkg/transport/pipe/pipe_factory_spawn_test.go
index e247324c..663b3dd5 100644
--- a/pkg/transport/pipe/pipe_factory_spawn_test.go
+++ b/pkg/transport/pipe/pipe_factory_spawn_test.go
@@ -106,11 +106,21 @@ func Test_Pipe_PipeError4(t *testing.T) {
func Test_Pipe_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewPipeFactory().SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+ w, err := NewPipeFactory().SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid2(t *testing.T) {
diff --git a/pkg/transport/pipe/pipe_factory_test.go b/pkg/transport/pipe/pipe_factory_test.go
index b23af19f..6045dd91 100755
--- a/pkg/transport/pipe/pipe_factory_test.go
+++ b/pkg/transport/pipe/pipe_factory_test.go
@@ -117,11 +117,23 @@ func Test_Pipe_PipeError2(t *testing.T) {
func Test_Pipe_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
ctx := context.Background()
- w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd)
+
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewPipeFactory().SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Pipe_Invalid(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_spawn_test.go b/pkg/transport/socket/socket_factory_spawn_test.go
index 0e29e7d2..50729546 100644
--- a/pkg/transport/socket/socket_factory_spawn_test.go
+++ b/pkg/transport/socket/socket_factory_spawn_test.go
@@ -3,11 +3,13 @@ package socket
import (
"net"
"os/exec"
+ "strings"
"sync"
"syscall"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -108,10 +110,21 @@ func Test_Tcp_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Invalid2(t *testing.T) {
@@ -149,7 +162,18 @@ func Test_Tcp_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -159,7 +183,6 @@ func Test_Tcp_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -176,6 +199,7 @@ func Test_Tcp_Broken2(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo2(t *testing.T) {
@@ -250,10 +274,21 @@ func Test_Unix_Failboot2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorker(cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout2(t *testing.T) {
@@ -297,7 +332,18 @@ func Test_Unix_Broken2(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorker(cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -307,7 +353,6 @@ func Test_Unix_Broken2(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -324,6 +369,7 @@ func Test_Unix_Broken2(t *testing.T) {
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
wg.Wait()
+ <-finish
}
func Test_Unix_Echo2(t *testing.T) {
diff --git a/pkg/transport/socket/socket_factory_test.go b/pkg/transport/socket/socket_factory_test.go
index f55fc3dd..4abcd5d9 100755
--- a/pkg/transport/socket/socket_factory_test.go
+++ b/pkg/transport/socket/socket_factory_test.go
@@ -4,10 +4,12 @@ import (
"context"
"net"
"os/exec"
+ "strings"
"sync"
"testing"
"time"
+ "github.com/spiral/roadrunner/v2/pkg/events"
"github.com/spiral/roadrunner/v2/pkg/payload"
"github.com/spiral/roadrunner/v2/pkg/worker"
"github.com/stretchr/testify/assert"
@@ -118,10 +120,21 @@ func Test_Tcp_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err2 := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err2)
- assert.Contains(t, err2.Error(), "failboot")
+ <-finish
}
func Test_Tcp_Timeout(t *testing.T) {
@@ -186,7 +199,18 @@ func Test_Tcp_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "tcp")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "undefined_function()") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -196,7 +220,6 @@ func Test_Tcp_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -213,6 +236,7 @@ func Test_Tcp_Broken(t *testing.T) {
assert.Nil(t, res.Body)
assert.Nil(t, res.Context)
wg.Wait()
+ <-finish
}
func Test_Tcp_Echo(t *testing.T) {
@@ -301,10 +325,21 @@ func Test_Unix_Failboot(t *testing.T) {
cmd := exec.Command("php", "../../../tests/failboot.php")
- w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd)
+ finish := make(chan struct{}, 1)
+ listener := func(event interface{}) {
+ if ev, ok := event.(events.WorkerEvent); ok {
+ if ev.Event == events.EventWorkerStderr {
+ if strings.Contains(string(ev.Payload.([]byte)), "failboot") {
+ finish <- struct{}{}
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Second*5).SpawnWorkerWithTimeout(ctx, cmd, listener)
assert.Nil(t, w)
assert.Error(t, err)
- assert.Contains(t, err.Error(), "failboot")
+ <-finish
}
func Test_Unix_Timeout(t *testing.T) {
@@ -366,7 +401,20 @@ func Test_Unix_Broken(t *testing.T) {
cmd := exec.Command("php", "../../../tests/client.php", "broken", "unix")
- w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd)
+ block := make(chan struct{})
+ listener := func(event interface{}) {
+ if wev, ok := event.(events.WorkerEvent); ok {
+ if wev.Event == events.EventWorkerStderr {
+ e := string(wev.Payload.([]byte))
+ if strings.ContainsAny(e, "undefined_function()") {
+ block <- struct{}{}
+ return
+ }
+ }
+ }
+ }
+
+ w, err := NewSocketServer(ls, time.Minute).SpawnWorkerWithTimeout(ctx, cmd, listener)
if err != nil {
t.Fatal(err)
}
@@ -376,7 +424,6 @@ func Test_Unix_Broken(t *testing.T) {
defer wg.Done()
err := w.Wait()
assert.Error(t, err)
- assert.Contains(t, err.Error(), "undefined_function()")
}()
defer func() {
@@ -392,6 +439,7 @@ func Test_Unix_Broken(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, res.Context)
assert.Nil(t, res.Body)
+ <-block
wg.Wait()
}
diff --git a/pkg/worker/sync_worker.go b/pkg/worker/sync_worker.go
index 696fbdb7..010af076 100755
--- a/pkg/worker/sync_worker.go
+++ b/pkg/worker/sync_worker.go
@@ -36,10 +36,8 @@ func FromSync(w *SyncWorkerImpl) BaseProcess {
state: w.process.state,
cmd: w.process.cmd,
pid: w.process.pid,
- stderr: w.process.stderr,
endState: w.process.endState,
relay: w.process.relay,
- rd: w.process.rd,
}
}
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 2f1f399d..b726c6f1 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -1,14 +1,11 @@
package worker
import (
- "bytes"
"fmt"
- "io"
"os"
"os/exec"
"strconv"
"strings"
- "sync"
"time"
"github.com/spiral/errors"
@@ -53,27 +50,11 @@ type Process struct {
// can be nil while process is not started.
pid int
- // stderr aggregates stderr output from underlying process. Value can be
- // receive only once command is completed and all pipes are closed.
- stderr *bytes.Buffer
-
- // channel is being closed once command is complete.
- // waitDone chan interface{}
-
// contains information about resulted process state.
endState *os.ProcessState
- // ensures than only one execution can be run at once.
- mu sync.RWMutex
-
// communication bus with underlying process.
relay relay.Relay
- // rd in a second part of pipe to read from stderr
- rd io.Reader
- // stop signal terminates io.Pipe from reading from stderr
- stop chan struct{}
-
- syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(states.StateInactive),
- stderr: new(bytes.Buffer),
- stop: make(chan struct{}, 1),
- // sync pool for STDERR
- // All receivers are pointers
- syncPool: sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
- },
}
- w.rd, w.cmd.Stderr = io.Pipe()
-
- // small buffer optimization
- // at this point we know, that stderr will contain huge messages
- w.stderr.Grow(ReadBufSize)
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
// add options
for i := 0; i < len(options); i++ {
options[i](w)
}
- go func() {
- w.watch()
- }()
-
return w, nil
}
@@ -189,44 +153,36 @@ func (w *Process) Start() error {
// to find or Start the script.
func (w *Process) Wait() error {
const op = errors.Op("process_wait")
- err := multierr.Combine(w.cmd.Wait())
+ var err error
+ err = w.cmd.Wait()
+ // If worker was destroyed, just exit
if w.State().Value() == states.StateDestroyed {
- return errors.E(op, err)
+ return nil
}
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then process.cmd.Wait return an error
- w.endState = w.cmd.ProcessState
+ // If state is different, and err is not nil, append it to the errors
if err != nil {
- w.state.Set(states.StateErrored)
-
- w.mu.RLock()
- // if process return code > 0, here will be an error from stderr (if presents)
- if w.stderr.Len() > 0 {
- err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
- // stop the stderr buffer
- w.stop <- struct{}{}
- }
- w.mu.RUnlock()
-
- return multierr.Append(err, w.closeRelay())
+ w.State().Set(states.StateErrored)
+ err = multierr.Combine(err, errors.E(op, err))
}
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(states.StateErrored)
- return err
+ // closeRelay
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then process.cmd.Wait return an error
+ err2 := w.closeRelay()
+ if err2 != nil {
+ w.State().Set(states.StateErrored)
+ return multierr.Append(err, errors.E(op, err2))
}
- if w.endState.Success() {
- w.state.Set(states.StateStopped)
+ if w.cmd.ProcessState.Success() {
+ w.State().Set(states.StateStopped)
+ return nil
}
- w.stderr.Reset()
-
- return nil
+ return err
}
func (w *Process) closeRelay() error {
@@ -272,48 +228,8 @@ func (w *Process) Kill() error {
return nil
}
-// put the pointer, to not allocate new slice
-// but erase it len and then return back
-func (w *Process) put(data *[]byte) {
- w.syncPool.Put(data)
-}
-
-// get pointer to the byte slice
-func (w *Process) get() *[]byte {
- return w.syncPool.Get().(*[]byte)
-}
-
-// Write appends the contents of pool to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *Process) watch() {
- go func() {
- for {
- select {
- case <-w.stop:
- buf := w.get()
- // read the last data
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- return
- default:
- // read the max 10kb of stderr per one read
- buf := w.get()
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // delete all prev messages
- w.stderr.Reset()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- }
- }
- }()
+// Worker stderr
+func (w *Process) Write(p []byte) (n int, err error) {
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ return len(p), nil
}