diff options
Diffstat (limited to 'worker.go')
-rw-r--r-- | worker.go | 168 |
1 files changed, 168 insertions, 0 deletions
diff --git a/worker.go b/worker.go new file mode 100644 index 00000000..8960b3fa --- /dev/null +++ b/worker.go @@ -0,0 +1,168 @@ +package roadrunner + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/spiral/goridge" + "io" + "os/exec" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// Worker - supervised process with api over goridge.Relay. +type Worker struct { + // State current worker state. + State State + + // Last time worker State has changed + Last time.Time + + // NumExecutions how many times worker have been invoked. + NumExecutions uint64 + + // Pid contains process ID and empty until worker is started. + Pid *int + + cmd *exec.Cmd // underlying command process + err *bytes.Buffer // aggregates stderr + rl goridge.Relay // communication bus with underlying process + mu sync.RWMutex // ensures than only one execution can be run at once +} + +// NewWorker creates new worker +func NewWorker(cmd *exec.Cmd) (*Worker, error) { + w := &Worker{ + cmd: cmd, + err: bytes.NewBuffer(nil), + State: StateInactive, + } + + if w.cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + + return w, nil +} + +// String returns worker description. +func (w *Worker) String() string { + state := w.State.String() + + if w.Pid != nil { + state = state + ", pid:" + strconv.Itoa(*w.Pid) + } + + return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions) +} + +// Start underlying process or return error +func (w *Worker) Start() error { + stderr, err := w.cmd.StderrPipe() + if err != nil { + w.setState(StateError) + return err + } + + // copying all process errors into buffer space + go io.Copy(w.err, stderr) + + if err := w.cmd.Start(); err != nil { + w.setState(StateError) + return w.mockError(err) + } + + w.setState(StateReady) + + return nil +} + +// Execute command and return result and result context. +func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + if w.State != StateReady { + return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State) + } + + w.setState(StateReady) + atomic.AddUint64(&w.NumExecutions, 1) + + if ctx != nil { + if data, err := json.Marshal(ctx); err == nil { + w.rl.Send(data, goridge.PayloadControl) + } else { + return nil, nil, fmt.Errorf("invalid context: %s", err) + } + } else { + w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty) + } + + w.rl.Send(body, 0) + + rCtx, p, err := w.rl.Receive() + + if !p.HasFlag(goridge.PayloadControl) { + return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)")) + } + + if p.HasFlag(goridge.PayloadError) { + w.setState(StateReady) + return nil, nil, JobError(rCtx) + } + + if resp, p, err = w.rl.Receive(); err != nil { + w.setState(StateError) + return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err)) + } + + w.setState(StateReady) + return resp, rCtx, nil +} + +// Stop underlying process or return error. +func (w *Worker) Stop() { + w.mu.Lock() + defer w.mu.Unlock() + + w.setState(StateInactive) + + go func() { + sendCommand(w.rl, &TerminateCommand{Terminate: true}) + }() + + w.cmd.Wait() + w.rl.Close() + + w.setState(StateStopped) +} + +// attach payload/control relay to the worker. +func (w *Worker) attach(rl goridge.Relay) { + w.mu.Lock() + defer w.mu.Unlock() + + w.rl = rl + w.setState(StateBooting) +} + +// sets worker State and it's context (non blocking!). +func (w *Worker) setState(state State) { + // safer? + w.State = state + w.Last = time.Now() +} + +// mockError attaches worker specific error (from stderr) to parent error +func (w *Worker) mockError(err error) WorkerError { + if w.err.Len() != 0 { + return WorkerError(w.err.String()) + } + + return WorkerError(err.Error()) +} |