diff options
author | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-02-03 17:31:17 +0300 |
commit | 8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch) | |
tree | fae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg/worker/worker.go | |
parent | 20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff) |
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg/worker/worker.go')
-rwxr-xr-x | pkg/worker/worker.go | 134 |
1 files changed, 25 insertions, 109 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 2f1f399d..b726c6f1 100755 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -1,14 +1,11 @@ package worker import ( - "bytes" "fmt" - "io" "os" "os/exec" "strconv" "strings" - "sync" "time" "github.com/spiral/errors" @@ -53,27 +50,11 @@ type Process struct { // can be nil while process is not started. pid int - // stderr aggregates stderr output from underlying process. Value can be - // receive only once command is completed and all pipes are closed. - stderr *bytes.Buffer - - // channel is being closed once command is complete. - // waitDone chan interface{} - // contains information about resulted process state. endState *os.ProcessState - // ensures than only one execution can be run at once. - mu sync.RWMutex - // communication bus with underlying process. relay relay.Relay - // rd in a second part of pipe to read from stderr - rd io.Reader - // stop signal terminates io.Pipe from reading from stderr - stop chan struct{} - - syncPool sync.Pool } // InitBaseWorker creates new Process over given exec.cmd. @@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { events: events.NewEventsHandler(), cmd: cmd, state: internal.NewWorkerState(states.StateInactive), - stderr: new(bytes.Buffer), - stop: make(chan struct{}, 1), - // sync pool for STDERR - // All receivers are pointers - syncPool: sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, - }, } - w.rd, w.cmd.Stderr = io.Pipe() - - // small buffer optimization - // at this point we know, that stderr will contain huge messages - w.stderr.Grow(ReadBufSize) + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w // add options for i := 0; i < len(options); i++ { options[i](w) } - go func() { - w.watch() - }() - return w, nil } @@ -189,44 +153,36 @@ func (w *Process) Start() error { // to find or Start the script. func (w *Process) Wait() error { const op = errors.Op("process_wait") - err := multierr.Combine(w.cmd.Wait()) + var err error + err = w.cmd.Wait() + // If worker was destroyed, just exit if w.State().Value() == states.StateDestroyed { - return errors.E(op, err) + return nil } - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then process.cmd.Wait return an error - w.endState = w.cmd.ProcessState + // If state is different, and err is not nil, append it to the errors if err != nil { - w.state.Set(states.StateErrored) - - w.mu.RLock() - // if process return code > 0, here will be an error from stderr (if presents) - if w.stderr.Len() > 0 { - err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) - // stop the stderr buffer - w.stop <- struct{}{} - } - w.mu.RUnlock() - - return multierr.Append(err, w.closeRelay()) + w.State().Set(states.StateErrored) + err = multierr.Combine(err, errors.E(op, err)) } - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(states.StateErrored) - return err + // closeRelay + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + err2 := w.closeRelay() + if err2 != nil { + w.State().Set(states.StateErrored) + return multierr.Append(err, errors.E(op, err2)) } - if w.endState.Success() { - w.state.Set(states.StateStopped) + if w.cmd.ProcessState.Success() { + w.State().Set(states.StateStopped) + return nil } - w.stderr.Reset() - - return nil + return err } func (w *Process) closeRelay() error { @@ -272,48 +228,8 @@ func (w *Process) Kill() error { return nil } -// put the pointer, to not allocate new slice -// but erase it len and then return back -func (w *Process) put(data *[]byte) { - w.syncPool.Put(data) -} - -// get pointer to the byte slice -func (w *Process) get() *[]byte { - return w.syncPool.Get().(*[]byte) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; errBuffer is always nil. -func (w *Process) watch() { - go func() { - for { - select { - case <-w.stop: - buf := w.get() - // read the last data - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - return - default: - // read the max 10kb of stderr per one read - buf := w.get() - n, _ := w.rd.Read(*buf) - w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // delete all prev messages - w.stderr.Reset() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - } - } - }() +// Worker stderr +func (w *Process) Write(p []byte) (n int, err error) { + w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + return len(p), nil } |