diff options
author | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
---|---|---|
committer | Wolfy-J <[email protected]> | 2018-01-23 19:51:15 -0500 |
commit | 78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch) | |
tree | 8882b9a051bcc9c42328df583c0bb8c39a89591e /worker.go | |
parent | fa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff) |
API update
Diffstat (limited to 'worker.go')
-rw-r--r-- | worker.go | 258 |
1 files changed, 162 insertions, 96 deletions
@@ -2,167 +2,233 @@ package roadrunner import ( "bytes" - "encoding/json" "fmt" "github.com/spiral/goridge" - "io" + "os" "os/exec" "strconv" "strings" "sync" - "sync/atomic" - "time" + "github.com/pkg/errors" ) // Worker - supervised process with api over goridge.Relay. type Worker struct { - // State current worker state. - State State + // Pid of the process, points to Pid of underlying process and + // can be nil while process is not started. + Pid *int - // Last time worker State has changed - Last time.Time + // state holds information about current worker state, + // number of worker executions, last status change time. + // publicly this object is read-only and protected using Mutex + // and atomic counter. + state *state - // NumExecutions how many times worker have been invoked. - NumExecutions uint64 + // underlying command with associated process, command must be + // provided to worker from outside in non-started form. Cmd + // stdErr pipe will be handled by worker to aggregate error message. + cmd *exec.Cmd - // Pid contains process ID and empty until worker is started. - Pid *int + // err aggregates stderr output from underlying process. Value can be + // read only once command is completed and all pipes are closed. + err *bytes.Buffer + + // channel is being closed once command is complete. + waitDone chan interface{} + + // contains information about resulted process state. + endState *os.ProcessState - cmd *exec.Cmd // underlying command process - err *bytes.Buffer // aggregates stderr - rl goridge.Relay // communication bus with underlying process - mu sync.RWMutex // ensures than only one execution can be run at once + // ensures than only one execution can be run at once. + mu sync.Mutex + + // communication bus with underlying process. + rl goridge.Relay } -// NewWorker creates new worker -func NewWorker(cmd *exec.Cmd) (*Worker, error) { - w := &Worker{ - cmd: cmd, - err: bytes.NewBuffer(nil), - State: StateInactive, +// newWorker creates new worker over given exec.cmd. +func newWorker(cmd *exec.Cmd) (*Worker, error) { + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") } - if w.cmd.Process != nil { - return nil, fmt.Errorf("can't attach to running process") + w := &Worker{ + cmd: cmd, + err: new(bytes.Buffer), + waitDone: make(chan interface{}), + state: newState(StateInactive), } + // piping all stderr to command buffer + w.cmd.Stderr = w.err + return w, nil } +// State return read-only worker state object, state can be used to safely access +// worker status, time when status changed and number of worker executions. +func (w *Worker) State() State { + return w.state +} + // String returns worker description. func (w *Worker) String() string { - state := w.State.String() - + state := w.state.String() if w.Pid != nil { - state = state + ", pid:" + strconv.Itoa(*w.Pid) + state = state + ", pid.php:" + strconv.Itoa(*w.Pid) } - return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions) + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + state, + w.state.NumExecs(), + ) } // Start underlying process or return error func (w *Worker) Start() error { - stderr, err := w.cmd.StderrPipe() - if err != nil { - w.setState(StateError) - return err + if w.cmd.Process != nil { + return fmt.Errorf("process already running") } - // copying all process errors into buffer space - go io.Copy(w.err, stderr) - if err := w.cmd.Start(); err != nil { - w.setState(StateError) - return w.mockError(err) + close(w.waitDone) + + return err } - w.setState(StateReady) + w.Pid = &w.cmd.Process.Pid + + // relays for process to complete + go func() { + w.endState, _ = w.cmd.Process.Wait() + if w.waitDone != nil { + w.state.set(StateStopped) + close(w.waitDone) + + if w.rl != nil { + w.mu.Lock() + defer w.mu.Unlock() + + w.rl.Close() + } + } + }() return nil } -// Execute command and return result and result context. -func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { - w.mu.Lock() - defer w.mu.Unlock() +// Wait must be called once for each worker, call will be released once worker is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or start the script. +func (w *Worker) Wait() error { + <-w.waitDone - if w.State != StateReady { - return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State) - } + // ensure that all pipe descriptors are closed + w.cmd.Wait() - w.setState(StateReady) - atomic.AddUint64(&w.NumExecutions, 1) + if w.endState.Success() { + return nil + } - if ctx != nil { - if data, err := json.Marshal(ctx); err == nil { - w.rl.Send(data, goridge.PayloadControl) - } else { - return nil, nil, fmt.Errorf("invalid context: %s", err) - } - } else { - w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty) + if w.err.Len() != 0 { + return errors.New(w.err.String()) } - w.rl.Send(body, 0) + // generic process error + return &exec.ExitError{ProcessState: w.endState} +} - rCtx, p, err := w.rl.Receive() +// Destroy sends soft termination command to the worker to properly stop the process. +func (w *Worker) Stop() error { + select { + case <-w.waitDone: + return nil + default: + w.mu.Lock() + defer w.mu.Unlock() - if !p.HasFlag(goridge.PayloadControl) { - return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)")) - } + w.state.set(StateInactive) + err := sendHead(w.rl, &stopCommand{Stop: true}) - if p.HasFlag(goridge.PayloadError) { - w.setState(StateReady) - return nil, nil, JobError(rCtx) + <-w.waitDone + return err } +} - if resp, p, err = w.rl.Receive(); err != nil { - w.setState(StateError) - return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err)) - } +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr +func (w *Worker) Kill() error { + select { + case <-w.waitDone: + return nil + default: + w.mu.Lock() + defer w.mu.Unlock() - w.setState(StateReady) - return resp, rCtx, nil + w.state.set(StateInactive) + err := w.cmd.Process.Kill() + + <-w.waitDone + return err + } } -// Stop underlying process or return error. -func (w *Worker) Stop() { +func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) { w.mu.Lock() defer w.mu.Unlock() - w.setState(StateInactive) + if rqs == nil { + return nil, fmt.Errorf("payload can not be empty") + } - go func() { - sendCommand(w.rl, &TerminateCommand{Terminate: true}) - }() + if w.state.Value() != StateReady { + return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value()) + } - w.cmd.Wait() - w.rl.Close() + w.state.set(StateWorking) + defer w.state.registerExec() - w.setState(StateStopped) -} + rsp, err = w.execPayload(rqs) -// attach payload/control relay to the worker. -func (w *Worker) attach(rl goridge.Relay) { - w.mu.Lock() - defer w.mu.Unlock() + if err != nil { + if _, ok := err.(JobError); !ok { + w.state.set(StateErrored) + return nil, err + } + } - w.rl = rl - w.setState(StateBooting) + w.state.set(StateReady) + return rsp, err } -// sets worker State and it's context (non blocking!). -func (w *Worker) setState(state State) { - // safer? - w.State = state - w.Last = time.Now() -} +func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) { + if err := sendHead(w.rl, rqs.Head); err != nil { + return nil, errors.Wrap(err, "header error") + } -// mockError attaches worker specific error (from stderr) to parent error -func (w *Worker) mockError(err error) WorkerError { - if w.err.Len() != 0 { - return WorkerError(w.err.String()) + w.rl.Send(rqs.Body, 0) + + var pr goridge.Prefix + rsp = new(Payload) + + if rsp.Head, pr, err = w.rl.Receive(); err != nil { + return nil, errors.Wrap(err, "worker error") + } + + if !pr.HasFlag(goridge.PayloadControl) { + return nil, fmt.Errorf("mailformed worker response") + } + + if pr.HasFlag(goridge.PayloadError) { + return nil, JobError(rsp.Head) + } + + if rsp.Body, pr, err = w.rl.Receive(); err != nil { + return nil, errors.Wrap(err, "worker error") } - return WorkerError(err.Error()) + return rsp, nil } |