diff options
Diffstat (limited to 'worker/worker.go')
-rwxr-xr-x | worker/worker.go | 51 |
1 files changed, 26 insertions, 25 deletions
diff --git a/worker/worker.go b/worker/worker.go index b2689c59..8ca55a3b 100755 --- a/worker/worker.go +++ b/worker/worker.go @@ -10,26 +10,18 @@ import ( "github.com/spiral/errors" "github.com/spiral/goridge/v3/pkg/relay" - "github.com/spiral/roadrunner/v2/events" "github.com/spiral/roadrunner/v2/internal" - "github.com/spiral/roadrunner/v2/utils" "go.uber.org/multierr" + "go.uber.org/zap" ) type Options func(p *Process) -const ( - workerEventsName string = "worker" -) - // Process - supervised process with api over goridge.Relay. type Process struct { // created indicates at what time Process has been created. created time.Time - - // updates parent supervisor or pool about Process events - events events.EventBus - eventsID string + log *zap.Logger // state holds information about current Process state, // number of Process executions, buf status change time. @@ -57,27 +49,39 @@ func InitBaseWorker(cmd *exec.Cmd, options ...Options) (*Process, error) { return nil, fmt.Errorf("can't attach to running process") } - eb, id := events.Bus() w := &Process{ - created: time.Now(), - events: eb, - eventsID: id, - cmd: cmd, - state: NewWorkerState(StateInactive), - doneCh: make(chan struct{}, 1), + created: time.Now(), + cmd: cmd, + state: NewWorkerState(StateInactive), + doneCh: make(chan struct{}, 1), } - // set self as stderr implementation (Writer interface) - w.cmd.Stderr = w - // add options for i := 0; i < len(options); i++ { options[i](w) } + if w.log == nil { + z, err := zap.NewDevelopment() + if err != nil { + return nil, err + } + + w.log = z + } + + // set self as stderr implementation (Writer interface) + w.cmd.Stderr = w + return w, nil } +func WithLog(z *zap.Logger) Options { + return func(p *Process) { + p.log = z + } +} + // Pid returns worker pid. func (w *Process) Pid() int64 { return int64(w.pid) @@ -137,7 +141,6 @@ func (w *Process) Wait() error { const op = errors.Op("process_wait") var err error err = w.cmd.Wait() - defer w.events.Unsubscribe(w.eventsID) w.doneCh <- struct{}{} // If worker was destroyed, just exit @@ -182,7 +185,6 @@ func (w *Process) closeRelay() error { // Stop sends soft termination command to the Process and waits for process completion. func (w *Process) Stop() error { const op = errors.Op("process_stop") - defer w.events.Unsubscribe(w.eventsID) select { // finished @@ -213,7 +215,6 @@ func (w *Process) Kill() error { return err } - w.events.Unsubscribe(w.eventsID) return nil } @@ -223,12 +224,12 @@ func (w *Process) Kill() error { return err } w.state.Set(StateStopped) - w.events.Unsubscribe(w.eventsID) return nil } // Worker stderr func (w *Process) Write(p []byte) (n int, err error) { - w.events.Send(events.NewEvent(events.EventWorkerStderr, workerEventsName, utils.AsString(p))) + // unsafe to use utils.AsString + w.log.Info(string(p)) return len(p), nil } |