summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go168
1 files changed, 168 insertions, 0 deletions
diff --git a/worker.go b/worker.go
new file mode 100644
index 00000000..8960b3fa
--- /dev/null
+++ b/worker.go
@@ -0,0 +1,168 @@
+package roadrunner
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "github.com/spiral/goridge"
+ "io"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// Worker - supervised process with api over goridge.Relay.
+type Worker struct {
+ // State current worker state.
+ State State
+
+ // Last time worker State has changed
+ Last time.Time
+
+ // NumExecutions how many times worker have been invoked.
+ NumExecutions uint64
+
+ // Pid contains process ID and empty until worker is started.
+ Pid *int
+
+ cmd *exec.Cmd // underlying command process
+ err *bytes.Buffer // aggregates stderr
+ rl goridge.Relay // communication bus with underlying process
+ mu sync.RWMutex // ensures than only one execution can be run at once
+}
+
+// NewWorker creates new worker
+func NewWorker(cmd *exec.Cmd) (*Worker, error) {
+ w := &Worker{
+ cmd: cmd,
+ err: bytes.NewBuffer(nil),
+ State: StateInactive,
+ }
+
+ if w.cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
+ }
+
+ return w, nil
+}
+
+// String returns worker description.
+func (w *Worker) String() string {
+ state := w.State.String()
+
+ if w.Pid != nil {
+ state = state + ", pid:" + strconv.Itoa(*w.Pid)
+ }
+
+ return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
+}
+
+// Start underlying process or return error
+func (w *Worker) Start() error {
+ stderr, err := w.cmd.StderrPipe()
+ if err != nil {
+ w.setState(StateError)
+ return err
+ }
+
+ // copying all process errors into buffer space
+ go io.Copy(w.err, stderr)
+
+ if err := w.cmd.Start(); err != nil {
+ w.setState(StateError)
+ return w.mockError(err)
+ }
+
+ w.setState(StateReady)
+
+ return nil
+}
+
+// Execute command and return result and result context.
+func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if w.State != StateReady {
+ return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
+ }
+
+ w.setState(StateReady)
+ atomic.AddUint64(&w.NumExecutions, 1)
+
+ if ctx != nil {
+ if data, err := json.Marshal(ctx); err == nil {
+ w.rl.Send(data, goridge.PayloadControl)
+ } else {
+ return nil, nil, fmt.Errorf("invalid context: %s", err)
+ }
+ } else {
+ w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
+ }
+
+ w.rl.Send(body, 0)
+
+ rCtx, p, err := w.rl.Receive()
+
+ if !p.HasFlag(goridge.PayloadControl) {
+ return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
+ }
+
+ if p.HasFlag(goridge.PayloadError) {
+ w.setState(StateReady)
+ return nil, nil, JobError(rCtx)
+ }
+
+ if resp, p, err = w.rl.Receive(); err != nil {
+ w.setState(StateError)
+ return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
+ }
+
+ w.setState(StateReady)
+ return resp, rCtx, nil
+}
+
+// Stop underlying process or return error.
+func (w *Worker) Stop() {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ w.setState(StateInactive)
+
+ go func() {
+ sendCommand(w.rl, &TerminateCommand{Terminate: true})
+ }()
+
+ w.cmd.Wait()
+ w.rl.Close()
+
+ w.setState(StateStopped)
+}
+
+// attach payload/control relay to the worker.
+func (w *Worker) attach(rl goridge.Relay) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ w.rl = rl
+ w.setState(StateBooting)
+}
+
+// sets worker State and it's context (non blocking!).
+func (w *Worker) setState(state State) {
+ // safer?
+ w.State = state
+ w.Last = time.Now()
+}
+
+// mockError attaches worker specific error (from stderr) to parent error
+func (w *Worker) mockError(err error) WorkerError {
+ if w.err.Len() != 0 {
+ return WorkerError(w.err.String())
+ }
+
+ return WorkerError(err.Error())
+}