summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go202
1 files changed, 107 insertions, 95 deletions
diff --git a/worker.go b/worker.go
index f815ddea..8cfc05a8 100755
--- a/worker.go
+++ b/worker.go
@@ -1,8 +1,9 @@
package roadrunner
import (
+ "bytes"
+ //"bytes"
"context"
- "errors"
"fmt"
"os"
"os/exec"
@@ -11,6 +12,7 @@ import (
"sync"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/util"
"github.com/spiral/goridge/v2"
@@ -101,7 +103,7 @@ type WorkerProcess struct {
created time.Time
// updates parent supervisor or pool about WorkerProcess events
- events *util.EventHandler
+ events util.EventsHandler
// state holds information about current WorkerProcess state,
// number of WorkerProcess executions, buf status change time.
@@ -120,7 +122,9 @@ type WorkerProcess struct {
// errBuffer aggregates stderr output from underlying process. Value can be
// receive only once command is completed and all pipes are closed.
- errBuffer *errBuffer
+ // errBuffer *errBuffer
+
+ stderr *bytes.Buffer
// channel is being closed once command is complete.
// waitDone chan interface{}
@@ -145,13 +149,18 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
events: &util.EventHandler{},
cmd: cmd,
state: newState(StateInactive),
+ stderr: new(bytes.Buffer),
}
- w.errBuffer = newErrBuffer(w.logCallback)
+ // small buffer optimization
+ // at this point we know, that stderr will contain huge messages
+ w.stderr.Grow(1024)
+ //w.errBuffer = newErrBuffer(w.logCallback)
// piping all stderr to command errBuffer
- w.cmd.Stderr = w.errBuffer
+ //w.cmd.Stderr = w.errBuffer
+ w.cmd.Stderr = w
return w, nil
}
@@ -169,9 +178,9 @@ func (w *WorkerProcess) Created() time.Time {
func (w *WorkerProcess) AddListener(listener util.EventListener) {
w.events.AddListener(listener)
- w.errBuffer.mu.Lock()
- w.errBuffer.enable = true
- w.errBuffer.mu.Unlock()
+ //w.errBuffer.mu.Lock()
+ //w.errBuffer.enable = true
+ //w.errBuffer.mu.Unlock()
}
// State return receive-only WorkerProcess state object, state can be used to safely access
@@ -224,16 +233,26 @@ func (w *WorkerProcess) Start() error {
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
func (w *WorkerProcess) Wait(ctx context.Context) error {
+ const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then w.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(StateErrored)
- // if no errors in the events, error might be in the errBuffer
- if w.errBuffer.Len() > 0 {
- err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ // if process return code > 0, here will be an error from stderr (if presents)
+ if w.stderr.Len() > 0 {
+ err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
+ w.stderr.Truncate(0)
}
+ // if no errors in the events, error might be in the errBuffer
+ //if w.errBuffer.Len() > 0 {
+ // err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ //}
+
return multierr.Append(err, w.closeRelay())
}
@@ -262,14 +281,14 @@ func (w *WorkerProcess) closeRelay() error {
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
func (w *WorkerProcess) Stop(ctx context.Context) error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
c := make(chan error)
go func() {
var err error
- w.errBuffer.Close()
+ //w.errBuffer.Close()
w.state.Set(StateStopping)
- w.mu.Lock()
- defer w.mu.Unlock()
err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
if err != nil {
w.state.Set(StateKilling)
@@ -294,8 +313,6 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
// error log from the stderr. Does not waits for process completion!
func (w *WorkerProcess) Kill() error {
w.state.Set(StateKilling)
- w.mu.Lock()
- defer w.mu.Unlock()
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -304,93 +321,88 @@ func (w *WorkerProcess) Kill() error {
return nil
}
-func (w *WorkerProcess) logCallback(log []byte) {
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
-}
-
-// thread safe errBuffer
-type errBuffer struct {
- enable bool
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo: remove update
- update chan interface{}
- stop chan interface{}
- logCallback func(log []byte)
-}
-
-func newErrBuffer(logCallback func(log []byte)) *errBuffer {
- eb := &errBuffer{
- buf: make([]byte, 0),
- update: make(chan interface{}),
- wait: time.NewTimer(WaitDuration),
- stop: make(chan interface{}),
- logCallback: logCallback,
- }
-
- go func(eb *errBuffer) {
- for {
- select {
- case <-eb.update:
- eb.wait.Reset(WaitDuration)
- case <-eb.wait.C:
- eb.mu.Lock()
- if eb.enable && len(eb.buf) > eb.last {
- eb.logCallback(eb.buf[eb.last:])
- eb.buf = eb.buf[0:0]
- eb.last = len(eb.buf)
- }
- eb.mu.Unlock()
- case <-eb.stop:
- eb.wait.Stop()
-
- eb.mu.Lock()
- if eb.enable && len(eb.buf) > eb.last {
- eb.logCallback(eb.buf[eb.last:])
- eb.last = len(eb.buf)
- }
- eb.mu.Unlock()
- return
- }
- }
- }(eb)
-
- return eb
-}
+//func (w *WorkerProcess) logCallback(log []byte) {
+// w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
+//}
// Len returns the number of buf of the unread portion of the errBuffer;
// buf.Len() == len(buf.Bytes()).
-func (eb *errBuffer) Len() int {
- eb.mu.RLock()
- defer eb.mu.RUnlock()
-
- // currently active message
- return len(eb.buf)
-}
+//func (w *WorkerProcess) Len() int {
+// //eb.mu.RLock()
+// //defer eb.mu.RUnlock()
+//
+// // currently active message
+// return len(w.stderr)
+//}
+// Writer is the interface that wraps the basic Write method.
+//
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
-func (eb *errBuffer) Write(p []byte) (int, error) {
- eb.mu.Lock()
- eb.buf = append(eb.buf, p...)
- eb.mu.Unlock()
- eb.update <- nil
+func (w *WorkerProcess) Write(p []byte) (int, error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p})
+ w.stderr.Write(p)
+ //eb.mu.Unlock()
+ //eb.update <- nil
return len(p), nil
}
-// Strings fetches all errBuffer data into string.
-func (eb *errBuffer) String() string {
- eb.mu.Lock()
- defer eb.mu.Unlock()
-
- // TODO unsafe operation, use runes
- return string(eb.buf)
-}
-
// Close aggregation timer.
-func (eb *errBuffer) Close() {
- close(eb.stop)
-}
+//func (w *WorkerProcess) Close() {
+// //close(eb.stop)
+//}
+
+// thread safe errBuffer
+//type errBuffer struct {
+// enable bool
+// mu sync.RWMutex
+// buf []byte
+// last int
+// wait *time.Timer
+// // todo: remove update
+// update chan interface{}
+// stop chan interface{}
+// logCallback func(log []byte)
+//}
+//
+//func newErrBuffer(logCallback func(log []byte)) *errBuffer {
+// eb := &errBuffer{
+// buf: make([]byte, 0),
+// update: make(chan interface{}),
+// wait: time.NewTimer(WaitDuration),
+// stop: make(chan interface{}),
+// logCallback: logCallback,
+// }
+//
+// go func(eb *errBuffer) {
+// for {
+// select {
+// case <-eb.update:
+// eb.wait.Reset(WaitDuration)
+// case <-eb.wait.C:
+// eb.mu.Lock()
+// if eb.enable && len(eb.buf) > eb.last {
+// eb.logCallback(eb.buf[eb.last:])
+// eb.buf = eb.buf[0:0]
+// eb.last = len(eb.buf)
+// }
+// eb.mu.Unlock()
+// case <-eb.stop:
+// eb.wait.Stop()
+//
+// eb.mu.Lock()
+// if eb.enable && len(eb.buf) > eb.last {
+// eb.logCallback(eb.buf[eb.last:])
+// eb.last = len(eb.buf)
+// }
+// eb.mu.Unlock()
+// return
+// }
+// }
+// }(eb)
+//
+// return eb
+//}