summaryrefslogtreecommitdiff
path: root/worker
diff options
context:
space:
mode:
authorValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
committerValery Piashchynski <[email protected]>2021-10-26 19:22:09 +0300
commit9d42e1d430c45a21b8eed86cc3d36817f7deeb64 (patch)
tree8fa981011ffb2f4bd9ca685b4935b5c35d7d368f /worker
parent160055c16d4c1ca1e0e19853cbb89ef3509c7556 (diff)
Events package update
Signed-off-by: Valery Piashchynski <[email protected]>
Diffstat (limited to 'worker')
-rwxr-xr-xworker/worker.go49
1 files changed, 30 insertions, 19 deletions
diff --git a/worker/worker.go b/worker/worker.go
index 38a1e9ac..5973adc6 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -12,18 +12,24 @@ import (
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
)
type Options func(p *Process)
+const (
+ workerEventsName string = "worker"
+)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
// updates parent supervisor or pool about Process events
- events events.Handler
+ events events.EventBus
+ eventsID string
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
+
+ eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
+ created: time.Now(),
+ events: eb,
+ eventsID: id,
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return w, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -85,11 +86,6 @@ func (w *Process) Created() time.Time {
return w.created
}
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
func (w *Process) State() State {
@@ -166,6 +162,8 @@ func (w *Process) Wait() error {
return nil
}
+ w.events.Unsubscribe(w.eventsID)
+
return err
}
@@ -187,9 +185,13 @@ func (w *Process) Stop() error {
if err != nil {
w.state.Set(StateKilling)
_ = w.cmd.Process.Signal(os.Kill)
+
+ w.events.Unsubscribe(w.eventsID)
return errors.E(op, errors.Network, err)
}
+
w.state.Set(StateStopped)
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -201,6 +203,8 @@ func (w *Process) Kill() error {
if err != nil {
return err
}
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -210,11 +214,18 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ w.events.Send(&events.RREvent{
+ T: events.EventWorkerStderr,
+ P: workerEventsName,
+ M: utils.AsString(p),
+ })
+
return len(p), nil
}