summaryrefslogtreecommitdiff
path: root/worker/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker/worker.go')
-rwxr-xr-xworker/worker.go43
1 files changed, 24 insertions, 19 deletions
diff --git a/worker/worker.go b/worker/worker.go
index 38a1e9ac..05c6dd0d 100755
--- a/worker/worker.go
+++ b/worker/worker.go
@@ -12,18 +12,24 @@ import (
"github.com/spiral/goridge/v3/pkg/relay"
"github.com/spiral/roadrunner/v2/events"
"github.com/spiral/roadrunner/v2/internal"
+ "github.com/spiral/roadrunner/v2/utils"
"go.uber.org/multierr"
)
type Options func(p *Process)
+const (
+ workerEventsName string = "worker"
+)
+
// Process - supervised process with api over goridge.Relay.
type Process struct {
// created indicates at what time Process has been created.
created time.Time
// updates parent supervisor or pool about Process events
- events events.Handler
+ events events.EventBus
+ eventsID string
// state holds information about current Process state,
// number of Process executions, buf status change time.
@@ -49,11 +55,14 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
if cmd.Process != nil {
return nil, fmt.Errorf("can't attach to running process")
}
+
+ eb, id := events.Bus()
w := &Process{
- created: time.Now(),
- events: events.NewEventsHandler(),
- cmd: cmd,
- state: NewWorkerState(StateInactive),
+ created: time.Now(),
+ events: eb,
+ eventsID: id,
+ cmd: cmd,
+ state: NewWorkerState(StateInactive),
}
// set self as stderr implementation (Writer interface)
@@ -67,14 +76,6 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) {
return w, nil
}
-func AddListeners(listeners ...events.Listener) Options {
- return func(p *Process) {
- for i := 0; i < len(listeners); i++ {
- p.addListener(listeners[i])
- }
- }
-}
-
// Pid returns worker pid.
func (w *Process) Pid() int64 {
return int64(w.pid)
@@ -85,11 +86,6 @@ func (w *Process) Created() time.Time {
return w.created
}
-// AddListener registers new worker event listener.
-func (w *Process) addListener(listener events.Listener) {
- w.events.AddListener(listener)
-}
-
// State return receive-only Process state object, state can be used to safely access
// Process status, time when status changed and number of Process executions.
func (w *Process) State() State {
@@ -139,6 +135,7 @@ func (w *Process) Wait() error {
const op = errors.Op("process_wait")
var err error
err = w.cmd.Wait()
+ defer w.events.Unsubscribe(w.eventsID)
// If worker was destroyed, just exit
if w.State().Value() == StateDestroyed {
@@ -187,9 +184,13 @@ func (w *Process) Stop() error {
if err != nil {
w.state.Set(StateKilling)
_ = w.cmd.Process.Signal(os.Kill)
+
+ w.events.Unsubscribe(w.eventsID)
return errors.E(op, errors.Network, err)
}
+
w.state.Set(StateStopped)
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -201,6 +202,8 @@ func (w *Process) Kill() error {
if err != nil {
return err
}
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
@@ -210,11 +213,13 @@ func (w *Process) Kill() error {
return err
}
w.state.Set(StateStopped)
+
+ w.events.Unsubscribe(w.eventsID)
return nil
}
// Worker stderr
func (w *Process) Write(p []byte) (n int, err error) {
- w.events.Push(events.WorkerEvent{Event: events.EventWorkerStderr, Worker: w, Payload: p})
+ w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p)))
return len(p), nil
}