diff options
Diffstat (limited to 'worker/worker.go')
-rwxr-xr-x | worker/worker.go | 43 |
1 files changed, 24 insertions, 19 deletions
diff --git a/worker/worker.go b/worker/worker.go index 38a1e9ac..05c6dd0d 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,18 +12,24 @@ import ( "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" ) type Options func(p *Process) +const ( + workerEventsName string = "worker" +) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time // updates parent supervisor or pool about Process events - events events.Handler + events events.EventBus + eventsID string // state holds information about current Process state, // number of Process executions, buf status change time. @@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } + + eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), + created: time.Now(), + events: eb, + eventsID: id, + cmd: cmd, + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return w, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -85,11 +86,6 @@ func (w *Process) Created() time.Time { return w.created } -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. func (w *Process) State() State { @@ -139,6 +135,7 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") var err error err = w.cmd.Wait() + defer w.events.Unsubscribe(w.eventsID) // If worker was destroyed, just exit if w.State().Value() == StateDestroyed { @@ -187,9 +184,13 @@ func (w *Process) Stop() error { if err != nil { w.state.Set(StateKilling) _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) return errors.E(op, errors.Network, err) } + w.state.Set(StateStopped) + w.events.Unsubscribe(w.eventsID) return nil } @@ -201,6 +202,8 @@ func (w *Process) Kill() error { if err != nil { return err } + + w.events.Unsubscribe(w.eventsID) return nil } @@ -210,11 +213,13 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) + + w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p))) return len(p), nil } |