summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go459
1 files changed, 299 insertions, 160 deletions
diff --git a/worker.go b/worker.go
index 3740aee8..855a9958 100644
--- a/worker.go
+++ b/worker.go
@@ -1,6 +1,8 @@
package roadrunner
import (
+ "context"
+ "errors"
"fmt"
"os"
"os/exec"
@@ -9,36 +11,102 @@ import (
"sync"
"time"
- "github.com/pkg/errors"
"github.com/spiral/goridge/v2"
+ "go.uber.org/multierr"
)
-// Worker - supervised process with api over goridge.Relay.
-type Worker struct {
- // Pid of the process, points to Pid of underlying process and
- // can be nil while process is not started.
- Pid *int
+// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
+const (
+ // EventWorkerError triggered after WorkerProcess. Except payload to be error.
+ EventWorkerError int64 = iota + 100
+
+ // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
+ EventWorkerLog
+
+ // EventWorkerWaitDone triggered when worker exit from process Wait
+ EventWorkerWaitDone
+
+ EventWorkerBufferClosed
+
+ EventRelayCloseError
+
+ EventWorkerProcessError
+)
+
+const (
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages
+ // before merging output together since lastError update (required to keep error update together).
+ WaitDuration = 100 * time.Millisecond
+)
+
+// todo: write comment
+type WorkerEvent struct {
+ Event int64
+ Worker WorkerBase
+ Payload interface{}
+}
+
+type WorkerBase interface {
+ fmt.Stringer
+
+ Created() time.Time
+
+ Events() <-chan WorkerEvent
- // Created indicates at what time worker has been created.
- Created time.Time
+ Pid() int64
- // state holds information about current worker state,
- // number of worker executions, buf status change time.
+ // State return receive-only WorkerProcess state object, state can be used to safely access
+ // WorkerProcess status, time when status changed and number of WorkerProcess executions.
+ State() State
+
+ // Start used to run Cmd and immediately return
+ Start() error
+ // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
+ // complete and will return process error (if any), if stderr is presented it's value
+ // will be wrapped as WorkerError. Method will return error code if php process fails
+ // to find or Start the script.
+ Wait(ctx context.Context) error
+
+ // Stop sends soft termination command to the WorkerProcess and waits for process completion.
+ Stop(ctx context.Context) error
+ // Kill kills underlying process, make sure to call Wait() func to gather
+ // error log from the stderr. Does not waits for process completion!
+ Kill(ctx context.Context) error
+ // Relay returns attached to worker goridge relay
+ Relay() goridge.Relay
+ // AttachRelay used to attach goridge relay to the worker process
+ AttachRelay(rl goridge.Relay)
+}
+
+// WorkerProcess - supervised process with api over goridge.Relay.
+type WorkerProcess struct {
+ // created indicates at what time WorkerProcess has been created.
+ created time.Time
+
+ // updates parent supervisor or pool about WorkerProcess events
+ events chan WorkerEvent
+
+ // state holds information about current WorkerProcess state,
+ // number of WorkerProcess executions, buf status change time.
// publicly this object is receive-only and protected using Mutex
// and atomic counter.
state *state
// underlying command with associated process, command must be
- // provided to worker from outside in non-started form. CmdSource
- // stdErr direction will be handled by worker to aggregate error message.
+ // provided to WorkerProcess from outside in non-started form. CmdSource
+ // stdErr direction will be handled by WorkerProcess to aggregate error message.
cmd *exec.Cmd
- // err aggregates stderr output from underlying process. Value can be
+ // pid of the process, points to pid of underlying process and
+ // can be nil while process is not started.
+ pid int
+
+ // errBuffer aggregates stderr output from underlying process. Value can be
// receive only once command is completed and all pipes are closed.
- err *errBuffer
+ errBuffer *errBuffer
// channel is being closed once command is complete.
- waitDone chan interface{}
+ // waitDone chan interface{}
// contains information about resulted process state.
endState *os.ProcessState
@@ -47,212 +115,283 @@ type Worker struct {
mu sync.Mutex
// communication bus with underlying process.
- rl goridge.Relay
+ relay goridge.Relay
}
-// newWorker creates new worker over given exec.cmd.
-func newWorker(cmd *exec.Cmd) (*Worker, error) {
+// InitBaseWorker creates new WorkerProcess over given exec.cmd.
+func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
-
- w := &Worker{
- Created: time.Now(),
- cmd: cmd,
- err: newErrBuffer(),
- waitDone: make(chan interface{}),
- state: newState(StateInactive),
+ w := &WorkerProcess{
+ created: time.Now(),
+ events: make(chan WorkerEvent, 10),
+ cmd: cmd,
+ state: newState(StateInactive),
}
+ w.errBuffer = newErrBuffer(w.logCallback)
+
// piping all stderr to command errBuffer
- w.cmd.Stderr = w.err
+ w.cmd.Stderr = w.errBuffer
return w, nil
}
-// State return receive-only worker state object, state can be used to safely access
-// worker status, time when status changed and number of worker executions.
-func (w *Worker) State() State {
+func (w *WorkerProcess) Created() time.Time {
+ return w.created
+}
+
+func (w *WorkerProcess) Pid() int64 {
+ return int64(w.pid)
+}
+
+// State return receive-only WorkerProcess state object, state can be used to safely access
+// WorkerProcess status, time when status changed and number of WorkerProcess executions.
+func (w *WorkerProcess) State() State {
return w.state
}
-// String returns worker description.
-func (w *Worker) String() string {
- state := w.state.String()
- if w.Pid != nil {
- state = state + ", pid:" + strconv.Itoa(*w.Pid)
+// State return receive-only WorkerProcess state object, state can be used to safely access
+// WorkerProcess status, time when status changed and number of WorkerProcess executions.
+func (w *WorkerProcess) AttachRelay(rl goridge.Relay) {
+ w.relay = rl
+}
+
+// State return receive-only WorkerProcess state object, state can be used to safely access
+// WorkerProcess status, time when status changed and number of WorkerProcess executions.
+func (w *WorkerProcess) Relay() goridge.Relay {
+ return w.relay
+}
+
+// String returns WorkerProcess description. fmt.Stringer interface
+func (w *WorkerProcess) String() string {
+ st := w.state.String()
+ // we can safely compare pid to 0
+ if w.pid != 0 {
+ st = st + ", pid:" + strconv.Itoa(w.pid)
}
return fmt.Sprintf(
"(`%s` [%s], numExecs: %v)",
strings.Join(w.cmd.Args, " "),
- state,
+ st,
w.state.NumExecs(),
)
}
-// Wait must be called once for each worker, call will be released once worker is
+func (w *WorkerProcess) Start() error {
+ err := w.cmd.Start()
+ if err != nil {
+ return err
+ }
+
+ w.pid = w.cmd.Process.Pid
+
+ return nil
+}
+
+func (w *WorkerProcess) Events() <-chan WorkerEvent {
+ return w.events
+}
+
+// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
-// to find or start the script.
-func (w *Worker) Wait() error {
- <-w.waitDone
+// to find or Start the script.
+func (w *WorkerProcess) Wait(ctx context.Context) error {
+ c := make(chan error)
+ go func() {
+ err := multierr.Combine(w.cmd.Wait())
+ w.endState = w.cmd.ProcessState
+ if err != nil {
+ w.state.Set(StateErrored)
+ // if there are messages in the events channel, read it
+ // TODO potentially danger place
+ if len(w.events) > 0 {
+ select {
+ case ev := <-w.events:
+ err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
+ }
+ }
+ // if no errors in the events, error might be in the errbuffer
+ if w.errBuffer.Len() > 0 {
+ err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ }
- // ensure that all receive/send operations are complete
- w.mu.Lock()
- defer w.mu.Unlock()
+ c <- multierr.Append(err, w.closeRelay())
+ return
+ }
- if w.endState.Success() {
- w.state.set(StateStopped)
- return nil
- }
+ err = multierr.Append(err, w.closeRelay())
+ if err != nil {
+ w.state.Set(StateErrored)
+ c <- err
+ return
+ }
- if w.state.Value() != StateStopping {
- w.state.set(StateErrored)
- } else {
- w.state.set(StateStopped)
+ if w.endState.Success() {
+ w.state.Set(StateStopped)
+ }
+ c <- nil
+ }()
+ for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-c:
+ return err
+ }
}
+}
- if w.err.Len() != 0 {
- return errors.New(w.err.String())
+func (w *WorkerProcess) closeRelay() error {
+ if w.relay != nil {
+ err := w.relay.Close()
+ if err != nil {
+ return err
+ }
}
-
- // generic process error
- return &exec.ExitError{ProcessState: w.endState}
+ return nil
}
-// Stop sends soft termination command to the worker and waits for process completion.
-func (w *Worker) Stop() error {
- select {
- case <-w.waitDone:
- return nil
- default:
+// Stop sends soft termination command to the WorkerProcess and waits for process completion.
+func (w *WorkerProcess) Stop(ctx context.Context) error {
+ c := make(chan error)
+ go func() {
+ var errs []string
+ w.errBuffer.Close()
+ w.state.Set(StateStopping)
w.mu.Lock()
defer w.mu.Unlock()
-
- w.state.set(StateStopping)
- err := sendControl(w.rl, &stopCommand{Stop: true})
-
- <-w.waitDone
- return err
- }
-}
-
-// Kill kills underlying process, make sure to call Wait() func to gather
-// error log from the stderr. Does not waits for process completion!
-func (w *Worker) Kill() error {
+ err := sendControl(w.relay, &stopCommand{Stop: true})
+ if err != nil {
+ errs = append(errs, err.Error())
+ w.state.Set(StateKilling)
+ err = w.cmd.Process.Kill()
+ if err != nil {
+ errs = append(errs, err.Error())
+ }
+ c <- errors.New(strings.Join(errs, "|"))
+ }
+ w.state.Set(StateStopped)
+ c <- nil
+ }()
select {
- case <-w.waitDone:
+ case <-ctx.Done():
+ return ctx.Err()
+ case err := <-c:
+ if err != nil {
+ return err
+ }
return nil
- default:
- w.state.set(StateStopping)
- err := w.cmd.Process.Signal(os.Kill)
-
- <-w.waitDone
- return err
}
}
-// Exec sends payload to worker, executes it and returns result or
-// error. Make sure to handle worker.Wait() to gather worker level
-// errors. Method might return JobError indicating issue with payload.
-func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error) {
+// Kill kills underlying process, make sure to call Wait() func to gather
+// error log from the stderr. Does not waits for process completion!
+func (w *WorkerProcess) Kill(ctx context.Context) error {
+ w.state.Set(StateKilling)
w.mu.Lock()
-
- if rqs == nil {
- w.mu.Unlock()
- return nil, fmt.Errorf("payload can not be empty")
- }
-
- if w.state.Value() != StateReady {
- w.mu.Unlock()
- return nil, fmt.Errorf("worker is not ready (%s)", w.state.String())
- }
-
- w.state.set(StateWorking)
-
- rsp, err = w.execPayload(rqs)
+ defer w.mu.Unlock()
+ err := w.cmd.Process.Signal(os.Kill)
if err != nil {
- if _, ok := err.(JobError); !ok {
- w.state.set(StateErrored)
- w.state.registerExec()
- w.mu.Unlock()
- return nil, err
- }
+ return err
}
+ w.state.Set(StateStopped)
+ return nil
+}
- w.state.set(StateReady)
- w.state.registerExec()
- w.mu.Unlock()
- return rsp, err
+func (w *WorkerProcess) logCallback(log []byte) {
+ w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}
}
-func (w *Worker) markInvalid() {
- w.state.set(StateInvalid)
+// thread safe errBuffer
+type errBuffer struct {
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
+ // todo remove update
+ update chan interface{}
+ stop chan interface{}
+ logCallback func(log []byte)
}
-func (w *Worker) start() error {
- if err := w.cmd.Start(); err != nil {
- close(w.waitDone)
- return err
+func newErrBuffer(logCallback func(log []byte)) *errBuffer {
+ eb := &errBuffer{
+ buf: make([]byte, 0),
+ update: make(chan interface{}),
+ wait: time.NewTimer(WaitDuration),
+ stop: make(chan interface{}),
+ logCallback: logCallback,
}
- w.Pid = &w.cmd.Process.Pid
-
- // wait for process to complete
- go func() {
- w.endState, _ = w.cmd.Process.Wait()
- if w.waitDone != nil {
- close(w.waitDone)
- w.mu.Lock()
- defer w.mu.Unlock()
-
- if w.rl != nil {
- err := w.rl.Close()
- if err != nil {
- w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ go func(eb *errBuffer) {
+ for {
+ select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
+ case <-eb.wait.C:
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ eb.logCallback(eb.buf[eb.last:])
+ eb.buf = eb.buf[0:0]
+ eb.last = len(eb.buf)
}
- }
-
- err := w.err.Close()
- if err != nil {
- w.err.lsn(EventWorkerError, WorkerError{Worker: w, Caused: err})
+ eb.mu.Unlock()
+ case <-eb.stop:
+ eb.wait.Stop()
+
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb == nil || eb.logCallback == nil {
+ eb.mu.Unlock()
+ return
+ }
+ eb.logCallback(eb.buf[eb.last:])
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ return
}
}
- }()
+ }(eb)
- return nil
+ return eb
}
-func (w *Worker) execPayload(rqs *Payload) (rsp *Payload, err error) {
- // two things
- if err := sendControl(w.rl, rqs.Context); err != nil {
- return nil, errors.Wrap(err, "header error")
- }
+// Len returns the number of buf of the unread portion of the errBuffer;
+// buf.Len() == len(buf.Bytes()).
+func (eb *errBuffer) Len() int {
+ eb.mu.RLock()
+ defer eb.mu.RUnlock()
- if err = w.rl.Send(rqs.Body, 0); err != nil {
- return nil, errors.Wrap(err, "sender error")
- }
+ // currently active message
+ return len(eb.buf)
+}
- var pr goridge.Prefix
- rsp = new(Payload)
+// Write appends the contents of pool to the errBuffer, growing the errBuffer as
+// needed. The return value n is the length of pool; errBuffer is always nil.
+func (eb *errBuffer) Write(p []byte) (int, error) {
+ eb.mu.Lock()
+ eb.buf = append(eb.buf, p...)
+ eb.mu.Unlock()
+ eb.update <- nil
- if rsp.Context, pr, err = w.rl.Receive(); err != nil {
- return nil, errors.Wrap(err, "worker error")
- }
-
- if !pr.HasFlag(goridge.PayloadControl) {
- return nil, fmt.Errorf("malformed worker response")
- }
+ return len(p), nil
+}
- if pr.HasFlag(goridge.PayloadError) {
- return nil, JobError(rsp.Context)
- }
+// Strings fetches all errBuffer data into string.
+func (eb *errBuffer) String() string {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- // add streaming support :)
- if rsp.Body, pr, err = w.rl.Receive(); err != nil {
- return nil, errors.Wrap(err, "worker error")
- }
+ // TODO unsafe operation, use runes
+ return string(eb.buf)
+}
- return rsp, nil
+// Close aggregation timer.
+func (eb *errBuffer) Close() {
+ close(eb.stop)
}