diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 152 |
1 files changed, 41 insertions, 111 deletions
@@ -1,8 +1,8 @@ package roadrunner import ( + "bytes" "context" - "errors" "fmt" "os" "os/exec" @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/util" "github.com/spiral/goridge/v2" @@ -26,16 +27,28 @@ const ( // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError int64 = iota + 200 + EventWorkerError Event = iota + 200 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog ) +type Event int64 + +func (ev Event) String() string { + switch ev { + case EventWorkerError: + return "EventWorkerError" + case EventWorkerLog: + return "EventWorkerLog" + } + return "Unknown event type" +} + // WorkerEvent wraps worker events. type WorkerEvent struct { // Event id, see below. - Event int64 + Event Event // Worker triggered the event. Worker WorkerBase @@ -67,7 +80,7 @@ type WorkerBase interface { // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. - Wait(ctx context.Context) error + Wait() error // Stop sends soft termination command to the WorkerProcess and waits for process completion. Stop(ctx context.Context) error @@ -89,7 +102,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events *util.EventHandler + events util.EventsHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -106,9 +119,9 @@ type WorkerProcess struct { // can be nil while process is not started. pid int - // errBuffer aggregates stderr output from underlying process. Value can be + // stderr aggregates stderr output from underlying process. Value can be // receive only once command is completed and all pipes are closed. - errBuffer *errBuffer + stderr *bytes.Buffer // channel is being closed once command is complete. // waitDone chan interface{} @@ -133,13 +146,14 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), + stderr: new(bytes.Buffer), } - w.errBuffer = newErrBuffer(w.logCallback) - - // piping all stderr to command errBuffer - w.cmd.Stderr = w.errBuffer + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(1024) + w.cmd.Stderr = w return w, nil } @@ -156,10 +170,6 @@ func (w *WorkerProcess) Created() time.Time { // AddListener registers new worker event listener. func (w *WorkerProcess) AddListener(listener util.EventListener) { w.events.AddListener(listener) - - w.errBuffer.mu.Lock() - w.errBuffer.enable = true - w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -201,9 +211,7 @@ func (w *WorkerProcess) Start() error { if err != nil { return err } - w.pid = w.cmd.Process.Pid - return nil } @@ -211,15 +219,19 @@ func (w *WorkerProcess) Start() error { // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. -func (w *WorkerProcess) Wait(ctx context.Context) error { +func (w *WorkerProcess) Wait() error { + const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then w.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if no errors in the events, error might be in the errBuffer - if w.errBuffer.Len() > 0 { - err = multierr.Append(err, errors.New(w.errBuffer.String())) + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) } return multierr.Append(err, w.closeRelay()) @@ -254,10 +266,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { go func() { var err error - w.errBuffer.Close() w.state.Set(StateStopping) - w.mu.Lock() - defer w.mu.Unlock() err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) if err != nil { w.state.Set(StateKilling) @@ -282,8 +291,6 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { // error log from the stderr. Does not waits for process completion! func (w *WorkerProcess) Kill() error { w.state.Set(StateKilling) - w.mu.Lock() - defer w.mu.Unlock() err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -292,93 +299,16 @@ func (w *WorkerProcess) Kill() error { return nil } -func (w *WorkerProcess) logCallback(log []byte) { - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) -} - -// thread safe errBuffer -type errBuffer struct { - enable bool - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo: remove update - update chan interface{} - stop chan interface{} - logCallback func(log []byte) -} - -func newErrBuffer(logCallback func(log []byte)) *errBuffer { - eb := &errBuffer{ - buf: make([]byte, 0), - update: make(chan interface{}), - wait: time.NewTimer(WaitDuration), - stop: make(chan interface{}), - logCallback: logCallback, - } - - go func(eb *errBuffer) { - for { - select { - case <-eb.update: - eb.wait.Reset(WaitDuration) - case <-eb.wait.C: - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.buf = eb.buf[0:0] - eb.last = len(eb.buf) - } - eb.mu.Unlock() - case <-eb.stop: - eb.wait.Stop() - - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.last = len(eb.buf) - } - eb.mu.Unlock() - return - } - } - }(eb) - - return eb -} - -// Len returns the number of buf of the unread portion of the errBuffer; -// buf.Len() == len(buf.Bytes()). -func (eb *errBuffer) Len() int { - eb.mu.RLock() - defer eb.mu.RUnlock() - - // currently active message - return len(eb.buf) -} - // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (eb *errBuffer) Write(p []byte) (int, error) { - eb.mu.Lock() - eb.buf = append(eb.buf, p...) - eb.mu.Unlock() - eb.update <- nil +func (w *WorkerProcess) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + // clean all previous messages in the stderr + w.stderr.Truncate(0) + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) + // write new message + w.stderr.Write(p) return len(p), nil } - -// Strings fetches all errBuffer data into string. -func (eb *errBuffer) String() string { - eb.mu.Lock() - defer eb.mu.Unlock() - - // TODO unsafe operation, use runes - return string(eb.buf) -} - -// Close aggregation timer. -func (eb *errBuffer) Close() { - close(eb.stop) -} |