diff options
Diffstat (limited to 'error_buffer.go')
-rw-r--r-- | error_buffer.go | 70 |
1 files changed, 43 insertions, 27 deletions
diff --git a/error_buffer.go b/error_buffer.go index 27f35e78..8c240e26 100644 --- a/error_buffer.go +++ b/error_buffer.go @@ -5,51 +5,67 @@ import ( "sync" ) -// EventStderrOutput - is triggered when worker sends data into stderr. The context is output data in []bytes form. +// EventStderrOutput - is triggered when worker sends data into stderr. The context is error message ([]byte). const EventStderrOutput = 1900 // thread safe errBuffer type errBuffer struct { - mu sync.Mutex - buffer *bytes.Buffer - lsn func(event int, ctx interface{}) + mu sync.Mutex + buf []byte + off int + lsn func(event int, ctx interface{}) +} + +func newErrBuffer() *errBuffer { + buf := &errBuffer{buf: make([]byte, 0)} + return buf } // Listen attaches error stream even listener. -func (b *errBuffer) Listen(l func(event int, ctx interface{})) { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) Listen(l func(event int, ctx interface{})) { + eb.mu.Lock() + defer eb.mu.Unlock() - b.lsn = l + eb.lsn = l } -// Len returns the number of bytes of the unread portion of the errBuffer; -// b.Len() == len(b.Bytes()). -func (b *errBuffer) Len() int { - b.mu.Lock() - defer b.mu.Unlock() +// Len returns the number of buf of the unread portion of the errBuffer; +// buf.Len() == len(buf.Bytes()). +func (eb *errBuffer) Len() int { + eb.mu.Lock() + defer eb.mu.Unlock() - return b.buffer.Len() + // currently active message + return len(eb.buf) - eb.off } // Write appends the contents of p to the errBuffer, growing the errBuffer as -// needed. The return value n is the length of p; err is always nil. If the -// errBuffer becomes too large, Write will panic with ErrTooLarge. -func (b *errBuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - if b.lsn != nil { - b.lsn(EventStderrOutput, p) +// needed. The return value n is the length of p; err is always nil. +func (eb *errBuffer) Write(p []byte) (int, error) { + eb.mu.Lock() + defer eb.mu.Unlock() + + eb.buf = append(eb.buf, p...) + for msg := eb.fetchMsg(); msg != nil; msg = eb.fetchMsg() { + eb.lsn(EventStderrOutput, msg) } - return b.buffer.Write(p) + return len(p), nil } // Strings fetches all errBuffer data into string. -func (b *errBuffer) String() string { - b.mu.Lock() - defer b.mu.Unlock() +func (eb *errBuffer) String() string { + eb.mu.Lock() + defer eb.mu.Unlock() + + return string(eb.buf[eb.off:]) +} + +func (eb *errBuffer) fetchMsg() []byte { + if i := bytes.Index(eb.buf[eb.off:], []byte{10, 10}); i != -1 { + eb.off += i + 2 + return eb.buf[eb.off-i-2 : eb.off] + } - return b.buffer.String() + return nil } |