summaryrefslogtreecommitdiff
path: root/error_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'error_buffer.go')
-rw-r--r--error_buffer.go108
1 files changed, 90 insertions, 18 deletions
diff --git a/error_buffer.go b/error_buffer.go
index fcf566c8..8be9c5a8 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -1,39 +1,111 @@
package roadrunner
import (
- "bytes"
"sync"
+ "time"
+)
+
+const (
+ // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
+ EventStderrOutput = 1900
+
+ // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output
+ // together since lastError update (required to keep error update together).
+ WaitDuration = 100 * time.Millisecond
)
// thread safe errBuffer
type errBuffer struct {
mu sync.Mutex
- buffer *bytes.Buffer
+ buf []byte
+ last int
+ wait *time.Timer
+ update chan interface{}
+ stop chan interface{}
+ lsn func(event int, ctx interface{})
+}
+
+func newErrBuffer() *errBuffer {
+ eb := &errBuffer{
+ buf: make([]byte, 0),
+ update: make(chan interface{}),
+ wait: time.NewTimer(WaitDuration),
+ stop: make(chan interface{}),
+ }
+
+ go func() {
+ for {
+ select {
+ case <-eb.update:
+ eb.wait.Reset(WaitDuration)
+ case <-eb.wait.C:
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ case <-eb.stop:
+ eb.wait.Stop()
+
+ eb.mu.Lock()
+ if len(eb.buf) > eb.last {
+ if eb.lsn != nil {
+ eb.lsn(EventStderrOutput, eb.buf[eb.last:])
+ }
+ eb.last = len(eb.buf)
+ }
+ eb.mu.Unlock()
+ return
+ }
+ }
+ }()
+
+ return eb
+}
+
+// Listen attaches error stream even listener.
+func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ eb.lsn = l
}
-// Len returns the number of bytes of the unread portion of the errBuffer;
-// b.Len() == len(b.Bytes()).
-func (b *errBuffer) Len() int {
- b.mu.Lock()
- defer b.mu.Unlock()
+// Len returns the number of buf of the unread portion of the errBuffer;
+// buf.Len() == len(buf.Bytes()).
+func (eb *errBuffer) Len() int {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Len()
+ // currently active message
+ return len(eb.buf)
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil. If the
-// errBuffer becomes too large, Write will panic with ErrTooLarge.
-func (b *errBuffer) Write(p []byte) (n int, err error) {
- b.mu.Lock()
- defer b.mu.Unlock()
+// needed. The return value n is the length of p; err is always nil.
+func (eb *errBuffer) Write(p []byte) (int, error) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Write(p)
+ eb.buf = append(eb.buf, p...)
+ eb.update <- nil
+
+ return len(p), nil
}
// Strings fetches all errBuffer data into string.
-func (b *errBuffer) String() string {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) String() string {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ return string(eb.buf)
+}
- return b.buffer.String()
+// Close aggregation timer.
+func (eb *errBuffer) Close() error {
+ close(eb.stop)
+ return nil
}