summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-14 23:07:04 +0300
committerValery Piashchynski <[email protected]>2020-12-14 23:07:04 +0300
commitfd0818ef7a8735f5ea3810eb8e0def5f7caae381 (patch)
tree27e8a40f9c4c32c855cf457165aff434cf83c8ac /worker.go
parent497112cf98d908c6ad99ac971b9ccd4a8d2f22c3 (diff)
Exeriment with piped stderr
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go69
1 files changed, 56 insertions, 13 deletions
diff --git a/worker.go b/worker.go
index 5682f551..f17f2c07 100755
--- a/worker.go
+++ b/worker.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "io"
"os"
"os/exec"
"strconv"
@@ -130,10 +131,12 @@ type WorkerProcess struct {
endState *os.ProcessState
// ensures than only one execution can be run at once.
- mu sync.Mutex
+ mu sync.RWMutex
// communication bus with underlying process.
relay goridge.Relay
+ rd io.Reader
+ stop chan struct{}
}
// InitBaseWorker creates new WorkerProcess over given exec.cmd.
@@ -147,13 +150,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
cmd: cmd,
state: newState(StateInactive),
stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
}
+ w.rd, w.cmd.Stderr = io.Pipe()
+
// small buffer optimization
// at this point we know, that stderr will contain huge messages
- w.stderr.Grow(1024)
+ w.stderr.Grow(10240)
+
+ go func() {
+ w.watch()
+ }()
- w.cmd.Stderr = w
return w, nil
}
@@ -222,6 +231,7 @@ func (w *WorkerProcess) Start() error {
func (w *WorkerProcess) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -229,10 +239,12 @@ func (w *WorkerProcess) Wait() error {
if err != nil {
w.state.Set(StateErrored)
+ w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
if w.stderr.Len() > 0 {
err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
}
+ w.mu.RUnlock()
return multierr.Append(err, w.closeRelay())
}
@@ -301,14 +313,45 @@ func (w *WorkerProcess) Kill() error {
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *WorkerProcess) Write(p []byte) (int, error) {
- w.mu.Lock()
- defer w.mu.Unlock()
- // clean all previous messages in the stderr
- w.stderr.Truncate(0)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p})
- // write new message
- w.stderr.Write(p)
-
- return len(p), nil
+func (w *WorkerProcess) watch() {
+ proxy := make(chan [10240]byte, 5)
+
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ // read the last data
+ var buf [10240]byte
+ _, err := w.rd.Read(buf[:])
+ if err != nil {
+ panic(err)
+ }
+ proxy <- buf
+ // and close
+ close(proxy)
+ return
+ default:
+ var buf [10240]byte
+ _, err := w.rd.Read(buf[:])
+ if err != nil {
+ panic(err)
+ }
+ proxy <- buf
+ }
+ }
+ }()
+
+ for {
+ select {
+ case payload, ok := <-proxy:
+ if !ok {
+ return
+ }
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: payload[:]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write(payload[:])
+ w.mu.Unlock()
+ }
+ }
}