From 38d694411abbdb0c31b08b96452fa0604a93418a Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sat, 23 Jun 2018 13:15:33 +0300 Subject: support for realtime error aggegration --- error_buffer.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'error_buffer.go') diff --git a/error_buffer.go b/error_buffer.go index fcf566c8..27f35e78 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -5,10 +5,22 @@ import ( "sync" ) +// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form. +const EventStderrOutput = 1900 + // thread safe errBuffer type errBuffer struct { mu sync.Mutex buffer *bytes.Buffer + lsn func(event int, ctx interface{}) +} + +// Listen attaches error stream even listener. +func (b *errBuffer) Listen(l func(event int, ctx interface{})) { + b.mu.Lock() + defer b.mu.Unlock() + + b.lsn = l } // Len returns the number of bytes of the unread portion of the errBuffer; @@ -27,6 +39,10 @@ func (b *errBuffer) Write(p []byte) (n int, err error) { b.mu.Lock() defer b.mu.Unlock() + if b.lsn != nil { + b.lsn(EventStderrOutput, p) + } + return b.buffer.Write(p) } -- cgit v1.2.3 From 80bd89f91fd0055579717f3cc65b39176eb09000 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sat, 23 Jun 2018 17:06:07 +0300 Subject: realtime error handling and displaying --- error_buffer.go | 70 +++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 43 insertions(+), 27 deletions(-) (limited to 'error_buffer.go') diff --git a/error_buffer.go b/error_buffer.go index 27f35e78..8c240e26 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -5,51 +5,67 @@ import ( "sync" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form. +// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). const EventStderrOutput = 1900 // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buffer *bytes.Buffer - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + off int + lsn func(event int, ctx interface{}) +} + +func newErrBuffer() *errBuffer { + buf := &errBuffer{buf: make([]byte, 0)} + return buf } // Listen attaches error stream even listener. -func (b *errBuffer) Listen(l func(event int, ctx interface{})) { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { + eb.mu.Lock() + defer eb.mu.Unlock() - b.lsn = l + eb.lsn = l } -// Len returns the number of bytes of the unread portion of the errBuffer; -// b.Len() == len(b.Bytes()). -func (b *errBuffer) Len() int { - b.mu.Lock() - defer b.mu.Unlock() +// Len returns the number of buf of the unread portion of the errBuffer; +// buf.Len() == len(buf.Bytes()). +func (eb *errBuffer) Len() int { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Len() + // currently active message + return len(eb.buf) - eb.off } // Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. If the -// errBuffer becomes too large, Write will panic with ErrTooLarge. -func (b *errBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - if b.lsn != nil { - b.lsn(EventStderrOutput, p) +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.buf = append(eb.buf, p...) + for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { + eb.lsn(EventStderrOutput, msg) } - return b.buffer.Write(p) + return len(p), nil } // Strings fetches all errBuffer data into string. -func (b *errBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() + + return string(eb.buf[eb.off:]) +} + +func (eb *errBuffer) fetchMsg() []byte { + if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { + eb.off += i + 2 + return eb.buf[eb.off-i-2 : eb.off] + } - return b.buffer.String() + return nil } -- cgit v1.2.3 From 0d926f48f215eedb3f36e991c267301da7906574 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sat, 23 Jun 2018 17:11:34 +0300 Subject: realtime error handling and displaying --- error_buffer.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'error_buffer.go') diff --git a/error_buffer.go b/error_buffer.go index 8c240e26..78effc9b 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -47,7 +47,9 @@ func (eb *errBuffer) Write(p []byte) (int, error) { eb.buf = append(eb.buf, p...) for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { - eb.lsn(EventStderrOutput, msg) + if eb.lsn != nil { + eb.lsn(EventStderrOutput, msg) + } } return len(p), nil -- cgit v1.2.3 From 008a42fc6138e74766cdf9011a8dfc60df71b4a0 Mon Sep 17 00:00:00 2001 From: Wolfy-J Date: Sat, 23 Jun 2018 19:04:50 +0300 Subject: error aggregation --- error_buffer.go | 82 +++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 22 deletions(-) (limited to 'error_buffer.go') diff --git a/error_buffer.go b/error_buffer.go index 78effc9b..8be9c5a8 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -1,24 +1,69 @@ package roadrunner import ( - "bytes" "sync" + "time" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). -const EventStderrOutput = 1900 +const ( + // EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). + EventStderrOutput = 1900 + + // WaitDuration - for how long error buffer should attempt to aggregate error messages before merging output + // together since lastError update (required to keep error update together). + WaitDuration = 100 * time.Millisecond +) // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buf []byte - off int - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + last int + wait *time.Timer + update chan interface{} + stop chan interface{} + lsn func(event int, ctx interface{}) } func newErrBuffer() *errBuffer { - buf := &errBuffer{buf: make([]byte, 0)} - return buf + eb := &errBuffer{ + buf: make([]byte, 0), + update: make(chan interface{}), + wait: time.NewTimer(WaitDuration), + stop: make(chan interface{}), + } + + go func() { + for { + select { + case <-eb.update: + eb.wait.Reset(WaitDuration) + case <-eb.wait.C: + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + case <-eb.stop: + eb.wait.Stop() + + eb.mu.Lock() + if len(eb.buf) > eb.last { + if eb.lsn != nil { + eb.lsn(EventStderrOutput, eb.buf[eb.last:]) + } + eb.last = len(eb.buf) + } + eb.mu.Unlock() + return + } + } + }() + + return eb } // Listen attaches error stream even listener. @@ -36,7 +81,7 @@ func (eb *errBuffer) Len() int { defer eb.mu.Unlock() // currently active message - return len(eb.buf) - eb.off + return len(eb.buf) } // Write appends the contents of p to the errBuffer, growing the errBuffer as @@ -46,11 +91,7 @@ func (eb *errBuffer) Write(p []byte) (int, error) { defer eb.mu.Unlock() eb.buf = append(eb.buf, p...) - for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { - if eb.lsn != nil { - eb.lsn(EventStderrOutput, msg) - } - } + eb.update <- nil return len(p), nil } @@ -60,14 +101,11 @@ func (eb *errBuffer) String() string { eb.mu.Lock() defer eb.mu.Unlock() - return string(eb.buf[eb.off:]) + return string(eb.buf) } -func (eb *errBuffer) fetchMsg() []byte { - if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { - eb.off += i + 2 - return eb.buf[eb.off-i-2 : eb.off] - } - +// Close aggregation timer. +func (eb *errBuffer) Close() error { + close(eb.stop) return nil } -- cgit v1.2.3