summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rwxr-xr-xworker.go152
1 files changed, 41 insertions, 111 deletions
diff --git a/worker.go b/worker.go
index ef532f51..402e9b90 100755
--- a/worker.go
+++ b/worker.go
@@ -1,8 +1,8 @@
package roadrunner
import (
+ "bytes"
"context"
- "errors"
"fmt"
"os"
"os/exec"
@@ -11,6 +11,7 @@ import (
"sync"
"time"
+ "github.com/spiral/errors"
"github.com/spiral/roadrunner/v2/util"
"github.com/spiral/goridge/v2"
@@ -26,16 +27,28 @@ const (
// EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const (
// EventWorkerError triggered after WorkerProcess. Except payload to be error.
- EventWorkerError int64 = iota + 200
+ EventWorkerError Event = iota + 200
// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string.
EventWorkerLog
)
+type Event int64
+
+func (ev Event) String() string {
+ switch ev {
+ case EventWorkerError:
+ return "EventWorkerError"
+ case EventWorkerLog:
+ return "EventWorkerLog"
+ }
+ return "Unknown event type"
+}
+
// WorkerEvent wraps worker events.
type WorkerEvent struct {
// Event id, see below.
- Event int64
+ Event Event
// Worker triggered the event.
Worker WorkerBase
@@ -67,7 +80,7 @@ type WorkerBase interface {
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
- Wait(ctx context.Context) error
+ Wait() error
// Stop sends soft termination command to the WorkerProcess and waits for process completion.
Stop(ctx context.Context) error
@@ -89,7 +102,7 @@ type WorkerProcess struct {
created time.Time
// updates parent supervisor or pool about WorkerProcess events
- events *util.EventHandler
+ events util.EventsHandler
// state holds information about current WorkerProcess state,
// number of WorkerProcess executions, buf status change time.
@@ -106,9 +119,9 @@ type WorkerProcess struct {
// can be nil while process is not started.
pid int
- // errBuffer aggregates stderr output from underlying process. Value can be
+ // stderr aggregates stderr output from underlying process. Value can be
// receive only once command is completed and all pipes are closed.
- errBuffer *errBuffer
+ stderr *bytes.Buffer
// channel is being closed once command is complete.
// waitDone chan interface{}
@@ -133,13 +146,14 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) {
events: &util.EventHandler{},
cmd: cmd,
state: newState(StateInactive),
+ stderr: new(bytes.Buffer),
}
- w.errBuffer = newErrBuffer(w.logCallback)
-
- // piping all stderr to command errBuffer
- w.cmd.Stderr = w.errBuffer
+ // small buffer optimization
+ // at this point we know, that stderr will contain huge messages
+ w.stderr.Grow(1024)
+ w.cmd.Stderr = w
return w, nil
}
@@ -156,10 +170,6 @@ func (w *WorkerProcess) Created() time.Time {
// AddListener registers new worker event listener.
func (w *WorkerProcess) AddListener(listener util.EventListener) {
w.events.AddListener(listener)
-
- w.errBuffer.mu.Lock()
- w.errBuffer.enable = true
- w.errBuffer.mu.Unlock()
}
// State return receive-only WorkerProcess state object, state can be used to safely access
@@ -201,9 +211,7 @@ func (w *WorkerProcess) Start() error {
if err != nil {
return err
}
-
w.pid = w.cmd.Process.Pid
-
return nil
}
@@ -211,15 +219,19 @@ func (w *WorkerProcess) Start() error {
// complete and will return process error (if any), if stderr is presented it's value
// will be wrapped as WorkerError. Method will return error code if php process fails
// to find or Start the script.
-func (w *WorkerProcess) Wait(ctx context.Context) error {
+func (w *WorkerProcess) Wait() error {
+ const op = errors.Op("worker process wait")
err := multierr.Combine(w.cmd.Wait())
+ // at this point according to the documentation (see cmd.Wait comment)
+ // if worker finishes with an error, message will be written to the stderr first
+ // and then w.cmd.Wait return an error
w.endState = w.cmd.ProcessState
if err != nil {
w.state.Set(StateErrored)
- // if no errors in the events, error might be in the errBuffer
- if w.errBuffer.Len() > 0 {
- err = multierr.Append(err, errors.New(w.errBuffer.String()))
+ // if process return code > 0, here will be an error from stderr (if presents)
+ if w.stderr.Len() > 0 {
+ err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String())))
}
return multierr.Append(err, w.closeRelay())
@@ -254,10 +266,7 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
go func() {
var err error
- w.errBuffer.Close()
w.state.Set(StateStopping)
- w.mu.Lock()
- defer w.mu.Unlock()
err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true}))
if err != nil {
w.state.Set(StateKilling)
@@ -282,8 +291,6 @@ func (w *WorkerProcess) Stop(ctx context.Context) error {
// error log from the stderr. Does not waits for process completion!
func (w *WorkerProcess) Kill() error {
w.state.Set(StateKilling)
- w.mu.Lock()
- defer w.mu.Unlock()
err := w.cmd.Process.Signal(os.Kill)
if err != nil {
return err
@@ -292,93 +299,16 @@ func (w *WorkerProcess) Kill() error {
return nil
}
-func (w *WorkerProcess) logCallback(log []byte) {
- w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log})
-}
-
-// thread safe errBuffer
-type errBuffer struct {
- enable bool
- mu sync.RWMutex
- buf []byte
- last int
- wait *time.Timer
- // todo: remove update
- update chan interface{}
- stop chan interface{}
- logCallback func(log []byte)
-}
-
-func newErrBuffer(logCallback func(log []byte)) *errBuffer {
- eb := &errBuffer{
- buf: make([]byte, 0),
- update: make(chan interface{}),
- wait: time.NewTimer(WaitDuration),
- stop: make(chan interface{}),
- logCallback: logCallback,
- }
-
- go func(eb *errBuffer) {
- for {
- select {
- case <-eb.update:
- eb.wait.Reset(WaitDuration)
- case <-eb.wait.C:
- eb.mu.Lock()
- if eb.enable && len(eb.buf) > eb.last {
- eb.logCallback(eb.buf[eb.last:])
- eb.buf = eb.buf[0:0]
- eb.last = len(eb.buf)
- }
- eb.mu.Unlock()
- case <-eb.stop:
- eb.wait.Stop()
-
- eb.mu.Lock()
- if eb.enable && len(eb.buf) > eb.last {
- eb.logCallback(eb.buf[eb.last:])
- eb.last = len(eb.buf)
- }
- eb.mu.Unlock()
- return
- }
- }
- }(eb)
-
- return eb
-}
-
-// Len returns the number of buf of the unread portion of the errBuffer;
-// buf.Len() == len(buf.Bytes()).
-func (eb *errBuffer) Len() int {
- eb.mu.RLock()
- defer eb.mu.RUnlock()
-
- // currently active message
- return len(eb.buf)
-}
-
// Write appends the contents of pool to the errBuffer, growing the errBuffer as
// needed. The return value n is the length of pool; errBuffer is always nil.
-func (eb *errBuffer) Write(p []byte) (int, error) {
- eb.mu.Lock()
- eb.buf = append(eb.buf, p...)
- eb.mu.Unlock()
- eb.update <- nil
+func (w *WorkerProcess) Write(p []byte) (int, error) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ // clean all previous messages in the stderr
+ w.stderr.Truncate(0)
+ w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p})
+ // write new message
+ w.stderr.Write(p)
return len(p), nil
}
-
-// Strings fetches all errBuffer data into string.
-func (eb *errBuffer) String() string {
- eb.mu.Lock()
- defer eb.mu.Unlock()
-
- // TODO unsafe operation, use runes
- return string(eb.buf)
-}
-
-// Close aggregation timer.
-func (eb *errBuffer) Close() {
- close(eb.stop)
-}