summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
committerWolfy-J <[email protected]>2018-01-23 19:51:15 -0500
commit78a42de837928cf7d10a1ae04d7e82e56d66e1e2 (patch)
tree8882b9a051bcc9c42328df583c0bb8c39a89591e /worker.go
parentfa4bd78d9f7c5f74e8445374370927c742fc4e78 (diff)
API update
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go258
1 files changed, 162 insertions, 96 deletions
diff --git a/worker.go b/worker.go
index 8960b3fa..aee28c1e 100644
--- a/worker.go
+++ b/worker.go
@@ -2,167 +2,233 @@ package roadrunner
import (
"bytes"
- "encoding/json"
"fmt"
"github.com/spiral/goridge"
- "io"
+ "os"
"os/exec"
"strconv"
"strings"
"sync"
- "sync/atomic"
- "time"
+ "github.com/pkg/errors"
)
// Worker - supervised process with api over goridge.Relay.
type Worker struct {
- // State current worker state.
- State State
+ // Pid of the process, points to Pid of underlying process and
+ // can be nil while process is not started.
+ Pid *int
- // Last time worker State has changed
- Last time.Time
+ // state holds information about current worker state,
+ // number of worker executions, last status change time.
+ // publicly this object is read-only and protected using Mutex
+ // and atomic counter.
+ state *state
- // NumExecutions how many times worker have been invoked.
- NumExecutions uint64
+ // underlying command with associated process, command must be
+ // provided to worker from outside in non-started form. Cmd
+ // stdErr pipe will be handled by worker to aggregate error message.
+ cmd *exec.Cmd
- // Pid contains process ID and empty until worker is started.
- Pid *int
+ // err aggregates stderr output from underlying process. Value can be
+ // read only once command is completed and all pipes are closed.
+ err *bytes.Buffer
+
+ // channel is being closed once command is complete.
+ waitDone chan interface{}
+
+ // contains information about resulted process state.
+ endState *os.ProcessState
- cmd *exec.Cmd // underlying command process
- err *bytes.Buffer // aggregates stderr
- rl goridge.Relay // communication bus with underlying process
- mu sync.RWMutex // ensures than only one execution can be run at once
+ // ensures than only one execution can be run at once.
+ mu sync.Mutex
+
+ // communication bus with underlying process.
+ rl goridge.Relay
}
-// NewWorker creates new worker
-func NewWorker(cmd *exec.Cmd) (*Worker, error) {
- w := &Worker{
- cmd: cmd,
- err: bytes.NewBuffer(nil),
- State: StateInactive,
+// newWorker creates new worker over given exec.cmd.
+func newWorker(cmd *exec.Cmd) (*Worker, error) {
+ if cmd.Process != nil {
+ return nil, fmt.Errorf("can't attach to running process")
}
- if w.cmd.Process != nil {
- return nil, fmt.Errorf("can't attach to running process")
+ w := &Worker{
+ cmd: cmd,
+ err: new(bytes.Buffer),
+ waitDone: make(chan interface{}),
+ state: newState(StateInactive),
}
+ // piping all stderr to command buffer
+ w.cmd.Stderr = w.err
+
return w, nil
}
+// State return read-only worker state object, state can be used to safely access
+// worker status, time when status changed and number of worker executions.
+func (w *Worker) State() State {
+ return w.state
+}
+
// String returns worker description.
func (w *Worker) String() string {
- state := w.State.String()
-
+ state := w.state.String()
if w.Pid != nil {
- state = state + ", pid:" + strconv.Itoa(*w.Pid)
+ state = state + ", pid.php:" + strconv.Itoa(*w.Pid)
}
- return fmt.Sprintf("(`%s` [%s], execs: %v)", strings.Join(w.cmd.Args, " "), state, w.NumExecutions)
+ return fmt.Sprintf(
+ "(`%s` [%s], numExecs: %v)",
+ strings.Join(w.cmd.Args, " "),
+ state,
+ w.state.NumExecs(),
+ )
}
// Start underlying process or return error
func (w *Worker) Start() error {
- stderr, err := w.cmd.StderrPipe()
- if err != nil {
- w.setState(StateError)
- return err
+ if w.cmd.Process != nil {
+ return fmt.Errorf("process already running")
}
- // copying all process errors into buffer space
- go io.Copy(w.err, stderr)
-
if err := w.cmd.Start(); err != nil {
- w.setState(StateError)
- return w.mockError(err)
+ close(w.waitDone)
+
+ return err
}
- w.setState(StateReady)
+ w.Pid = &w.cmd.Process.Pid
+
+ // relays for process to complete
+ go func() {
+ w.endState, _ = w.cmd.Process.Wait()
+ if w.waitDone != nil {
+ w.state.set(StateStopped)
+ close(w.waitDone)
+
+ if w.rl != nil {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ w.rl.Close()
+ }
+ }
+ }()
return nil
}
-// Execute command and return result and result context.
-func (w *Worker) Execute(body []byte, ctx interface{}) (resp []byte, rCtx []byte, err error) {
- w.mu.Lock()
- defer w.mu.Unlock()
+// Wait must be called once for each worker, call will be released once worker is
+// complete and will return process error (if any), if stderr is presented it's value
+// will be wrapped as WorkerError. Method will return error code if php process fails
+// to find or start the script.
+func (w *Worker) Wait() error {
+ <-w.waitDone
- if w.State != StateReady {
- return nil, nil, fmt.Errorf("worker must be in state `waiting` (`%s` given)", w.State)
- }
+ // ensure that all pipe descriptors are closed
+ w.cmd.Wait()
- w.setState(StateReady)
- atomic.AddUint64(&w.NumExecutions, 1)
+ if w.endState.Success() {
+ return nil
+ }
- if ctx != nil {
- if data, err := json.Marshal(ctx); err == nil {
- w.rl.Send(data, goridge.PayloadControl)
- } else {
- return nil, nil, fmt.Errorf("invalid context: %s", err)
- }
- } else {
- w.rl.Send(nil, goridge.PayloadControl|goridge.PayloadEmpty)
+ if w.err.Len() != 0 {
+ return errors.New(w.err.String())
}
- w.rl.Send(body, 0)
+ // generic process error
+ return &exec.ExitError{ProcessState: w.endState}
+}
- rCtx, p, err := w.rl.Receive()
+// Destroy sends soft termination command to the worker to properly stop the process.
+func (w *Worker) Stop() error {
+ select {
+ case <-w.waitDone:
+ return nil
+ default:
+ w.mu.Lock()
+ defer w.mu.Unlock()
- if !p.HasFlag(goridge.PayloadControl) {
- return nil, nil, w.mockError(fmt.Errorf("invalid response (check script integrity)"))
- }
+ w.state.set(StateInactive)
+ err := sendHead(w.rl, &stopCommand{Stop: true})
- if p.HasFlag(goridge.PayloadError) {
- w.setState(StateReady)
- return nil, nil, JobError(rCtx)
+ <-w.waitDone
+ return err
}
+}
- if resp, p, err = w.rl.Receive(); err != nil {
- w.setState(StateError)
- return nil, nil, w.mockError(fmt.Errorf("worker error: %s", err))
- }
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr
+func (w *Worker) Kill() error {
+ select {
+ case <-w.waitDone:
+ return nil
+ default:
+ w.mu.Lock()
+ defer w.mu.Unlock()
- w.setState(StateReady)
- return resp, rCtx, nil
+ w.state.set(StateInactive)
+ err := w.cmd.Process.Kill()
+
+ <-w.waitDone
+ return err
+ }
}
-// Stop underlying process or return error.
-func (w *Worker) Stop() {
+func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
w.mu.Lock()
defer w.mu.Unlock()
- w.setState(StateInactive)
+ if rqs == nil {
+ return nil, fmt.Errorf("payload can not be empty")
+ }
- go func() {
- sendCommand(w.rl, &TerminateCommand{Terminate: true})
- }()
+ if w.state.Value() != StateReady {
+ return nil, fmt.Errorf("worker is not ready (%s)", w.state.Value())
+ }
- w.cmd.Wait()
- w.rl.Close()
+ w.state.set(StateWorking)
+ defer w.state.registerExec()
- w.setState(StateStopped)
-}
+ rsp, err = w.execPayload(rqs)
-// attach payload/control relay to the worker.
-func (w *Worker) attach(rl goridge.Relay) {
- w.mu.Lock()
- defer w.mu.Unlock()
+ if err != nil {
+ if _, ok := err.(JobError); !ok {
+ w.state.set(StateErrored)
+ return nil, err
+ }
+ }
- w.rl = rl
- w.setState(StateBooting)
+ w.state.set(StateReady)
+ return rsp, err
}
-// sets worker State and it's context (non blocking!).
-func (w *Worker) setState(state State) {
- // safer?
- w.State = state
- w.Last = time.Now()
-}
+func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
+ if err := sendHead(w.rl, rqs.Head); err != nil {
+ return nil, errors.Wrap(err, "header error")
+ }
-// mockError attaches worker specific error (from stderr) to parent error
-func (w *Worker) mockError(err error) WorkerError {
- if w.err.Len() != 0 {
- return WorkerError(w.err.String())
+ w.rl.Send(rqs.Body, 0)
+
+ var pr goridge.Prefix
+ rsp = new(Payload)
+
+ if rsp.Head, pr, err = w.rl.Receive(); err != nil {
+ return nil, errors.Wrap(err, "worker error")
+ }
+
+ if !pr.HasFlag(goridge.PayloadControl) {
+ return nil, fmt.Errorf("mailformed worker response")
+ }
+
+ if pr.HasFlag(goridge.PayloadError) {
+ return nil, JobError(rsp.Head)
+ }
+
+ if rsp.Body, pr, err = w.rl.Receive(); err != nil {
+ return nil, errors.Wrap(err, "worker error")
}
- return WorkerError(err.Error())
+ return rsp, nil
}