diff options
Diffstat (limited to 'error_buffer.go')
-rw-r--r-- | error_buffer.go | 82 |
1 files changed, 60 insertions, 22 deletions
diff --git a/error_buffer.go b/error_buffer.go index 78effc9b..8be9c5a8 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -1,24 +1,69 @@ package roadrunner import ( - "bytes" "sync" + "time" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). -const EventStderrOutput = 1900 +const ( + // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). + EventStderrOutput = 1900 + + // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output + // together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond +) // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buf []byte - off int - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + last int + wait *time.Timer + update chan interface{} + stop chan interface{} + lsn func(event int, ctx interface{}) } func newErrBuffer() *errBuffer { - buf := &errBuffer{buf: make([]byte, 0)} - return buf + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + } + + go func() { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return + } + } + }() + + return eb } // Listen attaches error stream even listener. @@ -36,7 +81,7 @@ func (eb *errBuffer) Len() int { defer eb.mu.Unlock() // currently active message - return len(eb.buf) - eb.off + return len(eb.buf) } // Write appends the contents of p to the errBuffer, growing the errBuffer as @@ -46,11 +91,7 @@ func (eb *errBuffer) Write(p []byte) (int, error) { defer eb.mu.Unlock() eb.buf = append(eb.buf, p...) - for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { - if eb.lsn != nil { - eb.lsn(EventStderrOutput, msg) - } - } + eb.update <- nil return len(p), nil } @@ -60,14 +101,11 @@ func (eb *errBuffer) String() string { eb.mu.Lock() defer eb.mu.Unlock() - return string(eb.buf[eb.off:]) + return string(eb.buf) } -func (eb *errBuffer) fetchMsg() []byte { - if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { - eb.off += i + 2 - return eb.buf[eb.off-i-2 : eb.off] - } - +// Close aggregation timer. +func (eb *errBuffer) Close() error { + close(eb.stop) return nil } |