diff options
Diffstat (limited to 'worker.go')
-rwxr-xr-x | worker.go | 370 |
1 files changed, 0 insertions, 370 deletions
diff --git a/worker.go b/worker.go deleted file mode 100755 index 07c1e5c8..00000000 --- a/worker.go +++ /dev/null @@ -1,370 +0,0 @@ -package roadrunner - -import ( - "bytes" - "context" - "fmt" - "io" - "os" - "os/exec" - "strconv" - "strings" - "sync" - "time" - - "github.com/spiral/errors" - "github.com/spiral/roadrunner/v2/util" - - "github.com/spiral/goridge/v3" - "go.uber.org/multierr" -) - -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 25 * time.Millisecond - - // ReadBufSize used to make a slice with specified length to read from stderr - ReadBufSize = 10240 // Kb -) - -// EventWorkerKill thrown after WorkerProcess is being forcefully killed. -const ( - // EventWorkerError triggered after WorkerProcess. Except payload to be error. - EventWorkerError Event = iota + 200 - - // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. - EventWorkerLog -) - -type Event int64 - -func (ev Event) String() string { - switch ev { - case EventWorkerError: - return "EventWorkerError" - case EventWorkerLog: - return "EventWorkerLog" - } - return "Unknown event type" -} - -// WorkerEvent wraps worker events. -type WorkerEvent struct { - // Event id, see below. - Event Event - - // Worker triggered the event. - Worker WorkerBase - - // Event specific payload. - Payload interface{} -} - -var pool = sync.Pool{ - New: func() interface{} { - buf := make([]byte, ReadBufSize) - return &buf - }, -} - -type WorkerBase interface { - fmt.Stringer - - // Pid returns worker pid. - Pid() int64 - - // Created returns time worker was created at. - Created() time.Time - - // AddListener attaches listener to consume worker events. - AddListener(listener util.EventListener) - - // State return receive-only WorkerProcess state object, state can be used to safely access - // WorkerProcess status, time when status changed and number of WorkerProcess executions. - State() State - - // Start used to run Cmd and immediately return - Start() error - - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is - // complete and will return process error (if any), if stderr is presented it's value - // will be wrapped as WorkerError. Method will return error code if php process fails - // to find or Start the script. - Wait() error - - // Stop sends soft termination command to the WorkerProcess and waits for process completion. - Stop(ctx context.Context) error - - // Kill kills underlying process, make sure to call Wait() func to gather - // error log from the stderr. Does not waits for process completion! - Kill() error - - // Relay returns attached to worker goridge relay - Relay() goridge.Relay - - // AttachRelay used to attach goridge relay to the worker process - AttachRelay(rl goridge.Relay) -} - -// WorkerProcess - supervised process with api over goridge.Relay. -type WorkerProcess struct { - // created indicates at what time WorkerProcess has been created. - created time.Time - - // updates parent supervisor or pool about WorkerProcess events - events util.EventsHandler - - // state holds information about current WorkerProcess state, - // number of WorkerProcess executions, buf status change time. - // publicly this object is receive-only and protected using Mutex - // and atomic counter. - state *state - - // underlying command with associated process, command must be - // provided to WorkerProcess from outside in non-started form. CmdSource - // stdErr direction will be handled by WorkerProcess to aggregate error message. - cmd *exec.Cmd - - // pid of the process, points to pid of underlying process and - // can be nil while process is not started. - pid int - - // stderr aggregates stderr output from underlying process. Value can be - // receive only once command is completed and all pipes are closed. - stderr *bytes.Buffer - - // channel is being closed once command is complete. - // waitDone chan interface{} - - // contains information about resulted process state. - endState *os.ProcessState - - // ensures than only one execution can be run at once. - mu sync.RWMutex - - // communication bus with underlying process. - relay goridge.Relay - rd io.Reader - stop chan struct{} -} - -// InitBaseWorker creates new WorkerProcess over given exec.cmd. -func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { - if cmd.Process != nil { - return nil, fmt.Errorf("can't attach to running process") - } - w := &WorkerProcess{ - created: time.Now(), - events: &util.EventHandler{}, - cmd: cmd, - state: newState(StateInactive), - stderr: new(bytes.Buffer), - stop: make(chan struct{}, 1), - } - - w.rd, w.cmd.Stderr = io.Pipe() - - // small buffer optimization - // at this point we know, that stderr will contain huge messages - w.stderr.Grow(ReadBufSize) - - go func() { - w.watch() - }() - - return w, nil -} - -// Pid returns worker pid. -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) -} - -// Created returns time worker was created at. -func (w *WorkerProcess) Created() time.Time { - return w.created -} - -// AddListener registers new worker event listener. -func (w *WorkerProcess) AddListener(listener util.EventListener) { - w.events.AddListener(listener) -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) State() State { - return w.state -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) AttachRelay(rl goridge.Relay) { - w.relay = rl -} - -// State return receive-only WorkerProcess state object, state can be used to safely access -// WorkerProcess status, time when status changed and number of WorkerProcess executions. -func (w *WorkerProcess) Relay() goridge.Relay { - return w.relay -} - -// String returns WorkerProcess description. fmt.Stringer interface -func (w *WorkerProcess) String() string { - st := w.state.String() - // we can safely compare pid to 0 - if w.pid != 0 { - st = st + ", pid:" + strconv.Itoa(w.pid) - } - - return fmt.Sprintf( - "(`%s` [%s], numExecs: %v)", - strings.Join(w.cmd.Args, " "), - st, - w.state.NumExecs(), - ) -} - -func (w *WorkerProcess) Start() error { - err := w.cmd.Start() - if err != nil { - return err - } - w.pid = w.cmd.Process.Pid - return nil -} - -// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is -// complete and will return process error (if any), if stderr is presented it's value -// will be wrapped as WorkerError. Method will return error code if php process fails -// to find or Start the script. -func (w *WorkerProcess) Wait() error { - const op = errors.Op("worker process wait") - err := multierr.Combine(w.cmd.Wait()) - - // at this point according to the documentation (see cmd.Wait comment) - // if worker finishes with an error, message will be written to the stderr first - // and then w.cmd.Wait return an error - w.endState = w.cmd.ProcessState - if err != nil { - w.state.Set(StateErrored) - - w.mu.RLock() - // if process return code > 0, here will be an error from stderr (if presents) - if w.stderr.Len() > 0 { - err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) - // stop the stderr buffer - w.stop <- struct{}{} - } - w.mu.RUnlock() - - return multierr.Append(err, w.closeRelay()) - } - - err = multierr.Append(err, w.closeRelay()) - if err != nil { - w.state.Set(StateErrored) - return err - } - - if w.endState.Success() { - w.state.Set(StateStopped) - } - - return nil -} - -func (w *WorkerProcess) closeRelay() error { - if w.relay != nil { - err := w.relay.Close() - if err != nil { - return err - } - } - return nil -} - -// Stop sends soft termination command to the WorkerProcess and waits for process completion. -func (w *WorkerProcess) Stop(ctx context.Context) error { - c := make(chan error) - - go func() { - var err error - w.state.Set(StateStopping) - err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) - if err != nil { - w.state.Set(StateKilling) - c <- multierr.Append(err, w.cmd.Process.Kill()) - } - w.state.Set(StateStopped) - c <- nil - }() - - select { - case <-ctx.Done(): - return ctx.Err() - case err := <-c: - if err != nil { - return err - } - return nil - } -} - -// Kill kills underlying process, make sure to call Wait() func to gather -// error log from the stderr. Does not waits for process completion! -func (w *WorkerProcess) Kill() error { - w.state.Set(StateKilling) - err := w.cmd.Process.Signal(os.Kill) - if err != nil { - return err - } - w.state.Set(StateStopped) - return nil -} - -// put the pointer, to not allocate new slice -// but erase it len and then return back -func (w *WorkerProcess) put(data *[]byte) { - *data = (*data)[:0] - *data = (*data)[:cap(*data)] - - pool.Put(data) -} - -// get pointer to the byte slice -func (w *WorkerProcess) get() *[]byte { - return pool.Get().(*[]byte) -} - -// Write appends the contents of pool to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of pool; errBuffer is always nil. -func (w *WorkerProcess) watch() { - go func() { - for { - select { - case <-w.stop: - buf := w.get() - // read the last data - n, _ := w.rd.Read(*buf) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - return - default: - // read the max 10kb of stderr per one read - buf := w.get() - n, _ := w.rd.Read(*buf) - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: (*buf)[:n]}) - w.mu.Lock() - // write new message - w.stderr.Write((*buf)[:n]) - w.mu.Unlock() - w.put(buf) - } - } - }() -} |