diff options
author | Valery Piashchynski <[email protected]> | 2020-12-15 14:28:30 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-12-15 14:28:30 +0300 |
commit | 21b51367e27f5a1b166459a115e4655d07a5d832 (patch) | |
tree | c3257a2ac38f0688e78ca2c9eeb160fb7a84c55d /worker.go | |
parent | 08f073f3bdc1288db68235c098c3a2109c6e7667 (diff) | |
parent | d39a0735fe21d21c5aae20c4780458433a42250a (diff) |
Merge branch '2.0' into plugin/reloader
# Conflicts:
# go.mod
# sync_worker.go
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 81 |
1 files changed, 67 insertions, 14 deletions
@@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "os/exec" "strconv" @@ -14,7 +15,7 @@ import ( "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/util" - "github.com/spiral/goridge/v2" + "github.com/spiral/goridge/v3" "go.uber.org/multierr" ) @@ -57,6 +58,13 @@ type WorkerEvent struct { Payload interface{} } +var pool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, 10240) + return &buf + }, +} + type WorkerBase interface { fmt.Stringer @@ -130,10 +138,12 @@ type WorkerProcess struct { endState *os.ProcessState // ensures than only one execution can be run at once. - mu sync.Mutex + mu sync.RWMutex // communication bus with underlying process. relay goridge.Relay + rd io.Reader + stop chan struct{} } // InitBaseWorker creates new WorkerProcess over given exec.cmd. @@ -147,13 +157,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { cmd: cmd, state: newState(StateInactive), stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), } + w.rd, w.cmd.Stderr = io.Pipe() + // small buffer optimization // at this point we know, that stderr will contain huge messages - w.stderr.Grow(1024) + w.stderr.Grow(10240) + + go func() { + w.watch() + }() - w.cmd.Stderr = w return w, nil } @@ -222,6 +238,7 @@ func (w *WorkerProcess) Start() error { func (w *WorkerProcess) Wait() error { const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first // and then w.cmd.Wait return an error @@ -229,10 +246,14 @@ func (w *WorkerProcess) Wait() error { if err != nil { w.state.Set(StateErrored) + w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) if w.stderr.Len() > 0 { err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + // stop the stderr buffer + w.stop <- struct{}{} } + w.mu.RUnlock() return multierr.Append(err, w.closeRelay()) } @@ -299,16 +320,48 @@ func (w *WorkerProcess) Kill() error { return nil } +// put the pointer, to not allocate new slice +// but erase it len and then return back +func (w *WorkerProcess) put(data *[]byte) { + *data = (*data)[:0] + *data = (*data)[:cap(*data)] + + pool.Put(data) +} + +// get pointer to the byte slice +func (w *WorkerProcess) get() *[]byte { + return pool.Get().(*[]byte) +} + // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (w *WorkerProcess) Write(p []byte) (int, error) { - w.mu.Lock() - defer w.mu.Unlock() - // clean all previous messages in the stderr - w.stderr.Truncate(0) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) - // write new message - w.stderr.Write(p) - - return len(p), nil +func (w *WorkerProcess) watch() { + go func() { + for { + select { + case <-w.stop: + buf := w.get() + // read the last data + n, _ := w.rd.Read(*buf) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + return + default: + // read the max 10kb of stderr per one read + buf := w.get() + n, _ := w.rd.Read(*buf) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) + w.mu.Lock() + // write new message + w.stderr.Write((*buf)[:n]) + w.mu.Unlock() + w.put(buf) + } + } + }() } |