diff options
author | Valery Piashchynski <[email protected]> | 2020-11-26 01:56:32 +0300 |
---|---|---|
committer | Valery Piashchynski <[email protected]> | 2020-11-26 01:56:32 +0300 |
commit | 5a689337cc557d6d57e20277cac1e31b6878e142 (patch) | |
tree | 3f06c252b5a1e7d5f4e58624a12b4dc008e81028 | |
parent | 85ca6f6b488b3f144b5e4343cd7ada1237de1602 (diff) |
error buffer update (remove)
-rwxr-xr-x | sync_worker.go | 2 | ||||
-rwxr-xr-x | worker.go | 202 | ||||
-rwxr-xr-x | worker_test.go | 226 |
3 files changed, 221 insertions, 209 deletions
diff --git a/sync_worker.go b/sync_worker.go index a9c53553..6f5b20d9 100755 --- a/sync_worker.go +++ b/sync_worker.go @@ -71,7 +71,7 @@ type wexec struct { // Exec payload without TTL timeout. func (tw *syncWorker) ExecWithContext(ctx context.Context, p Payload) (Payload, error) { - const op = errors.Op("exec_with_context") + const op = errors.Op("ExecWithContext") c := make(chan wexec, 1) go func() { if len(p.Body) == 0 && len(p.Context) == 0 { @@ -1,8 +1,9 @@ package roadrunner import ( + "bytes" + //"bytes" "context" - "errors" "fmt" "os" "os/exec" @@ -11,6 +12,7 @@ import ( "sync" "time" + "github.com/spiral/errors" "github.com/spiral/roadrunner/v2/util" "github.com/spiral/goridge/v2" @@ -101,7 +103,7 @@ type WorkerProcess struct { created time.Time // updates parent supervisor or pool about WorkerProcess events - events *util.EventHandler + events util.EventsHandler // state holds information about current WorkerProcess state, // number of WorkerProcess executions, buf status change time. @@ -120,7 +122,9 @@ type WorkerProcess struct { // errBuffer aggregates stderr output from underlying process. Value can be // receive only once command is completed and all pipes are closed. - errBuffer *errBuffer + // errBuffer *errBuffer + + stderr *bytes.Buffer // channel is being closed once command is complete. // waitDone chan interface{} @@ -145,13 +149,18 @@ func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error) { events: &util.EventHandler{}, cmd: cmd, state: newState(StateInactive), + stderr: new(bytes.Buffer), } - w.errBuffer = newErrBuffer(w.logCallback) + // small buffer optimization + // at this point we know, that stderr will contain huge messages + w.stderr.Grow(1024) + //w.errBuffer = newErrBuffer(w.logCallback) // piping all stderr to command errBuffer - w.cmd.Stderr = w.errBuffer + //w.cmd.Stderr = w.errBuffer + w.cmd.Stderr = w return w, nil } @@ -169,9 +178,9 @@ func (w *WorkerProcess) Created() time.Time { func (w *WorkerProcess) AddListener(listener util.EventListener) { w.events.AddListener(listener) - w.errBuffer.mu.Lock() - w.errBuffer.enable = true - w.errBuffer.mu.Unlock() + //w.errBuffer.mu.Lock() + //w.errBuffer.enable = true + //w.errBuffer.mu.Unlock() } // State return receive-only WorkerProcess state object, state can be used to safely access @@ -224,16 +233,26 @@ func (w *WorkerProcess) Start() error { // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. func (w *WorkerProcess) Wait(ctx context.Context) error { + const op = errors.Op("worker process wait") err := multierr.Combine(w.cmd.Wait()) + // at this point according to the documentation (see cmd.Wait comment) + // if worker finishes with an error, message will be written to the stderr first + // and then w.cmd.Wait return an error w.endState = w.cmd.ProcessState if err != nil { w.state.Set(StateErrored) - // if no errors in the events, error might be in the errBuffer - if w.errBuffer.Len() > 0 { - err = multierr.Append(err, errors.New(w.errBuffer.String())) + // if process return code > 0, here will be an error from stderr (if presents) + if w.stderr.Len() > 0 { + err = multierr.Append(err, errors.E(op, errors.Str(w.stderr.String()))) + w.stderr.Truncate(0) } + // if no errors in the events, error might be in the errBuffer + //if w.errBuffer.Len() > 0 { + // err = multierr.Append(err, errors.New(w.errBuffer.String())) + //} + return multierr.Append(err, w.closeRelay()) } @@ -262,14 +281,14 @@ func (w *WorkerProcess) closeRelay() error { // Stop sends soft termination command to the WorkerProcess and waits for process completion. func (w *WorkerProcess) Stop(ctx context.Context) error { + w.mu.Lock() + defer w.mu.Unlock() c := make(chan error) go func() { var err error - w.errBuffer.Close() + //w.errBuffer.Close() w.state.Set(StateStopping) - w.mu.Lock() - defer w.mu.Unlock() err = multierr.Append(err, sendControl(w.relay, &stopCommand{Stop: true})) if err != nil { w.state.Set(StateKilling) @@ -294,8 +313,6 @@ func (w *WorkerProcess) Stop(ctx context.Context) error { // error log from the stderr. Does not waits for process completion! func (w *WorkerProcess) Kill() error { w.state.Set(StateKilling) - w.mu.Lock() - defer w.mu.Unlock() err := w.cmd.Process.Signal(os.Kill) if err != nil { return err @@ -304,93 +321,88 @@ func (w *WorkerProcess) Kill() error { return nil } -func (w *WorkerProcess) logCallback(log []byte) { - w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) -} - -// thread safe errBuffer -type errBuffer struct { - enable bool - mu sync.RWMutex - buf []byte - last int - wait *time.Timer - // todo: remove update - update chan interface{} - stop chan interface{} - logCallback func(log []byte) -} - -func newErrBuffer(logCallback func(log []byte)) *errBuffer { - eb := &errBuffer{ - buf: make([]byte, 0), - update: make(chan interface{}), - wait: time.NewTimer(WaitDuration), - stop: make(chan interface{}), - logCallback: logCallback, - } - - go func(eb *errBuffer) { - for { - select { - case <-eb.update: - eb.wait.Reset(WaitDuration) - case <-eb.wait.C: - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.buf = eb.buf[0:0] - eb.last = len(eb.buf) - } - eb.mu.Unlock() - case <-eb.stop: - eb.wait.Stop() - - eb.mu.Lock() - if eb.enable && len(eb.buf) > eb.last { - eb.logCallback(eb.buf[eb.last:]) - eb.last = len(eb.buf) - } - eb.mu.Unlock() - return - } - } - }(eb) - - return eb -} +//func (w *WorkerProcess) logCallback(log []byte) { +// w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: log}) +//} // Len returns the number of buf of the unread portion of the errBuffer; // buf.Len() == len(buf.Bytes()). -func (eb *errBuffer) Len() int { - eb.mu.RLock() - defer eb.mu.RUnlock() - - // currently active message - return len(eb.buf) -} +//func (w *WorkerProcess) Len() int { +// //eb.mu.RLock() +// //defer eb.mu.RUnlock() +// +// // currently active message +// return len(w.stderr) +//} +// Writer is the interface that wraps the basic Write method. +// // Write appends the contents of pool to the errBuffer, growing the errBuffer as // needed. The return value n is the length of pool; errBuffer is always nil. -func (eb *errBuffer) Write(p []byte) (int, error) { - eb.mu.Lock() - eb.buf = append(eb.buf, p...) - eb.mu.Unlock() - eb.update <- nil +func (w *WorkerProcess) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + w.events.Push(WorkerEvent{Event: EventWorkerLog, Worker: w, Payload: p}) + w.stderr.Write(p) + //eb.mu.Unlock() + //eb.update <- nil return len(p), nil } -// Strings fetches all errBuffer data into string. -func (eb *errBuffer) String() string { - eb.mu.Lock() - defer eb.mu.Unlock() - - // TODO unsafe operation, use runes - return string(eb.buf) -} - // Close aggregation timer. -func (eb *errBuffer) Close() { - close(eb.stop) -} +//func (w *WorkerProcess) Close() { +// //close(eb.stop) +//} + +// thread safe errBuffer +//type errBuffer struct { +// enable bool +// mu sync.RWMutex +// buf []byte +// last int +// wait *time.Timer +// // todo: remove update +// update chan interface{} +// stop chan interface{} +// logCallback func(log []byte) +//} +// +//func newErrBuffer(logCallback func(log []byte)) *errBuffer { +// eb := &errBuffer{ +// buf: make([]byte, 0), +// update: make(chan interface{}), +// wait: time.NewTimer(WaitDuration), +// stop: make(chan interface{}), +// logCallback: logCallback, +// } +// +// go func(eb *errBuffer) { +// for { +// select { +// case <-eb.update: +// eb.wait.Reset(WaitDuration) +// case <-eb.wait.C: +// eb.mu.Lock() +// if eb.enable && len(eb.buf) > eb.last { +// eb.logCallback(eb.buf[eb.last:]) +// eb.buf = eb.buf[0:0] +// eb.last = len(eb.buf) +// } +// eb.mu.Unlock() +// case <-eb.stop: +// eb.wait.Stop() +// +// eb.mu.Lock() +// if eb.enable && len(eb.buf) > eb.last { +// eb.logCallback(eb.buf[eb.last:]) +// eb.last = len(eb.buf) +// } +// eb.mu.Unlock() +// return +// } +// } +// }(eb) +// +// return eb +//} diff --git a/worker_test.go b/worker_test.go index 78738064..43bb8b65 100755 --- a/worker_test.go +++ b/worker_test.go @@ -65,116 +65,116 @@ func Test_OnStarted(t *testing.T) { assert.Equal(t, "can't attach to running process", err.Error()) } -func TestErrBuffer_Write_Len(t *testing.T) { - buf := newErrBuffer(nil) - defer func() { - buf.Close() - }() - - _, err := buf.Write([]byte("hello")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - assert.Equal(t, 5, buf.Len()) - assert.Equal(t, "hello", buf.String()) -} - -func TestErrBuffer_Write_Event(t *testing.T) { - buf := newErrBuffer(nil) - defer func() { - buf.Close() - }() - - wg := &sync.WaitGroup{} - wg.Add(1) - buf.logCallback = func(log []byte) { - assert.Equal(t, []byte("hello\n"), log) - wg.Done() - } - buf.enable = true - - _, err := buf.Write([]byte("hello\n")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - wg.Wait() - - // messages are read - assert.Equal(t, 0, buf.Len()) -} - -func TestErrBuffer_Write_Event_Separated(t *testing.T) { - buf := newErrBuffer(nil) - defer func() { - buf.Close() - }() - - wg := &sync.WaitGroup{} - wg.Add(1) - - buf.logCallback = func(log []byte) { - assert.Equal(t, []byte("hello\nending"), log) - wg.Done() - } - buf.enable = true - - _, err := buf.Write([]byte("hel")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - _, err = buf.Write([]byte("lo\n")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - _, err = buf.Write([]byte("ending")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - wg.Wait() - assert.Equal(t, 0, buf.Len()) - assert.Equal(t, "", buf.String()) -} - -func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { - buf := newErrBuffer(nil) - defer func() { - buf.Close() - }() - - _, err := buf.Write([]byte("hel")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - _, err = buf.Write([]byte("lo\n")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - _, err = buf.Write([]byte("ending")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - assert.Equal(t, 12, buf.Len()) - assert.Equal(t, "hello\nending", buf.String()) -} - -func TestErrBuffer_Write_Remaining(t *testing.T) { - buf := newErrBuffer(nil) - defer func() { - buf.Close() - }() - - _, err := buf.Write([]byte("hel")) - if err != nil { - t.Errorf("fail to write: error %v", err) - } - - assert.Equal(t, 3, buf.Len()) - assert.Equal(t, "hel", buf.String()) -} +//func TestErrBuffer_Write_Len(t *testing.T) { +// buf := newErrBuffer(nil) +// defer func() { +// buf.Close() +// }() +// +// _, err := buf.Write([]byte("hello")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// assert.Equal(t, 5, buf.Len()) +// assert.Equal(t, "hello", buf.String()) +//} +// +//func TestErrBuffer_Write_Event(t *testing.T) { +// buf := newErrBuffer(nil) +// defer func() { +// buf.Close() +// }() +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// buf.logCallback = func(log []byte) { +// assert.Equal(t, []byte("hello\n"), log) +// wg.Done() +// } +// buf.enable = true +// +// _, err := buf.Write([]byte("hello\n")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// wg.Wait() +// +// // messages are read +// assert.Equal(t, 0, buf.Len()) +//} +// +//func TestErrBuffer_Write_Event_Separated(t *testing.T) { +// buf := newErrBuffer(nil) +// defer func() { +// buf.Close() +// }() +// +// wg := &sync.WaitGroup{} +// wg.Add(1) +// +// buf.logCallback = func(log []byte) { +// assert.Equal(t, []byte("hello\nending"), log) +// wg.Done() +// } +// buf.enable = true +// +// _, err := buf.Write([]byte("hel")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// _, err = buf.Write([]byte("lo\n")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// _, err = buf.Write([]byte("ending")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// wg.Wait() +// assert.Equal(t, 0, buf.Len()) +// assert.Equal(t, "", buf.String()) +//} +// +//func TestErrBuffer_Write_Event_Separated_NoListener(t *testing.T) { +// buf := newErrBuffer(nil) +// defer func() { +// buf.Close() +// }() +// +// _, err := buf.Write([]byte("hel")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// _, err = buf.Write([]byte("lo\n")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// _, err = buf.Write([]byte("ending")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// assert.Equal(t, 12, buf.Len()) +// assert.Equal(t, "hello\nending", buf.String()) +//} +// +//func TestErrBuffer_Write_Remaining(t *testing.T) { +// buf := newErrBuffer(nil) +// defer func() { +// buf.Close() +// }() +// +// _, err := buf.Write([]byte("hel")) +// if err != nil { +// t.Errorf("fail to write: error %v", err) +// } +// +// assert.Equal(t, 3, buf.Len()) +// assert.Equal(t, "hel", buf.String()) +//} |