summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go99
1 files changed, 49 insertions, 50 deletions
diff --git a/worker.go b/worker.go
index c0a735c2..05b5712d 100644
--- a/worker.go
+++ b/worker.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/spiral/roadrunner/v2/util"
"os"
"os/exec"
"strconv"
@@ -15,6 +16,12 @@ import (
"go.uber.org/multierr"
)
+const (
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages
+ // before merging output together since lastError update (required to keep error update together).
+ WaitDuration = 25 * time.Millisecond
+)
+
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
@@ -22,38 +29,31 @@ const (
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
-
- // EventWorkerWaitDone triggered when worker exit from process Wait
- EventWorkerWaitDone // todo: implemented?
-
- EventWorkerBufferClosed
-
- EventRelayCloseError
-
- EventWorkerProcessError
)
-const (
- // WaitDuration - for how long error buffer should attempt to aggregate error messages
- // before merging output together since lastError update (required to keep error update together).
- WaitDuration = 100 * time.Millisecond
-)
-
-// todo: write comment
+// WorkerEvent wraps worker events.
type WorkerEvent struct {
- Event int64
- Worker WorkerBase
+ // Event id, see below.
+ Event int64
+
+ // Worker triggered the event.
+ Worker WorkerBase
+
+ // Event specific payload.
Payload interface{}
}
type WorkerBase interface {
fmt.Stringer
- Created() time.Time
+ // Pid returns worker pid.
+ Pid() int64
- Events() <-chan WorkerEvent
+ // Created returns time worker was created at.
+ Created() time.Time
- Pid() int64
+ // AddListener attaches listener to consume worker events.
+ AddListener(listener util.EventListener)
// State return receive-only WorkerProcess state object, state can be used to safely access
// WorkerProcess status, time when status changed and number of WorkerProcess executions.
@@ -88,7 +88,7 @@ type WorkerProcess struct {
created time.Time
// updates parent supervisor or pool about WorkerProcess events
- events chan WorkerEvent
+ events *util.EventHandler
// state holds information about current WorkerProcess state,
// number of WorkerProcess executions, buf status change time.
@@ -129,7 +129,7 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
}
w := &WorkerProcess{
created: time.Now(),
- events: make(chan WorkerEvent, 10),
+ events: &util.EventHandler{},
cmd: cmd,
state: newState(StateInactive),
}
@@ -142,12 +142,23 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
return w, nil
}
+// Pid returns worker pid.
+func (w *WorkerProcess) Pid() int64 {
+ return int64(w.pid)
+}
+
+// Created returns time worker was created at.
func (w *WorkerProcess) Created() time.Time {
return w.created
}
-func (w *WorkerProcess) Pid() int64 {
- return int64(w.pid)
+// AddListener registers new worker event listener.
+func (w *WorkerProcess) AddListener(listener util.EventListener) {
+ w.events.AddListener(listener)
+
+ w.errBuffer.mu.Lock()
+ w.errBuffer.enable = true
+ w.errBuffer.mu.Unlock()
}
// State return receive-only WorkerProcess state object, state can be used to safely access
@@ -195,10 +206,6 @@ func (w *WorkerProcess) Start() error {
return nil
}
-func (w *WorkerProcess) Events() <-chan WorkerEvent {
- return w.events
-}
-
// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
@@ -208,15 +215,8 @@ func (w *WorkerProcess) Wait(ctx context.Context) error {
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(StateErrored)
- // if there are messages in the events channel, read it
- // TODO potentially danger place
- if len(w.events) > 0 {
- select {
- case ev := <-w.events:
- err = multierr.Append(err, errors.New(string(ev.Payload.([]byte))))
- }
- }
- // if no errors in the events, error might be in the errbuffer
+
+ // if no errors in the events, error might be in the errBuffer
if w.errBuffer.Len() > 0 {
err = multierr.Append(err, errors.New(w.errBuffer.String()))
}
@@ -250,6 +250,7 @@ func (w *WorkerProcess) closeRelay() error {
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
func (w *WorkerProcess) Stop(ctx context.Context) error {
c := make(chan error)
+
go func() {
var err error
w.errBuffer.Close()
@@ -264,6 +265,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
w.state.Set(StateStopped)
c <- nil
}()
+
select {
case <-ctx.Done():
return ctx.Err()
@@ -290,16 +292,17 @@ func (w *WorkerProcess) Kill(ctx context.Context) error {
}
func (w *WorkerProcess) logCallback(log []byte) {
- w.events <- WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
}
// thread safe errBuffer
type errBuffer struct {
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo remove update
+ enable bool
+ mu sync.RWMutex
+ buf []byte
+ last int
+ wait *time.Timer
+ // todo: remove update
update chan interface{}
stop chan interface{}
logCallback func(log []byte)
@@ -321,7 +324,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Reset(WaitDuration)
case <-eb.wait.C:
eb.mu.Lock()
- if len(eb.buf) > eb.last {
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.buf = eb.buf[0:0]
eb.last = len(eb.buf)
@@ -331,11 +334,7 @@ func newErrBuffer(logCallback func(log []byte)) *errBuffer {
eb.wait.Stop()
eb.mu.Lock()
- if len(eb.buf) > eb.last {
- if eb == nil || eb.logCallback == nil {
- eb.mu.Unlock()
- return
- }
+ if eb.enable && len(eb.buf) > eb.last {
eb.logCallback(eb.buf[eb.last:])
eb.last = len(eb.buf)
}