diff options
Diffstat (limited to 'worker.go')
-rw-r--r-- | worker.go | 459 |
1 files changed, 299 insertions, 160 deletions
@@ -1,6 +1,8 @@ package roadrunner import ( + "context" + "errors" "fmt" "os" "os/exec" @@ -9,36 +11,102 @@ import ( "sync" "time" - "github.com/pkg/errors" "github.com/spiral/goridge/v2" + "go.uber.org/multierr" ) -// Worker - supervised process with api over goridge.Relay. -type Worker struct { - // Pid of the process, points to Pid of underlying process and - // can be nil while process is not started. - Pid *int +// Event codes reported through WorkerEvent.Event; numbering starts at 100. +const ( + // EventWorkerError triggered after WorkerProcess. Expect payload to be error. + EventWorkerError int64 = iota + 100 + + // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Expect payload to be []byte. + EventWorkerLog + + // EventWorkerWaitDone triggered when worker exit from process Wait + EventWorkerWaitDone + + EventWorkerBufferClosed + + EventRelayCloseError + + EventWorkerProcessError +) + +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond +) + +// WorkerEvent is the message delivered on a worker's events channel: an event code, the worker it concerns and an event-specific payload. +type WorkerEvent struct { + Event int64 + Worker WorkerBase + Payload interface{} +} + +// WorkerBase is the set of operations exposed by a supervised worker process. +type WorkerBase interface { + fmt.Stringer + + Created() time.Time + + Events() <-chan WorkerEvent - // Created indicates at what time worker has been created. - Created time.Time + Pid() int64 - // state holds information about current worker state, - // number of worker executions, buf status change time. + // State return receive-only WorkerProcess state object, state can be used to safely access + // WorkerProcess status, time when status changed and number of WorkerProcess executions. 
+ State() State + + // Start used to run Cmd and immediately return + Start() error + // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is + // complete and will return process error (if any), if stderr is presented its value + // will be wrapped as WorkerError. Method will return error code if php process fails + // to find or Start the script. + Wait(ctx context.Context) error + + // Stop sends soft termination command to the WorkerProcess and waits for process completion. + Stop(ctx context.Context) error + // Kill kills underlying process, make sure to call Wait() func to gather + // error log from the stderr. Does not wait for process completion! + Kill(ctx context.Context) error + // Relay returns the goridge relay attached to the worker + Relay() goridge.Relay + // AttachRelay used to attach goridge relay to the worker process + AttachRelay(rl goridge.Relay) +} + +// WorkerProcess - supervised process with api over goridge.Relay. +type WorkerProcess struct { + // created indicates at what time WorkerProcess has been created. + created time.Time + + // updates parent supervisor or pool about WorkerProcess events + events chan WorkerEvent + + // state holds information about current WorkerProcess state, + // number of WorkerProcess executions, buf status change time. // publicly this object is receive-only and protected using Mutex // and atomic counter. state *state // underlying command with associated process, command must be - // provided to worker from outside in non-started form. CmdSource - // stdErr direction will be handled by worker to aggregate error message. + // provided to WorkerProcess from outside in non-started form. CmdSource + // stdErr direction will be handled by WorkerProcess to aggregate error message. cmd *exec.Cmd - // err aggregates stderr output from underlying process. Value can be + // pid of the underlying process, copied from cmd.Process.Pid by Start; + // it stays 0 while the process is not started. 
+ pid int + + // errBuffer aggregates stderr output from underlying process. Value can be // receive only once command is completed and all pipes are closed. - err *errBuffer + errBuffer *errBuffer // channel is being closed once command is complete. - waitDone chan interface{} + // waitDone chan interface{} // contains information about resulted process state. endState *os.ProcessState @@ -47,212 +115,283 @@ type Worker struct { mu sync.Mutex // communication bus with underlying process. - rl goridge.Relay + relay goridge.Relay } -// newWorker creates new worker over given exec.cmd. -func newWorker(cmd *exec.Cmd) (*Worker, error) { +// InitBaseWorker creates new WorkerProcess over given exec.cmd; the command must not be started yet and its Stderr is redirected into the worker's errBuffer. +func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } - - w := &Worker{ - Created: time.Now(), - cmd: cmd, - err: newErrBuffer(), - waitDone: make(chan interface{}), - state: newState(StateInactive), + w := &WorkerProcess{ + created: time.Now(), + events: make(chan WorkerEvent, 10), + cmd: cmd, + state: newState(StateInactive), } + w.errBuffer = newErrBuffer(w.logCallback) + // piping all stderr to command errBuffer - w.cmd.Stderr = w.err + w.cmd.Stderr = w.errBuffer return w, nil } -// State return receive-only worker state object, state can be used to safely access -// worker status, time when status changed and number of worker executions. -func (w *Worker) State() State { +// Created returns the time at which the WorkerProcess was created. +func (w *WorkerProcess) Created() time.Time { + return w.created +} + +// Pid returns the process id recorded by Start, converted to int64 (0 before a successful Start). +func (w *WorkerProcess) Pid() int64 { + return int64(w.pid) +} + +// State return receive-only WorkerProcess state object, state can be used to safely access +// WorkerProcess status, time when status changed and number of WorkerProcess executions. +func (w *WorkerProcess) State() State { return w.state } -// String returns worker description. 
-func (w *Worker) String() string { - state := w.state.String() - if w.Pid != nil { - state = state + ", pid:" + strconv.Itoa(*w.Pid) +// AttachRelay stores rl as the relay used to talk to the worker process; +// it overwrites any relay attached earlier. +func (w *WorkerProcess) AttachRelay(rl goridge.Relay) { + w.relay = rl +} + +// Relay returns the relay currently attached to the worker; +// the result is nil when no relay has been attached. +func (w *WorkerProcess) Relay() goridge.Relay { + return w.relay +} + +// String returns WorkerProcess description. fmt.Stringer interface +func (w *WorkerProcess) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) } return fmt.Sprintf( "(`%s` [%s], numExecs: %v)", strings.Join(w.cmd.Args, " "), - state, + st, w.state.NumExecs(), ) } -// Wait must be called once for each worker, call will be released once worker is +// Start runs the underlying command without waiting for it to finish and, on success, records the process pid. +func (w *WorkerProcess) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + + w.pid = w.cmd.Process.Pid + + return nil +} + +// Events returns the receive-only channel on which this worker publishes WorkerEvent values. +func (w *WorkerProcess) Events() <-chan WorkerEvent { + return w.events +} + +// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails -// to find or start the script. -func (w *Worker) Wait() error { - <-w.waitDone +// to find or Start the script. 
+func (w *WorkerProcess) Wait(ctx context.Context) error { + c := make(chan error) + go func() { + err := multierr.Combine(w.cmd.Wait()) + w.endState = w.cmd.ProcessState + if err != nil { + w.state.Set(StateErrored) + // if there are messages in the events channel, read it + // TODO potentially danger place: ev.Payload.([]byte) panics if the payload is not a []byte + if len(w.events) > 0 { + select { + case ev := <-w.events: + err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) + } + } + // if no errors in the events, error might be in the errbuffer + if w.errBuffer.Len() > 0 { + err = multierr.Append(err, errors.New(w.errBuffer.String())) + } - // ensure that all receive/send operations are complete - w.mu.Lock() - defer w.mu.Unlock() + c <- multierr.Append(err, w.closeRelay()) + return + } - if w.endState.Success() { - w.state.set(StateStopped) - return nil - } + err = multierr.Append(err, w.closeRelay()) + if err != nil { + w.state.Set(StateErrored) + c <- err + return + } - if w.state.Value() != StateStopping { - w.state.set(StateErrored) - } else { - w.state.set(StateStopped) + if w.endState.Success() { + w.state.Set(StateStopped) + } + c <- nil + }() + // NOTE(review): c is unbuffered; when ctx is done first, Wait returns and nothing receives from c any more, so the goroutine above stays blocked on its send + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-c: + return err + } } +} - if w.err.Len() != 0 { - return errors.New(w.err.String()) +// closeRelay closes the attached relay, if there is one, and returns the error from Close. +func (w *WorkerProcess) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } } - - // generic process error - return &exec.ExitError{ProcessState: w.endState} + return nil } -// Stop sends soft termination command to the worker and waits for process completion. -func (w *Worker) Stop() error { - select { - case <-w.waitDone: - return nil - default: +// Stop sends soft termination command to the WorkerProcess and waits for process completion. 
+func (w *WorkerProcess) Stop(ctx context.Context) error { + c := make(chan error) + go func() { + var errs []string + w.errBuffer.Close() + w.state.Set(StateStopping) w.mu.Lock() defer w.mu.Unlock() - - w.state.set(StateStopping) - err := sendControl(w.rl, &stopCommand{Stop: true}) - - <-w.waitDone - return err - } -} - -// Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not waits for process completion! -func (w *Worker) Kill() error { + err := sendControl(w.relay, &stopCommand{Stop: true}) + if err != nil { + errs = append(errs, err.Error()) + w.state.Set(StateKilling) + err = w.cmd.Process.Kill() + if err != nil { + errs = append(errs, err.Error()) + } + // NOTE(review): there is no return after this send, so execution continues to Set(StateStopped) and to a second send on the unbuffered c; Stop receives only once, so the goroutine then blocks while still holding w.mu + c <- errors.New(strings.Join(errs, "|")) + } + w.state.Set(StateStopped) + c <- nil + }() select { - case <-w.waitDone: + case <-ctx.Done(): + return ctx.Err() + case err := <-c: + if err != nil { + return err + } return nil - default: - w.state.set(StateStopping) - err := w.cmd.Process.Signal(os.Kill) - - <-w.waitDone - return err } } -// Exec sends payload to worker, executes it and returns result or -// error. Make sure to handle worker.Wait() to gather worker level -// errors. Method might return JobError indicating issue with payload. -func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not wait for process completion! 
+func (w *WorkerProcess) Kill(ctx context.Context) error { + w.state.Set(StateKilling) w.mu.Lock() - - if rqs == nil { - w.mu.Unlock() - return nil, fmt.Errorf("payload can not be empty") - } - - if w.state.Value() != StateReady { - w.mu.Unlock() - return nil, fmt.Errorf("worker is not ready (%s)", w.state.String()) - } - - w.state.set(StateWorking) - - rsp, err = w.execPayload(rqs) + defer w.mu.Unlock() + err := w.cmd.Process.Signal(os.Kill) if err != nil { - if _, ok := err.(JobError); !ok { - w.state.set(StateErrored) - w.state.registerExec() - w.mu.Unlock() - return nil, err - } + return err } + w.state.Set(StateStopped) + return nil +} - w.state.set(StateReady) - w.state.registerExec() - w.mu.Unlock() - return rsp, err +func (w *WorkerProcess) logCallback(log []byte) { + w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log} } -func (w *Worker) markInvalid() { - w.state.set(StateInvalid) +// errBuffer is a mutex-guarded byte buffer that collects stderr output and hands it to logCallback from a background goroutine. +type errBuffer struct { + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo remove update + update chan interface{} + stop chan interface{} + logCallback func(log []byte) } -func (w *Worker) start() error { - if err := w.cmd.Start(); err != nil { - close(w.waitDone) - return err +// newErrBuffer creates an errBuffer and starts its flush goroutine, which passes pending output to logCallback when the WaitDuration timer fires (each Write resets the timer) or when stop is closed. +func newErrBuffer(logCallback func(log []byte)) *errBuffer { + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + logCallback: logCallback, } - w.Pid = &w.cmd.Process.Pid - - // wait for process to complete - go func() { - w.endState, _ = w.cmd.Process.Wait() - if w.waitDone != nil { - close(w.waitDone) - w.mu.Lock() - defer w.mu.Unlock() - - if w.rl != nil { - err := w.rl.Close() - if err != nil { - w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err}) + go func(eb *errBuffer) { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + // NOTE(review): logCallback receives a sub-slice of eb.buf and the buffer is truncated for reuse right after, so a receiver that keeps the slice may see it overwritten by later writes + 
eb.logCallback(eb.buf[eb.last:]) + eb.buf = eb.buf[0:0] + eb.last = len(eb.buf) } - } - - err := w.err.Close() - if err != nil { - w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err}) + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb == nil || eb.logCallback == nil { + eb.mu.Unlock() + return + } + eb.logCallback(eb.buf[eb.last:]) + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return } } - }() + }(eb) - return nil + return eb } -func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) { - // two things - if err := sendControl(w.rl, rqs.Context); err != nil { - return nil, errors.Wrap(err, "header error") - } +// NOTE(review): in the stop branch above, eb is dereferenced by eb.mu.Lock() before the eb == nil test, so that test cannot be true there. +// Len returns the number of bytes currently stored in the errBuffer. +func (eb *errBuffer) Len() int { + eb.mu.RLock() + defer eb.mu.RUnlock() - if err = w.rl.Send(rqs.Body, 0); err != nil { - return nil, errors.Wrap(err, "sender error") - } + // currently active message + return len(eb.buf) +} - var pr goridge.Prefix - rsp = new(Payload) +// Write appends the contents of p to the errBuffer, growing the errBuffer as +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + eb.buf = append(eb.buf, p...) + eb.mu.Unlock() + // NOTE(review): update is unbuffered and only the goroutine started in newErrBuffer receives from it; once that goroutine has returned (after Close) this send blocks + eb.update <- nil - if rsp.Context, pr, err = w.rl.Receive(); err != nil { - return nil, errors.Wrap(err, "worker error") - } - - if !pr.HasFlag(goridge.PayloadControl) { - return nil, fmt.Errorf("malformed worker response") - } + return len(p), nil +} - if pr.HasFlag(goridge.PayloadError) { - return nil, JobError(rsp.Context) - } +// String returns the data currently held in the errBuffer as a string. 
+func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() - // add streaming support :) - if rsp.Body, pr, err = w.rl.Receive(); err != nil { - return nil, errors.Wrap(err, "worker error") - } + // TODO unsafe operation, use runes + return string(eb.buf) +} - return rsp, nil +// Close signals the flush goroutine to stop by closing the stop channel; a second call panics (close of a closed channel). +func (eb *errBuffer) Close() { + close(eb.stop) } |