diff options
Diffstat (limited to 'worker.go')
-rw-r--r-- | worker.go | 99 |
1 files changed, 49 insertions, 50 deletions
@@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/spiral/roadrunner/v2/util" "os" "os/exec" "strconv" @@ -15,6 +16,12 @@ import ( "go.uber.org/multierr" ) +const ( + // WaitDuration - for how long error buffer should attempt to aggregate error messages + // before merging output together since lastError update (required to keep error update together). + WaitDuration = 25 * time.Millisecond +) + // EventWorkerKill thrown after WorkerProcess is being forcefully killed. const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. @@ -22,38 +29,31 @@ const ( // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog - - // EventWorkerWaitDone triggered when worker exit from process Wait - EventWorkerWaitDone // todo: implemented? - - EventWorkerBufferClosed - - EventRelayCloseError - - EventWorkerProcessError ) -const ( - // WaitDuration - for how long error buffer should attempt to aggregate error messages - // before merging output together since lastError update (required to keep error update together). - WaitDuration = 100 * time.Millisecond -) - -// todo: write comment +// WorkerEvent wraps worker events. type WorkerEvent struct { - Event int64 - Worker WorkerBase + // Event id, see below. + Event int64 + + // Worker triggered the event. + Worker WorkerBase + + // Event specific payload. Payload interface{} } type WorkerBase interface { fmt.Stringer - Created() time.Time + // Pid returns worker pid. + Pid() int64 - Events() <-chan WorkerEvent + // Created returns time worker was created at. + Created() time.Time - Pid() int64 + // AddListener attaches listener to consume worker events. + AddListener(listener util.EventListener) // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. @@ -88,7 +88,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events chan WorkerEvent + events *util.EventHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -129,7 +129,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { } w := &WorkerProcess{ created: time.Now(), - events: make(chan WorkerEvent, 10), + events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), } @@ -142,12 +142,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { return w, nil } +// Pid returns worker pid. +func (w *WorkerProcess) Pid() int64 { + return int64(w.pid) +} + +// Created returns time worker was created at. func (w *WorkerProcess) Created() time.Time { return w.created } -func (w *WorkerProcess) Pid() int64 { - return int64(w.pid) +// AddListener registers new worker event listener. +func (w *WorkerProcess) AddListener(listener util.EventListener) { + w.events.AddListener(listener) + + w.errBuffer.mu.Lock() + w.errBuffer.enable = true + w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -195,10 +206,6 @@ func (w *WorkerProcess) Start() error { return nil } -func (w *WorkerProcess) Events() <-chan WorkerEvent { - return w.events -} - // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails @@ -208,15 +215,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error { w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if there are messages in the events channel, read it - // TODO potentially danger place - if len(w.events) > 0 { - select { - case ev := <-w.events: - err = multierr.Append(err, errors.New(string(ev.Payload.([]byte)))) - } - } - // if no errors in the events, error might be in the errbuffer + + // if no errors in the events, error might be in the errBuffer if w.errBuffer.Len() > 0 { err = multierr.Append(err, errors.New(w.errBuffer.String())) } @@ -250,6 +250,7 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { c := make(chan error) + go func() { var err error w.errBuffer.Close() @@ -264,6 +265,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { w.state.Set(StateStopped) c <- nil }() + select { case <-ctx.Done(): return ctx.Err() @@ -290,16 +292,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error { } func (w *WorkerProcess) logCallback(log []byte) { - w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log} + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) } // thread safe errBuffer type errBuffer struct { - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo remove update + enable bool + mu sync.RWMutex + buf []byte + last int + wait *time.Timer + // todo: remove update update chan interface{} stop chan interface{} logCallback func(log []byte) @@ -321,7 +324,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Reset(WaitDuration) case <-eb.wait.C: eb.mu.Lock() - if len(eb.buf) > eb.last { + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.buf = eb.buf[0:0] eb.last = len(eb.buf) @@ -331,11 +334,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer { eb.wait.Stop() eb.mu.Lock() - if len(eb.buf) > eb.last { - if eb == nil || eb.logCallback == nil { - eb.mu.Unlock() - return - } + if eb.enable && len(eb.buf) > eb.last { eb.logCallback(eb.buf[eb.last:]) eb.last = len(eb.buf) } |