summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2020-12-15 14:28:30 +0300
committerValery Piashchynski <[email protected]>2020-12-15 14:28:30 +0300
commit21b51367e27f5a1b166459a115e4655d07a5d832 (patch)
treec3257a2ac38f0688e78ca2c9eeb160fb7a84c55d /worker.go
parent08f073f3bdc1288db68235c098c3a2109c6e7667 (diff)
parentd39a0735fe21d21c5aae20c4780458433a42250a (diff)
Merge branch '2.0' into plugin/reloader
# Conflicts: # go.mod # sync_worker.go
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go81
1 files changed, 67 insertions, 14 deletions
diff --git a/worker.go b/worker.go
index 402e9b90..d860b3af 100755
--- a/worker.go
+++ b/worker.go
@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
+ "io"
"os"
"os/exec"
"strconv"
@@ -14,7 +15,7 @@ import (
"github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/util"
- "github.com/spiral/goridge/v2"
+ "github.com/spiral/goridge/v3"
"go.uber.org/multierr"
)
@@ -57,6 +58,13 @@ type WorkerEvent struct {
Payload interface{}
}
+var pool = sync.Pool{
+ New: func() interface{} {
+ buf := make([]byte, 10240)
+ return &buf
+ },
+}
+
type WorkerBase interface {
fmt.Stringer
@@ -130,10 +138,12 @@ type WorkerProcess struct {
endState *os.ProcessState
// ensures than only one execution can be run at once.
- mu sync.Mutex
+ mu sync.RWMutex
// communication bus with underlying process.
relay goridge.Relay
+ rd io.Reader
+ stop chan struct{}
}
// InitBaseWorker creates new WorkerProcess over given exec.cmd.
@@ -147,13 +157,19 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
cmd: cmd,
state: newState(StateInactive),
stderr: new(bytes.Buffer),
+ stop: make(chan struct{}, 1),
}
+ w.rd, w.cmd.Stderr = io.Pipe()
+
// small buffer optimization
// at this point we know, that stderr will contain huge messages
- w.stderr.Grow(1024)
+ w.stderr.Grow(10240)
+
+ go func() {
+ w.watch()
+ }()
- w.cmd.Stderr = w
return w, nil
}
@@ -222,6 +238,7 @@ func (w *WorkerProcess) Start() error {
func (w *WorkerProcess) Wait() error {
const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+
// at this point according to the documentation (see cmd.Wait comment)
// if worker finishes with an error, message will be written to the stderr first
// and then w.cmd.Wait return an error
@@ -229,10 +246,14 @@ func (w *WorkerProcess) Wait() error {
if err != nil {
w.state.Set(StateErrored)
+ w.mu.RLock()
// if process return code > 0, here will be an error from stderr (if presents)
if w.stderr.Len() > 0 {
err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ // stop the stderr buffer
+ w.stop <- struct{}{}
}
+ w.mu.RUnlock()
return multierr.Append(err, w.closeRelay())
}
@@ -299,16 +320,48 @@ func (w *WorkerProcess) Kill() error {
return nil
}
+// put the pointer, to not allocate new slice
+// but erase it len and then return back
+func (w *WorkerProcess) put(data *[]byte) {
+ *data = (*data)[:0]
+ *data = (*data)[:cap(*data)]
+
+ pool.Put(data)
+}
+
+// get pointer to the byte slice
+func (w *WorkerProcess) get() *[]byte {
+ return pool.Get().(*[]byte)
+}
+
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
-func (w *WorkerProcess) Write(p []byte) (int, error) {
- w.mu.Lock()
- defer w.mu.Unlock()
- // clean all previous messages in the stderr
- w.stderr.Truncate(0)
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p})
- // write new message
- w.stderr.Write(p)
-
- return len(p), nil
+func (w *WorkerProcess) watch() {
+ go func() {
+ for {
+ select {
+ case <-w.stop:
+ buf := w.get()
+ // read the last data
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ return
+ default:
+ // read the max 10kb of stderr per one read
+ buf := w.get()
+ n, _ := w.rd.Read(*buf)
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]})
+ w.mu.Lock()
+ // write new message
+ w.stderr.Write((*buf)[:n])
+ w.mu.Unlock()
+ w.put(buf)
+ }
+ }
+ }()
}