summaryrefslogtreecommitdiff
path: root/error_buffer.go
diff options
context:
space:
mode:
authorWolfy-J <[email protected]>2018-06-23 19:04:50 +0300
committerWolfy-J <[email protected]>2018-06-23 19:04:50 +0300
commit008a42fc6138e74766cdf9011a8dfc60df71b4a0 (patch)
treecdf85cb5536c8416020ca21475751dae0b9117a0 /error_buffer.go
parent918e7de39e16eb293567427f2e8e8c3035690163 (diff)
error aggregation
Diffstat (limited to 'error_buffer.go')
-rw-r--r--error_buffer.go82
1 files changed, 60 insertions, 22 deletions
diff --git a/error_buffer.go b/error_buffer.go
index 78effc9b..8be9c5a8 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -1,24 +1,69 @@
package roadrunner
import (
- "bytes"
"sync"
+ "time"
)
-// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
-const EventStderrOutput = 1900
+const (
+ // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
+ EventStderrOutput = 1900
+
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output
+ // together since lastError update (required to keep error update together).
+ WaitDuration = 100 * time.Millisecond
+)
// thread safe errBuffer
type errBuffer struct {
- mu sync.Mutex
- buf []byte
- off int
- lsn func(event int, ctx interface{})
+ mu sync.Mutex
+ buf []byte
+ last int
+ wait *time.Timer
+ update chan interface{}
+ stop chan interface{}
+ lsn func(event int, ctx interface{})
}
func newErrBuffer() *errBuffer {
- buf := &errBuffer{buf: make([]byte, 0)}
- return buf
+ eb := &errBuffer{
+ buf: make([]byte, 0),
+ update: make(chan interface{}),
+ wait: time.NewTimer(WaitDuration),
+ stop: make(chan interface{}),
+ }
+
+ go func() {
+ for {
+ select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
+ case <-eb.wait.C:
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ case <-eb.stop:
+ eb.wait.Stop()
+
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return eb
}
// Listen attaches error stream even listener.
@@ -36,7 +81,7 @@ func (eb *errBuffer) Len() int {
defer eb.mu.Unlock()
// currently active message
- return len(eb.buf) - eb.off
+ return len(eb.buf)
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
@@ -46,11 +91,7 @@ func (eb *errBuffer) Write(p []byte) (int, error) {
defer eb.mu.Unlock()
eb.buf = append(eb.buf, p...)
- for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() {
- if eb.lsn != nil {
- eb.lsn(EventStderrOutput, msg)
- }
- }
+ eb.update <- nil
return len(p), nil
}
@@ -60,14 +101,11 @@ func (eb *errBuffer) String() string {
eb.mu.Lock()
defer eb.mu.Unlock()
- return string(eb.buf[eb.off:])
+ return string(eb.buf)
}
-func (eb *errBuffer) fetchMsg() []byte {
- if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 {
- eb.off += i + 2
- return eb.buf[eb.off-i-2 : eb.off]
- }
-
+// Close aggregation timer.
+func (eb *errBuffer) Close() error {
+ close(eb.stop)
return nil
}