diff options
Diffstat (limited to 'worker/worker.go')
-rwxr-xr-x | worker/worker.go | 220 |
1 files changed, 220 insertions, 0 deletions
diff --git a/worker/worker.go b/worker/worker.go new file mode 100755 index 00000000..38a1e9ac --- /dev/null +++ b/worker/worker.go @@ -0,0 +1,220 @@ +package worker + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "time" + + "github.com/spiral/errors" + "github.com/spiral/goridge/v3/pkg/relay" + "github.com/spiral/roadrunner/v2/events" + "github.com/spiral/roadrunner/v2/internal" + "go.uber.org/multierr" +) + +type Options func(p *Process) + +// Process - supervised process with api over goridge.Relay. +type Process struct { + // created indicates at what time Process has been created. + created time.Time + + // updates parent supervisor or pool about Process events + events events.Handler + + // state holds information about current Process state, + // number of Process executions, buf status change time. + // publicly this object is receive-only and protected using Mutex + // and atomic counter. + state *StateImpl + + // underlying command with associated process, command must be + // provided to Process from outside in non-started form. CmdSource + // stdErr direction will be handled by Process to aggregate error message. + cmd *exec.Cmd + + // pid of the process, points to pid of underlying process and + // can be nil while process is not started. + pid int + + // communication bus with underlying process. + relay relay.Relay +} + +// InitBaseWorker creates new Process over given exec.cmd. +func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { + if cmd.Process != nil { + return nil, fmt.Errorf("can't attach to running process") + } + w := &Process{ + created: time.Now(), + events: events.NewEventsHandler(), + cmd: cmd, + state: NewWorkerState(StateInactive), + } + + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w + + // add options + for i := 0; i < len(options); i++ { + options[i](w) + } + + return w, nil +} + +func AddListeners(listeners ...events.Listener) Options { + return func(p *Process) { + for i := 0; i < len(listeners); i++ { + p.addListener(listeners[i]) + } + } +} + +// Pid returns worker pid. +func (w *Process) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. +func (w *Process) Created() time.Time { + return w.created +} + +// AddListener registers new worker event listener. +func (w *Process) addListener(listener events.Listener) { + w.events.AddListener(listener) +} + +// State return receive-only Process state object, state can be used to safely access +// Process status, time when status changed and number of Process executions. +func (w *Process) State() State { + return w.state +} + +// AttachRelay attaches relay to the worker +func (w *Process) AttachRelay(rl relay.Relay) { + w.relay = rl +} + +// Relay returns relay attached to the worker +func (w *Process) Relay() relay.Relay { + return w.relay +} + +// String returns Process description. fmt.Stringer interface +func (w *Process) String() string { + st := w.state.String() + // we can safely compare pid to 0 + if w.pid != 0 { + st = st + ", pid:" + strconv.Itoa(w.pid) + } + + return fmt.Sprintf( + "(`%s` [%s], numExecs: %v)", + strings.Join(w.cmd.Args, " "), + st, + w.state.NumExecs(), + ) +} + +func (w *Process) Start() error { + err := w.cmd.Start() + if err != nil { + return err + } + w.pid = w.cmd.Process.Pid + return nil +} + +// Wait must be called once for each Process, call will be released once Process is +// complete and will return process error (if any), if stderr is presented it's value +// will be wrapped as WorkerError. Method will return error code if php process fails +// to find or Start the script. +func (w *Process) Wait() error { + const op = errors.Op("process_wait") + var err error + err = w.cmd.Wait() + + // If worker was destroyed, just exit + if w.State().Value() == StateDestroyed { + return nil + } + + // If state is different, and err is not nil, append it to the errors + if err != nil { + w.State().Set(StateErrored) + err = multierr.Combine(err, errors.E(op, err)) + } + + // closeRelay + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then process.cmd.Wait return an error + err2 := w.closeRelay() + if err2 != nil { + w.State().Set(StateErrored) + return multierr.Append(err, errors.E(op, err2)) + } + + if w.cmd.ProcessState.Success() { + w.State().Set(StateStopped) + return nil + } + + return err +} + +func (w *Process) closeRelay() error { + if w.relay != nil { + err := w.relay.Close() + if err != nil { + return err + } + } + return nil +} + +// Stop sends soft termination command to the Process and waits for process completion. +func (w *Process) Stop() error { + const op = errors.Op("process_stop") + w.state.Set(StateStopping) + err := internal.SendControl(w.relay, &internal.StopCommand{Stop: true}) + if err != nil { + w.state.Set(StateKilling) + _ = w.cmd.Process.Signal(os.Kill) + return errors.E(op, errors.Network, err) + } + w.state.Set(StateStopped) + return nil +} + +// Kill kills underlying process, make sure to call Wait() func to gather +// error log from the stderr. Does not wait for process completion! +func (w *Process) Kill() error { + if w.State().Value() == StateDestroyed { + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + return nil + } + + w.state.Set(StateKilling) + err := w.cmd.Process.Signal(os.Kill) + if err != nil { + return err + } + w.state.Set(StateStopped) + return nil +} + +// Worker stderr +func (w *Process) Write(p []byte) (n int, err error) { + w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + return len(p), nil +} |