diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 202 |
1 files changed, 107 insertions, 95 deletions
@@ -1,8 +1,9 @@ package roadrunner import ( + "bytes" + //"bytes" "context" - "errors" "fmt" "os" "os/exec" @@ -11,6 +12,7 @@ import ( "sync" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/util" "github.com/spiral/goridge/v2" @@ -101,7 +103,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events *util.EventHandler + events util.EventsHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -120,7 +122,9 @@ type WorkerProcess struct { // errBuffer aggregates stderr output from underlying process. Value can be // receive only once command is completed and all pipes are closed. - errBuffer *errBuffer + // errBuffer *errBuffer + + stderr *bytes.Buffer // channel is being closed once command is complete. // waitDone chan interface{} @@ -145,13 +149,18 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), + stderr: new(bytes.Buffer), } - w.errBuffer = newErrBuffer(w.logCallback) + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(1024) + //w.errBuffer = newErrBuffer(w.logCallback) // piping all stderr to command errBuffer - w.cmd.Stderr = w.errBuffer + //w.cmd.Stderr = w.errBuffer + w.cmd.Stderr = w return w, nil } @@ -169,9 +178,9 @@ func (w *WorkerProcess) Created() time.Time { func (w *WorkerProcess) AddListener(listener util.EventListener) { w.events.AddListener(listener) - w.errBuffer.mu.Lock() - w.errBuffer.enable = true - w.errBuffer.mu.Unlock() + //w.errBuffer.mu.Lock() + //w.errBuffer.enable = true + //w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -224,16 +233,26 @@ func (w *WorkerProcess) Start() error { // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. func (w *WorkerProcess) Wait(ctx context.Context) error { + const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then w.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if no errors in the events, error might be in the errBuffer - if w.errBuffer.Len() > 0 { - err = multierr.Append(err, errors.New(w.errBuffer.String())) + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + w.stderr.Truncate(0) } + // if no errors in the events, error might be in the errBuffer + //if w.errBuffer.Len() > 0 { + // err = multierr.Append(err, errors.New(w.errBuffer.String())) + //} + return multierr.Append(err, w.closeRelay()) } @@ -262,14 +281,14 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { + w.mu.Lock() + defer w.mu.Unlock() c := make(chan error) go func() { var err error - w.errBuffer.Close() + //w.errBuffer.Close() w.state.Set(StateStopping) - w.mu.Lock() - defer w.mu.Unlock() err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) if err != nil { w.state.Set(StateKilling) @@ -294,8 +313,6 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { // error log from the stderr. Does not waits for process completion! func (w *WorkerProcess) Kill() error { w.state.Set(StateKilling) - w.mu.Lock() - defer w.mu.Unlock() err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -304,93 +321,88 @@ func (w *WorkerProcess) Kill() error { return nil } -func (w *WorkerProcess) logCallback(log []byte) { - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) -} - -// thread safe errBuffer -type errBuffer struct { - enable bool - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo: remove update - update chan interface{} - stop chan interface{} - logCallback func(log []byte) -} - -func newErrBuffer(logCallback func(log []byte)) *errBuffer { - eb := &errBuffer{ - buf: make([]byte, 0), - update: make(chan interface{}), - wait: time.NewTimer(WaitDuration), - stop: make(chan interface{}), - logCallback: logCallback, - } - - go func(eb *errBuffer) { - for { - select { - case <-eb.update: - eb.wait.Reset(WaitDuration) - case <-eb.wait.C: - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.buf = eb.buf[0:0] - eb.last = len(eb.buf) - } - eb.mu.Unlock() - case <-eb.stop: - eb.wait.Stop() - - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.last = len(eb.buf) - } - eb.mu.Unlock() - return - } - } - }(eb) - - return eb -} +//func (w *WorkerProcess) logCallback(log []byte) { +// w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) +//} // Len returns the number of buf of the unread portion of the errBuffer; // buf.Len() == len(buf.Bytes()). -func (eb *errBuffer) Len() int { - eb.mu.RLock() - defer eb.mu.RUnlock() - - // currently active message - return len(eb.buf) -} +//func (w *WorkerProcess) Len() int { +// //eb.mu.RLock() +// //defer eb.mu.RUnlock() +// +// // currently active message +// return len(w.stderr) +//} +// Writer is the interface that wraps the basic Write method. +// // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (eb *errBuffer) Write(p []byte) (int, error) { - eb.mu.Lock() - eb.buf = append(eb.buf, p...) - eb.mu.Unlock() - eb.update <- nil +func (w *WorkerProcess) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) + w.stderr.Write(p) + //eb.mu.Unlock() + //eb.update <- nil return len(p), nil } -// Strings fetches all errBuffer data into string. -func (eb *errBuffer) String() string { - eb.mu.Lock() - defer eb.mu.Unlock() - - // TODO unsafe operation, use runes - return string(eb.buf) -} - // Close aggregation timer. -func (eb *errBuffer) Close() { - close(eb.stop) -} +//func (w *WorkerProcess) Close() { +// //close(eb.stop) +//} + +// thread safe errBuffer +//type errBuffer struct { +// enable bool +// mu sync.RWMutex +// buf []byte +// last int +// wait *time.Timer +// // todo: remove update +// update chan interface{} +// stop chan interface{} +// logCallback func(log []byte) +//} +// +//func newErrBuffer(logCallback func(log []byte)) *errBuffer { +// eb := &errBuffer{ +// buf: make([]byte, 0), +// update: make(chan interface{}), +// wait: time.NewTimer(WaitDuration), +// stop: make(chan interface{}), +// logCallback: logCallback, +// } +// +// go func(eb *errBuffer) { +// for { +// select { +// case <-eb.update: +// eb.wait.Reset(WaitDuration) +// case <-eb.wait.C: +// eb.mu.Lock() +// if eb.enable && len(eb.buf) > eb.last { +// eb.logCallback(eb.buf[eb.last:]) +// eb.buf = eb.buf[0:0] +// eb.last = len(eb.buf) +// } +// eb.mu.Unlock() +// case <-eb.stop: +// eb.wait.Stop() +// +// eb.mu.Lock() +// if eb.enable && len(eb.buf) > eb.last { +// eb.logCallback(eb.buf[eb.last:]) +// eb.last = len(eb.buf) +// } +// eb.mu.Unlock() +// return +// } +// } +// }(eb) +// +// return eb +//} |