summaryrefslogtreecommitdiff
path: root/error_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'error_buffer.go')
-rw-r--r--error_buffer.go70
1 files changed, 43 insertions, 27 deletions
diff --git a/error_buffer.go b/error_buffer.go
index 27f35e78..8c240e26 100644
--- a/error_buffer.go
+++ b/error_buffer.go
@@ -5,51 +5,67 @@ import (
"sync"
)
-// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form.
+// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte).
const EventStderrOutput = 1900
// thread safe errBuffer
type errBuffer struct {
- mu sync.Mutex
- buffer *bytes.Buffer
- lsn func(event int, ctx interface{})
+ mu sync.Mutex
+ buf []byte
+ off int
+ lsn func(event int, ctx interface{})
+}
+
+func newErrBuffer() *errBuffer {
+ buf := &errBuffer{buf: make([]byte, 0)}
+ return buf
}
// Listen attaches error stream even listener.
-func (b *errBuffer) Listen(l func(event int, ctx interface{})) {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) Listen(l func(event int, ctx interface{})) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- b.lsn = l
+ eb.lsn = l
}
-// Len returns the number of bytes of the unread portion of the errBuffer;
-// b.Len() == len(b.Bytes()).
-func (b *errBuffer) Len() int {
- b.mu.Lock()
- defer b.mu.Unlock()
+// Len returns the number of buf of the unread portion of the errBuffer;
+// buf.Len() == len(buf.Bytes()).
+func (eb *errBuffer) Len() int {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
- return b.buffer.Len()
+ // currently active message
+ return len(eb.buf) - eb.off
}
// Write appends the contents of p to the errBuffer, growing the errBuffer as
-// needed. The return value n is the length of p; err is always nil. If the
-// errBuffer becomes too large, Write will panic with ErrTooLarge.
-func (b *errBuffer) Write(p []byte) (n int, err error) {
- b.mu.Lock()
- defer b.mu.Unlock()
-
- if b.lsn != nil {
- b.lsn(EventStderrOutput, p)
+// needed. The return value n is the length of p; err is always nil.
+func (eb *errBuffer) Write(p []byte) (int, error) {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ eb.buf = append(eb.buf, p...)
+ for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() {
+ eb.lsn(EventStderrOutput, msg)
}
- return b.buffer.Write(p)
+ return len(p), nil
}
// Strings fetches all errBuffer data into string.
-func (b *errBuffer) String() string {
- b.mu.Lock()
- defer b.mu.Unlock()
+func (eb *errBuffer) String() string {
+ eb.mu.Lock()
+ defer eb.mu.Unlock()
+
+ return string(eb.buf[eb.off:])
+}
+
+func (eb *errBuffer) fetchMsg() []byte {
+ if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 {
+ eb.off += i + 2
+ return eb.buf[eb.off-i-2 : eb.off]
+ }
- return b.buffer.String()
+ return nil
}