summaryrefslogtreecommitdiff
path: root/pkg/worker/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
committerValery Piashchynski <[email protected]>2021-02-03 17:31:17 +0300
commit8eda5dc6f0f7e05d7b3d62e1861af05b49a2574a (patch)
treefae66ad49d2a4624a7caf45a5bf07d53e5c7d26f /pkg/worker/worker.go
parent20a1a5d2eb26090e0eef0e6772330ee2a52526fa (diff)
Fix memory leak in the Worker.go
Diffstat (limited to 'pkg/worker/worker.go')
-rwxr-xr-xpkg/worker/worker.go134
1 files changed, 25 insertions, 109 deletions
diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go
index 2f1f399d..b726c6f1 100755
--- a/pkg/worker/worker.go
+++ b/pkg/worker/worker.go
@@ -1,14 +1,11 @@
package worker
import (
- "bytes"
"fmt"
- "io"
"os"
"os/exec"
"strconv"
"strings"
- "sync"
"time"
"github.com/spiral/errors"
@@ -53,27 +50,11 @@ type Process struct {
// can be nil while process is not started.
pid int
- // stderr aggregates stderr output from underlying process. Value can be
- // receive only once command is completed and all pipes are closed.
- stderr *bytes.Buffer
-
- // channel is being closed once command is complete.
- // waitDone chan interface{}
-
// contains information about resulted process state.
endState *os.ProcessState
- // ensures than only one execution can be run at once.
- mu sync.RWMutex
-
// communication bus with underlying process.
relay relay.Relay
- // rd in a second part of pipe to read from stderr
- rd io.Reader
- // stop signal terminates io.Pipe from reading from stderr
- stop chan struct{}
-
- syncPool sync.Pool
}
// InitBaseWorker creates new Process over given exec.cmd.
@@ -87,33 +68,16 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
events: events.NewEventsHandler(),
cmd: cmd,
state: internal.NewWorkerState(states.StateInactive),
- stderr: new(bytes.Buffer),
- stop: make(chan struct{}, 1),
- // sync pool for STDERR
- // All receivers are pointers
- syncPool: sync.Pool{
- New: func() interface{} {
- buf := make([]byte, ReadBufSize)
- return &buf
- },
- },
}
- w.rd, w.cmd.Stderr = io.Pipe()
-
- // small buffer optimization
- // at this point we know, that stderr will contain huge messages
- w.stderr.Grow(ReadBufSize)
+ // set self as stderr implementation (Writer interface)
+ w.cmd.Stderr = w
// add options
for i := 0; i < len(options); i++ {
options[i](w)
}
- go func() {
- w.watch()
- }()
-
return w, nil
}
@@ -189,44 +153,36 @@ func (w *Process) Start() error {
// to find or Start the script.
func (w *Process) Wait() error {
const op = errors.Op("process_wait")
- err := multierr.Combine(w.cmd.Wait())
+ var err error
+ err = w.cmd.Wait()
+ // If worker was destroyed, just exit
if w.State().Value() == states.StateDestroyed {
- return errors.E(op, err)
+ return nil
}
- // at this point according to the documentation (see cmd.Wait comment)
- // if worker finishes with an error, message will be written to the stderr first
- // and then process.cmd.Wait return an error
- w.endState = w.cmd.ProcessState
+ // If state is different, and err is not nil, append it to the errors
if err != nil {
- w.state.Set(states.StateErrored)
-
- w.mu.RLock()
- // if process return code > 0, here will be an error from stderr (if presents)
- if w.stderr.Len() > 0 {
- err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
- // stop the stderr buffer
- w.stop <- struct{}{}
- }
- w.mu.RUnlock()
-
- return multierr.Append(err, w.closeRelay())
+ w.State().Set(states.StateErrored)
+ err = multierr.Combine(err, errors.E(op, err))
}
- err = multierr.Append(err, w.closeRelay())
- if err != nil {
- w.state.Set(states.StateErrored)
- return err
+ // closeRelay
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then process.cmd.Wait return an error
+ err2 := w.closeRelay()
+ if err2 != nil {
+ w.State().Set(states.StateErrored)
+ return multierr.Append(err, errors.E(op, err2))
}
- if w.endState.Success() {
- w.state.Set(states.StateStopped)
+ if w.cmd.ProcessState.Success() {
+ w.State().Set(states.StateStopped)
+ return nil
}
- w.stderr.Reset()
-
- return nil
+ return err
}
func (w *Process) closeRelay() error {
@@ -272,48 +228,8 @@ func (w *Process) Kill() error {
return nil
}
-// put the pointer, to not allocate new slice
-// but erase it len and then return back
-func (w *Process) put(data *[]byte) {
- w.syncPool.Put(data)
-}
-
-// get pointer to the byte slice
-func (w *Process) get() *[]byte {
- return w.syncPool.Get().(*[]byte)
-}
-
-// Write appends the contents of pool to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *Process) watch() {
- go func() {
- for {
- select {
- case <-w.stop:
- buf := w.get()
- // read the last data
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // write new message
- // we are sending only n read bytes, without sending previously written message as bytes slice from syncPool
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- return
- default:
- // read the max 10kb of stderr per one read
- buf := w.get()
- n, _ := w.rd.Read(*buf)
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
- w.mu.Lock()
- // delete all prev messages
- w.stderr.Reset()
- // write new message
- w.stderr.Write((*buf)[:n])
- w.mu.Unlock()
- w.put(buf)
- }
- }
- }()
+// Worker stderr
+func (w *Process) Write(p []byte) (n int, err error) {
+ w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ return len(p), nil
}