diff options
author | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2021-10-26 19:22:09 +0300 |
commit | 9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch) | |
tree | 8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /worker | |
parent | 160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff) |
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-x | worker/worker.go | 49 |
1 files changed, 30 insertions, 19 deletions
diff --git a/worker/worker.go b/worker/worker.go index 38a1e9ac..5973adc6 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -12,18 +12,24 @@ import ( "github.com/spiral/goridge/v3/pkg/relay" "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" + "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" ) type Options func(p *Process) +const ( + workerEventsName string = "worker" +) + // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time // updates parent supervisor or pool about Process events - events events.Handler + events events.EventBus + eventsID string // state holds information about current Process state, // number of Process executions, buf status change time. @@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { if cmd.Process != nil { return nil, fmt.Errorf("can't attach to running process") } + + eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: events.NewEventsHandler(), - cmd: cmd, - state: NewWorkerState(StateInactive), + created: time.Now(), + events: eb, + eventsID: id, + cmd: cmd, + state: NewWorkerState(StateInactive), } // set self as stderr implementation (Writer interface) @@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return w, nil } -func AddListeners(listeners ...events.Listener) Options { - return func(p *Process) { - for i := 0; i < len(listeners); i++ { - p.addListener(listeners[i]) - } - } -} - // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -85,11 +86,6 @@ func (w *Process) Created() time.Time { return w.created } -// AddListener registers new worker event listener. -func (w *Process) addListener(listener events.Listener) { - w.events.AddListener(listener) -} - // State return receive-only Process state object, state can be used to safely access // Process status, time when status changed and number of Process executions. func (w *Process) State() State { @@ -166,6 +162,8 @@ func (w *Process) Wait() error { return nil } + w.events.Unsubscribe(w.eventsID) + return err } @@ -187,9 +185,13 @@ func (w *Process) Stop() error { if err != nil { w.state.Set(StateKilling) _ = w.cmd.Process.Signal(os.Kill) + + w.events.Unsubscribe(w.eventsID) return errors.E(op, errors.Network, err) } + w.state.Set(StateStopped) + w.events.Unsubscribe(w.eventsID) return nil } @@ -201,6 +203,8 @@ func (w *Process) Kill() error { if err != nil { return err } + + w.events.Unsubscribe(w.eventsID) return nil } @@ -210,11 +214,18 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) + + w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p}) + w.events.Send(&events.RREvent{ + T: events.EventWorkerStderr, + P: workerEventsName, + M: utils.AsString(p), + }) + return len(p), nil } |