diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 69 |
1 files changed, 56 insertions, 13 deletions
@@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "io" "os" "os/exec" "strconv" @@ -130,10 +131,12 @@ type WorkerProcess struct { endState *os.ProcessState // ensures than only one execution can be run at once. - mu sync.Mutex + mu sync.RWMutex // communication bus with underlying process. relay goridge.Relay + rd io.Reader + stop chan struct{} } // InitBaseWorker creates new WorkerProcess over given exec.cmd. @@ -147,13 +150,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { cmd: cmd, state: newState(StateInactive), stderr: new(bytes.Buffer), + stop: make(chan struct{}, 1), } + w.rd, w.cmd.Stderr = io.Pipe() + // small buffer optimization // at this point we know, that stderr will contain huge messages - w.stderr.Grow(1024) + w.stderr.Grow(10240) + + go func() { + w.watch() + }() - w.cmd.Stderr = w return w, nil } @@ -222,6 +231,7 @@ func (w *WorkerProcess) Start() error { func (w *WorkerProcess) Wait() error { const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) // if worker finishes with an error, message will be written to the stderr first // and then w.cmd.Wait return an error @@ -229,10 +239,12 @@ func (w *WorkerProcess) Wait() error { if err != nil { w.state.Set(StateErrored) + w.mu.RLock() // if process return code > 0, here will be an error from stderr (if presents) if w.stderr.Len() > 0 { err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) } + w.mu.RUnlock() return multierr.Append(err, w.closeRelay()) } @@ -301,14 +313,45 @@ func (w *WorkerProcess) Kill() error { // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (w *WorkerProcess) Write(p []byte) (int, error) { - w.mu.Lock() - defer w.mu.Unlock() - // clean all previous messages in the stderr - w.stderr.Truncate(0) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) - // write new message - w.stderr.Write(p) - - return len(p), nil +func (w *WorkerProcess) watch() { + proxy := make(chan [10240]byte, 5) + + go func() { + for { + select { + case <-w.stop: + // read the last data + var buf [10240]byte + _, err := w.rd.Read(buf[:]) + if err != nil { + panic(err) + } + proxy <- buf + // and close + close(proxy) + return + default: + var buf [10240]byte + _, err := w.rd.Read(buf[:]) + if err != nil { + panic(err) + } + proxy <- buf + } + } + }() + + for { + select { + case payload, ok := <-proxy: + if !ok { + return + } + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]}) + w.mu.Lock() + // write new message + w.stderr.Write(payload[:]) + w.mu.Unlock() + } + } } |